Redis Transactions, Pipelining, and Lua
1. Transactions
Redis transactions differ from the transactions we are used to in relational databases: of the four ACID properties, at least atomicity is not guaranteed.
1. If a command in a transaction contains a syntax error, Redis detects it and the whole transaction appears to roll back: none of the commands is executed.
Strictly speaking, that phrasing is not quite accurate; it only looks like a rollback. Redis has no concept of rollback at all, and simply does not support it. It is just that when there is a syntax error, none of the commands in the transaction is executed.
110.138.1.21:6379> multi
OK
110.138.1.21:6379> discard
OK
110.138.1.21:6379> multi
OK
110.138.1.21:6379> set book yesbook
QUEUED
110.138.1.21:6379> set paper yespaper
QUEUED
110.138.1.21:6379> ok
(error) ERR unknown command 'ok'
110.138.1.21:6379> incr book
QUEUED
110.138.1.21:6379> set paper nopaper
QUEUED
110.138.1.21:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
110.138.1.21:6379> get paper
(nil)
Redis reports the error shown above and discards the transaction.
Now, if the transaction contains no syntax error but a runtime error instead, i.e. an error that occurs while a Redis command is executing, Redis still runs the remaining commands; it does not guarantee atomicity the way MySQL and similar databases do. In other words, Redis does not support rolling back a transaction.
For example, incrementing a key that holds a non-integer String value is a runtime error.
Example:
110.138.1.21:6379> get book
(nil)
110.138.1.21:6379> multi
OK
110.138.1.21:6379> set book yesook
QUEUED
110.138.1.21:6379> incr book
QUEUED
110.138.1.21:6379> set paper yespaper
QUEUED
110.138.1.21:6379> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK
110.138.1.21:6379> get paper
"yespaper"
110.138.1.21:6379> get book
"yesook"
In the example above, both set book and set paper executed normally; only the INCR command failed.
The Redis author's explanation for not supporting rollback is that a complex rollback mechanism would run counter to the extreme performance Redis pursues.
Redis transactions are implemented with a multiState struct; every Redis client carries one such field.
typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
    int cmd_flags;          /* The accumulated command flags OR-ed together.
                               So if at least a command has a given flag, it
                               will be set in this field. */
    int cmd_inv_flags;      /* Same as cmd_flags, OR-ing the ~flags. so that it
                               is possible to know if all the commands have a
                               certain flag. */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
This struct records the queued commands awaiting execution, together with their bookkeeping counters.
When a client in the transaction state issues EXEC, Redis takes the commands off the queue and executes them one by one, in FIFO order.
By now it should be clear that this so-called transaction is not a transaction in the usual sense. When a command has a syntax error, Redis marks the client during queueing so that the transaction will be refused and no command will run; this is implemented mainly by setting the CLIENT_DIRTY_EXEC flag.
Let's look at the enqueue operation:
void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;

    /* No sense to waste memory if the transaction is already aborted.
     * this is useful in case client sends these in a pipeline, or doesn't
     * bother to read previous responses and didn't notice the multi was already
     * aborted. */
    if (c->flags & CLIENT_DIRTY_EXEC)
        return;

    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
    c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}
When there is no syntax error, even a runtime error only affects the single failing command and leaves the others untouched, because the queued commands are simply executed in sequence.
Now let's look at the source for executing or discarding a transaction.
The DISCARD command:
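The two failure modes can be made concrete with a minimal Python sketch (all names are invented for illustration; this is a toy model, not redis-py): a command that fails validation at queue time marks the client dirty and EXEC aborts, while a runtime error only fills its own slot in the reply array.

```python
class MiniMulti:
    """Toy model of Redis MULTI/EXEC queueing (illustrative only)."""
    KNOWN = {"set", "get", "incr"}

    def __init__(self, store):
        self.store = store
        self.queue = []
        self.dirty_exec = False  # models CLIENT_DIRTY_EXEC

    def queue_cmd(self, name, *args):
        # A syntax error (unknown command) is caught at queue time.
        if name not in self.KNOWN:
            self.dirty_exec = True
            return "(error) ERR unknown command"
        self.queue.append((name, args))
        return "QUEUED"

    def exec(self):
        if self.dirty_exec:
            self.queue.clear()
            return "(error) EXECABORT Transaction discarded because of previous errors."
        replies = []
        for name, args in self.queue:  # FIFO, no rollback
            try:
                replies.append(self._run(name, args))
            except ValueError as e:
                # A runtime error fills one reply slot; later commands still run.
                replies.append(f"(error) ERR {e}")
        self.queue.clear()
        return replies

    def _run(self, name, args):
        if name == "set":
            self.store[args[0]] = args[1]
            return "OK"
        if name == "get":
            return self.store.get(args[0])
        # incr: fails on non-integer values, like the real command
        v = self.store.get(args[0], "0")
        if not str(v).lstrip("-").isdigit():
            raise ValueError("value is not an integer or out of range")
        self.store[args[0]] = str(int(v) + 1)
        return int(self.store[args[0]])
```

Queueing set book / incr book / set paper reproduces the OK, error, OK reply array from the session above.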
void discardTransaction(client *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}

void freeClientMultiState(client *c) {
    int j;

    /* Release every command in the queue. */
    for (j = 0; j < c->mstate.count; j++) {
        int i;
        multiCmd *mc = c->mstate.commands+j;

        for (i = 0; i < mc->argc; i++)
            decrRefCount(mc->argv[i]);
        zfree(mc->argv);
    }
    zfree(c->mstate.commands);
}
Now the execution path. The basic idea is to first check whether the transaction may run at all:
    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                   shared.nullarray[c->resp]);
        discardTransaction(c);
        goto handle_monitor;
    }
Then execute the queued commands:
    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first command which
         * is not readonly nor an administrative one.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate &&
            !server.loading &&
            !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
        {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        int acl_keypos;
        int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
        if (acl_retval != ACL_OK) {
            addACLLogEntry(c,acl_retval,acl_keypos,NULL);
            addReplyErrorFormat(c,
                "-NOPERM ACLs rules changed between the moment the "
                "transaction was accumulated and the EXEC call. "
                "This command is no longer allowed for the "
                "following reason: %s",
                (acl_retval == ACL_DENIED_CMD) ?
                "no permission to execute the command or subcommand" :
                "no permission to touch the specified keys");
        } else {
            call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
        }

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    discardTransaction(c);

    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) {
        int is_master = server.masterhost == NULL;
        server.dirty++;
        /* If inside the MULTI/EXEC block this instance was suddenly
         * switched from master to slave (using the SLAVEOF command), the
         * initial MULTI was propagated into the replication backlog, but the
         * rest was not. We need to make sure to at least terminate the
         * backlog with the final EXEC. */
        if (server.repl_backlog && was_master && !is_master) {
            char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(execcmd,strlen(execcmd));
        }
    }

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MULTI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
Redis transactions are often combined with WATCH to implement optimistic locking, which helps with some high-concurrency problems and is typically used for flash sales, limited-quantity promotions, and similar activities. The basic idea: before opening the transaction, watch a key; if the key is still unchanged at EXEC time, the modifications run, otherwise EXEC executes nothing. The mechanism works by checking CLIENT_DIRTY_CAS: when a key watched by a transaction is modified, Redis sets CLIENT_DIRTY_CAS on the watching client, telling you your transaction has been invalidated. How does Redis know a watched key has been touched? Through a watched_keys dictionary kept on each database.
Every WATCH command updates this dictionary. Here is the source:
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
list *clients;
listIter li;
listNode *ln;
if (dictSize(db->watched_keys) == 0) return;
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
/* Check if we are already watching for this key */
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags |= CLIENT_DIRTY_CAS;
}
}
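The watched_keys bookkeeping above can be sketched as a dictionary mapping each key to the clients watching it: touching a key marks every watcher dirty, and their next EXEC is refused. This is a toy Python model with invented names, not Redis source:

```python
class Client:
    def __init__(self):
        self.dirty_cas = False  # models CLIENT_DIRTY_CAS


class MiniDb:
    def __init__(self):
        self.data = {}
        self.watched_keys = {}  # key -> list of watching clients

    def watch(self, client, key):
        self.watched_keys.setdefault(key, []).append(client)

    def touch_watched_key(self, key):
        # Mirrors touchWatchedKey: flag every client watching this key.
        for c in self.watched_keys.get(key, []):
            c.dirty_cas = True

    def set(self, key, value):
        self.data[key] = value
        self.touch_watched_key(key)


def exec_allowed(client):
    # EXEC runs only if no watched key was touched; either way the
    # transaction state is reset afterwards (as discardTransaction does).
    ok = not client.dirty_cas
    client.dirty_cas = False
    return ok
```

If another connection modifies the watched key between WATCH and EXEC, the watcher's EXEC returns nil and the caller typically retries the whole watch-read-multi-exec loop.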
2. Pipelining
The basic idea is to carry multiple Redis commands in a single network round trip, reducing the number of writes and reads; this is generally implemented in the Redis client. Commands issued inside a pipeline are first written to a pipeline buffer, then written in order into the socket send buffer and flushed out. Of course, even though the commands can be sent together, receiving the responses still blocks: you must wait for the results of all the commands.
First, the command-creation path (this excerpt is from the Predis PHP client):
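The buffering idea can be sketched without any client library: encode each command in the RESP protocol, concatenate the encodings into one buffer, and issue a single write on the socket. A minimal sketch (real clients also batch the response reads):

```python
def encode_resp(*args):
    """Encode one command as a RESP array of bulk strings."""
    out = [f"*{len(args)}\r\n"]
    for a in args:
        a = str(a)
        out.append(f"${len(a.encode())}\r\n{a}\r\n")
    return "".join(out)


def pipeline_buffer(commands):
    # One buffer, hence one write() on the socket instead of one per command.
    return "".join(encode_resp(*cmd) for cmd in commands)
```

For example, pipeline_buffer([("SET", "book", "yesbook"), ("GET", "book")]) produces one payload containing both commands back to back.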
class Pipeline implements ClientContextInterface
{
    private $client;
    private $pipeline;

    private $responses = array();
    private $running = false;

    /**
     * @param ClientInterface $client Client instance used by the context.
     */
    public function __construct(ClientInterface $client)
    {
        $this->client = $client;
        $this->pipeline = new \SplQueue();
    }

    /**
     * Queues a command into the pipeline buffer.
     *
     * @param string $method    Command ID.
     * @param array  $arguments Arguments for the command.
     *
     * @return $this
     */
    public function __call($method, $arguments)
    {
        $command = $this->client->createCommand($method, $arguments);
        $this->recordCommand($command);

        return $this;
    }

    /**
     * Queues a command instance into the pipeline buffer.
     *
     * @param CommandInterface $command Command to be queued in the buffer.
     */
    protected function recordCommand(CommandInterface $command)
    {
        $this->pipeline->enqueue($command);
    }
Each command issued on a pipeline is first placed in a queue.
The execution path:
protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
{
    foreach ($commands as $command) {
        $connection->writeRequest($command);
    }

    $responses = array();
    $exceptions = $this->throwServerExceptions();

    while (!$commands->isEmpty()) {
        $command = $commands->dequeue();
        $response = $connection->readResponse($command);

        if (!$response instanceof ResponseInterface) {
            $responses[] = $command->parseResponse($response);
        } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
            $this->exception($connection, $response);
        } else {
            $responses[] = $response;
        }
    }

    return $responses;
}
These days pipelines are rarely used directly; people write Lua scripts instead, because a script generally performs better than a pipeline. Besides, pipelining is usually implemented in the client on top of the operating system's read/write buffers, while Lua is supported directly on the Redis server.
3. Lua Scripts
I will not analyze how Redis implements its Lua support here; in short, Lua makes atomic multi-command operations possible. Redis exposes two functions to Lua scripts: redis.call and redis.pcall. With call, an error is raised upward and the Lua script stops executing; with pcall, the error is recorded as a value and the script keeps going, similar to try/catch in other languages.
We know that an uncaught error in our code can take the server down. To keep a script error from crashing the server, Redis wraps the execution of scripts using redis.call in a pcall one level up.
A simple example:
10.38.163.219:6379> eval 'redis.pcall("get",KEYS[1]);return redis.call("smembers",KEYS[1])' 1 testset2
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "8"
8) "9"
10.38.163.219:6379> eval 'redis.call("get",KEYS[1]);return redis.call("smembers",KEYS[1])' 1 testset2
(error) ERR Error running script (call to f_6ff7ff4cf361459fbc2f3f7d52699287582d1be1): @user_script:1: WRONGTYPE Operation against a key holding the wrong kind of value
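The call/pcall difference shown in the session above can be modeled in a few lines of Python (a simulation with invented names, not the Lua API): call raises and aborts the script, while pcall converts the error into a value and lets the script continue.

```python
def run_cmd(store, name, key):
    # Toy command table: GET against a set-typed value is a type error,
    # mirroring the WRONGTYPE error in the session above.
    value = store.get(key)
    if name == "get" and isinstance(value, set):
        raise TypeError("WRONGTYPE Operation against a key holding "
                        "the wrong kind of value")
    return value


def call(store, name, key):
    # Errors propagate upward, aborting the whole script.
    return run_cmd(store, name, key)


def pcall(store, name, key):
    # Errors are captured and returned as a value; the script continues.
    try:
        return run_cmd(store, name, key)
    except TypeError as e:
        return {"err": str(e)}
```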
Now let's see how Redis wraps our call:
if (lua_pcall(lua,0,0,0)) {
    if (c != NULL) {
        addReplyErrorFormat(c,"Error running script (new function): %s\n",
            lua_tostring(lua,-1));
    }
    lua_pop(lua,1);
    sdsfree(sha);
    return NULL;
}
In addition, to avoid sending the script body with every request, Redis provides a script cache. You can then run a cached script with EVALSHA by passing its hash and the arguments.
Load a script into the server with SCRIPT LOAD, which returns its SHA1; afterwards that SHA1 identifies the script and can be executed with EVALSHA.
10.38.163.219:6379> evalsha 6ff7ff4cf361459fbc2f3f7d52699287582d1be1 1 testset2
(error) ERR Error running script (call to f_6ff7ff4cf361459fbc2f3f7d52699287582d1be1): @user_script:1: WRONGTYPE Operation against a key holding the wrong kind of value
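The hash that EVALSHA expects is simply the SHA1 hex digest of the script body, which is exactly what SCRIPT LOAD returns, so it can be computed client-side:

```python
import hashlib


def script_sha1(script: str) -> str:
    """Same digest that SCRIPT LOAD returns and EVALSHA expects."""
    return hashlib.sha1(script.encode()).hexdigest()
```

The f_6ff7ff4c... name in the error messages above is this digest with an f_ prefix, the name Redis gives the compiled Lua function.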
One final point: a Lua script may contain many commands, and while it runs, requests from all other clients are blocked, so long-running operations should be avoided. With this in mind, Redis also lets you configure a maximum execution time; once a script exceeds it:
- Redis logs that a script is running past its time limit.
- Redis starts accepting requests from other clients again, but only the SCRIPT KILL and SHUTDOWN NOSAVE commands are processed; for any other request the server simply returns a BUSY error.
- SCRIPT KILL can be used to kill a script that has executed only read-only commands: read-only commands do not modify data, so killing the script does not harm data integrity.
- If the script has already executed a write command, the only permitted operation is SHUTDOWN NOSAVE, which stops the server without writing the current data set to disk.
References:
Using pipelining to speedup Redis queries