Redis Transactions, Pipelines, and Lua

I. Transactions

Redis transactions differ from the transactions in the relational databases we normally use: at a minimum, atomicity, one of the four ACID properties, is not guaranteed.

1. If a command in the transaction has a syntax error, Redis detects it and the whole transaction appears to roll back: none of the commands is executed.

Strictly speaking, that wording is imprecise; it only looks like a rollback. Redis has no concept of rollback at all, it simply does not support one. It is just that when there is a syntax error, none of the queued commands gets executed.

110.138.1.21:6379> multi
OK
110.138.1.21:6379> discard
OK
110.138.1.21:6379> multi
OK
110.138.1.21:6379> set book yesbook
QUEUED
110.138.1.21:6379> set paper yespaper
QUEUED
110.138.1.21:6379> ok
(error) ERR unknown command 'ok'
110.138.1.21:6379> incr book
QUEUED
110.138.1.21:6379> set paper nopaper
QUEUED
110.138.1.21:6379> exec
(error) EXECABORT Transaction discarded because of previous errors.
110.138.1.21:6379> get paper
(nil)

Redis reports the error above and the transaction is discarded.

2. When the transaction has no syntax error but a runtime error, i.e. an error that occurs while a command is actually executing, Redis still executes the remaining commands and does not guarantee atomicity the way MySQL and other databases do. In other words, Redis does not support transaction rollback.

For example, incrementing a key that holds a non-numeric string value is a runtime error.

Example:

110.138.1.21:6379> get book
(nil)
110.138.1.21:6379> multi
OK
110.138.1.21:6379> set book yesook
QUEUED
110.138.1.21:6379> incr book
QUEUED
110.138.1.21:6379> set paper yespaper
QUEUED
110.138.1.21:6379> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK
110.138.1.21:6379> get paper
"yespaper"
110.138.1.21:6379> get book
"yesook"

In the example above, both set book and set paper executed normally; only the INCR command failed.

The Redis author's explanation for not supporting rollback is that a complex rollback mechanism would run counter to the extreme performance Redis pursues.

Redis implements transactions with a multiState struct; every Redis client holds one such field.

typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
    int cmd_flags;          /* The accumulated command flags OR-ed together.
                               So if at least a command has a given flag, it
                               will be set in this field. */
    int cmd_inv_flags;      /* Same as cmd_flags, OR-ing the ~flags. so that it
                               is possible to know if all the commands have a
                               certain flag. */
    int minreplicas;        /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

The struct records the queued commands awaiting execution, plus some bookkeeping counters.

When a client in the transaction state issues EXEC, Redis takes the queued commands and executes them one by one, in FIFO order.

By this point it should be clear that the so-called transaction is not a transaction in the strict sense. When a command with a syntax error is seen while queueing, Redis refuses to execute the transaction and lets none of the other commands run, which is implemented mainly by setting the CLIENT_DIRTY_EXEC flag.

Here is the enqueue operation:

void queueMultiCommand(client *c) {
    multiCmd *mc;
    int j;

    /* No sense to waste memory if the transaction is already aborted.
     * this is useful in case client sends these in a pipeline, or doesn't
     * bother to read previous responses and didn't notice the multi was already
     * aborted. */
    if (c->flags & CLIENT_DIRTY_EXEC)
        return;

    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
    c->mstate.cmd_flags |= c->cmd->flags;
    c->mstate.cmd_inv_flags |= ~c->cmd->flags;
}

If there is no syntax error, then even a runtime error affects only that single command and leaves the others untouched, because the commands are simply executed in order.
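
The MULTI/EXEC semantics above can be sketched in a few lines of Python (a toy model, not Redis code; TxClient, queue_cmd, and the tiny command set are invented for illustration):

```python
# Toy model of Redis MULTI/EXEC: a syntax error while queueing marks the
# client dirty and aborts the whole EXEC, while a runtime error during
# EXEC fails only that one command, with no rollback of the rest.

class TxClient:
    def __init__(self, store):
        self.store = store          # shared dict standing in for the keyspace
        self.queue = []             # c->mstate.commands
        self.dirty_exec = False     # CLIENT_DIRTY_EXEC

    def queue_cmd(self, name, *args):
        if name not in ("set", "incr"):     # unknown command == syntax error
            self.dirty_exec = True
            return "(error) ERR unknown command"
        self.queue.append((name, args))
        return "QUEUED"

    def exec(self):
        if self.dirty_exec:                 # abort the whole transaction
            self.queue.clear()
            return "(error) EXECABORT Transaction discarded"
        results = []
        for name, args in self.queue:       # FIFO, no rollback on failure
            if name == "set":
                self.store[args[0]] = args[1]
                results.append("OK")
            elif name == "incr":
                try:
                    self.store[args[0]] = int(self.store.get(args[0], 0)) + 1
                    results.append(self.store[args[0]])
                except ValueError:          # runtime error: others still ran
                    results.append("(error) ERR value is not an integer")
        self.queue.clear()
        return results
```

Replaying the transcript above (set book / incr book / set paper) yields OK, an error, OK, and both SETs persist, which is exactly the behavior shown in the second example.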

Let's look at the source for executing or discarding a transaction.

The DISCARD command:

void discardTransaction(client *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(CLIENT_MULTI|CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC);
    unwatchAllKeys(c);
}

void freeClientMultiState(client *c) {
    int j;

    // release every command queued in the transaction
    for (j = 0; j < c->mstate.count; j++) {
        int i;
        multiCmd *mc = c->mstate.commands+j;

        for (i = 0; i < mc->argc; i++)
            decrRefCount(mc->argv[i]);
        zfree(mc->argv);
    }
    zfree(c->mstate.commands);
}

Now the execution path. The idea is simple: first check whether the EXEC must be aborted:


    /* Check if we need to abort the EXEC because:
     * 1) Some WATCHed key was touched.
     * 2) There was a previous error while queueing commands.
     * A failed EXEC in the first case returns a multi bulk nil object
     * (technically it is not an error but a special behavior), while
     * in the second an EXECABORT error is returned. */
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
        addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                   shared.nullarray[c->resp]);
        discardTransaction(c);
        goto handle_monitor;
    }

Then execute the queued commands:

 /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;

        /* Propagate a MULTI request once we encounter the first command which
         * is not readonly nor an administrative one.
         * This way we'll deliver the MULTI/..../EXEC block as a whole and
         * both the AOF and the replication link will have the same consistency
         * and atomicity guarantees. */
        if (!must_propagate &&
            !server.loading &&
            !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)))
        {
            execCommandPropagateMulti(c);
            must_propagate = 1;
        }

        int acl_keypos;
        int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
        if (acl_retval != ACL_OK) {
            addACLLogEntry(c,acl_retval,acl_keypos,NULL);
            addReplyErrorFormat(c,
                "-NOPERM ACLs rules changed between the moment the "
                "transaction was accumulated and the EXEC call. "
                "This command is no longer allowed for the "
                "following reason: %s",
                (acl_retval == ACL_DENIED_CMD) ?
                "no permission to execute the command or subcommand" :
                "no permission to touch the specified keys");
        } else {
            call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
        }

        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    discardTransaction(c);

    /* Make sure the EXEC command will be propagated as well if MULTI
     * was already propagated. */
    if (must_propagate) {
        int is_master = server.masterhost == NULL;
        server.dirty++;
        /* If inside the MULTI/EXEC block this instance was suddenly
         * switched from master to slave (using the SLAVEOF command), the
         * initial MULTI was propagated into the replication backlog, but the
         * rest was not. We need to make sure to at least terminate the
         * backlog with the final EXEC. */
        if (server.repl_backlog && was_master && !is_master) {
            char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
            feedReplicationBacklog(execcmd,strlen(execcmd));
        }
    }

handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MUTLI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}

Redis transactions are often combined with WATCH to implement optimistic locking, which helps with certain high-concurrency problems and is typically used for flash sales, seckill events, and similar promotions. The basic idea: before opening the transaction, watch a key; at EXEC time, if the watched key still holds the value it had when you watched it, the modification goes through, otherwise nothing is modified and the EXEC does not run. The mechanism relies on the CLIENT_DIRTY_CAS flag: when a key watched inside a transaction is modified, Redis turns on CLIENT_DIRTY_CAS for the watching client, telling you that your transaction has been broken. So how does Redis know that a watched key was touched? Through a watched_keys dictionary kept in each database.

Every time we run the WATCH command, that dictionary is updated. The source:

/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;

    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        c->flags |= CLIENT_DIRTY_CAS;
    }
}
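
A toy Python model of this mechanism (class and method names are invented, loosely mirroring the C identifiers above):

```python
# watched_keys maps key -> list of watching clients; touching a key
# flips CLIENT_DIRTY_CAS on every watcher, so their next EXEC aborts
# and returns a nil reply.

class Client:
    def __init__(self):
        self.dirty_cas = False      # CLIENT_DIRTY_CAS

class Db:
    def __init__(self):
        self.watched_keys = {}      # key -> [clients]

    def watch(self, client, key):
        """WATCH: register this client as a watcher of key."""
        self.watched_keys.setdefault(key, []).append(client)

    def touch_watched_key(self, key):
        """Mirror of touchWatchedKey(): mark all watchers dirty."""
        for c in self.watched_keys.get(key, []):
            c.dirty_cas = True

def exec_tx(client):
    """EXEC aborts (None, i.e. a nil reply) if the CAS flag is set."""
    if client.dirty_cas:
        return None
    return "OK"
```

The seckill pattern is then: WATCH the stock key, read it, queue the decrement in MULTI, and EXEC; if another client changed the stock in between, EXEC returns nil and the application retries.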

II. Pipelines

The basic idea is to send multiple Redis commands in a single network round trip, reducing the number of write and read calls; pipelining is generally implemented in the Redis client. Commands in a pipeline are first written to a pipeline buffer, then written in order to the socket send buffer and sent out. Of course, although the commands can be sent together, receiving the responses still blocks: you must wait for the results of all the commands.

First, the command-creation path (the client code below is PHP):

class Pipeline implements ClientContextInterface
{
    private $client;
    private $pipeline;

    private $responses = array();
    private $running = false;

    /**
     * @param ClientInterface $client Client instance used by the context.
     */
    public function __construct(ClientInterface $client)
    {
        $this->client = $client;
        $this->pipeline = new \SplQueue();
    }

    /**
     * Queues a command into the pipeline buffer.
     *
     * @param string $method    Command ID.
     * @param array  $arguments Arguments for the command.
     *
     * @return $this
     */
    public function __call($method, $arguments)
    {
        $command = $this->client->createCommand($method, $arguments);
        $this->recordCommand($command);

        return $this;
    }

    /**
     * Queues a command instance into the pipeline buffer.
     *
     * @param CommandInterface $command Command to be queued in the buffer.
     */
    protected function recordCommand(CommandInterface $command)
    {
        $this->pipeline->enqueue($command);
    }

Each pipelined command is first placed into a queue.

The execution process:

 protected function executePipeline(ConnectionInterface $connection, \SplQueue $commands)
    {
        foreach ($commands as $command) {
            $connection->writeRequest($command);
        }

        $responses = array();
        $exceptions = $this->throwServerExceptions();

        while (!$commands->isEmpty()) {
            $command = $commands->dequeue();
            $response = $connection->readResponse($command);

            if (!$response instanceof ResponseInterface) {
                $responses[] = $command->parseResponse($response);
            } elseif ($response instanceof ErrorResponseInterface && $exceptions) {
                $this->exception($connection, $response);
            } else {
                $responses[] = $response;
            }
        }

        return $responses;
    }
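
Stripped of the PHP details, what a pipelining client does can be sketched in Python (the RESP encoder below is real protocol framing, but the Pipeline class and its socket interface are simplified assumptions for illustration):

```python
# Sketch of a pipelining client: buffer the serialized commands, flush
# them in one write, then read all replies in order. N network round
# trips collapse into one.

def serialize(*args):
    """Encode one command as a RESP array of bulk strings."""
    out = "*%d\r\n" % len(args)
    for a in args:
        out += "$%d\r\n%s\r\n" % (len(a), a)
    return out

class Pipeline:
    def __init__(self, sock):
        self.sock = sock            # anything with a sendall() method
        self.buffer = []

    def call(self, *args):
        self.buffer.append(serialize(*args))    # queue, don't send yet

    def execute(self):
        # single write for the whole batch
        self.sock.sendall("".join(self.buffer).encode())
        self.buffer.clear()
        # a real client would now read one reply per queued command
```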

That said, raw pipelines are rarely used any more; people write Lua scripts instead, since a script generally performs better than a pipeline. Besides, pipelining is usually implemented on the client side on top of the operating system's read/write buffers, whereas Lua is supported directly inside the Redis server.

III. Lua Scripts

This post will not analyze how Redis runs Lua internally; the short version is that a Lua script executes as one atomic operation. In addition, Redis exposes two functions to Lua scripts: redis.call and redis.pcall. With call, an error raises a Lua exception and the script stops executing; with pcall, the error information is captured and the script keeps going, much like try/catch in other languages.

We know that an unhandled exception in our code can cause server trouble. To keep an error from crashing the server, for redis.call Redis wraps the script in an additional outer pcall.

A simple example:

10.38.163.219:6379> eval 'redis.pcall("get",KEYS[1]);return redis.call("smembers",KEYS[1])' 1 testset2
1) "1"
2) "2"
3) "3"
4) "4"
5) "5"
6) "6"
7) "8"
8) "9"
10.38.163.219:6379> eval 'redis.call("get",KEYS[1]);return redis.call("smembers",KEYS[1])' 1 testset2
(error) ERR Error running script (call to f_6ff7ff4cf361459fbc2f3f7d52699287582d1be1): @user_script:1: WRONGTYPE Operation against a key holding the wrong kind of value 

And here is how Redis wraps our function with pcall:

 if (lua_pcall(lua,0,0,0)) {
        if (c != NULL) {
            addReplyErrorFormat(c,"Error running script (new function): %s\n",
                lua_tostring(lua,-1));
        }
        lua_pop(lua,1);
        sdsfree(sha);
        return NULL;
    }

Additionally, to avoid sending the script body with every request, Redis provides a script cache. You can then invoke a cached script directly via EVALSHA by passing its hash and arguments.

Load a script onto the server with SCRIPT LOAD; it returns the script's SHA1 digest, which you can pass to EVALSHA from then on to execute the script.

10.38.163.219:6379> evalsha 6ff7ff4cf361459fbc2f3f7d52699287582d1be1 1 testset2
(error) ERR Error running script (call to f_6ff7ff4cf361459fbc2f3f7d52699287582d1be1): @user_script:1: WRONGTYPE Operation against a key holding the wrong kind of value 
10.38.163.219:6379> 
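The cache itself can be modeled in a few lines of Python (a sketch: the server really does key scripts by their SHA1 hex digest, but script_load/evalsha here are illustrative stand-ins, and a real server would execute the Lua body rather than return it):

```python
# Model of the server-side script cache behind SCRIPT LOAD / EVALSHA:
# the script body is stored under its SHA1 digest, so later calls send
# only the 40-character hash instead of the whole script.

import hashlib

script_cache = {}   # sha1 hex digest -> script body

def script_load(body):
    """SCRIPT LOAD: cache the body and return its SHA1."""
    sha = hashlib.sha1(body.encode()).hexdigest()
    script_cache[sha] = body
    return sha

def evalsha(sha):
    """EVALSHA: look the script up; unknown hashes get NOSCRIPT."""
    if sha not in script_cache:
        return "(error) NOSCRIPT No matching script"
    return script_cache[sha]    # a real server would now run it in Lua
```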

One final note: a Lua script can contain many commands, and while it runs, all requests from other clients are blocked, so avoid long-running operations. With that in mind, Redis also lets you set a maximum execution time; once a script exceeds it:

  • Redis logs that a script is running past the time limit.

  • Redis starts accepting commands from other clients again, but only SCRIPT KILL and SHUTDOWN NOSAVE are processed; to every other command the server simply replies with a BUSY error.

  • SCRIPT KILL can kill a script that has executed only read commands: read-only commands do not modify data, so killing the script cannot break data integrity.

  • If the script has already executed a write command, the only permitted action is SHUTDOWN NOSAVE, which stops the server without writing the current dataset to disk.
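
The limit referred to above is the lua-time-limit directive in redis.conf, expressed in milliseconds (the default is 5000; 0 or a negative value removes the limit):

```
# redis.conf: maximum time a Lua script may run before Redis starts
# answering other clients with BUSY
lua-time-limit 5000
```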

