返回首页 Redis 源码日志

源码日志

Redis 服务框架

Redis 基础数据结构

Redis 内功心法

Redis 应用

其他

Redis 事务机制

Redis 事务简述

MULTI,EXEC,DISCARD,WATCH 四个命令是 Redis 事务的四个基础命令。其中:

  1. MULTI,告诉 Redis 服务器开启一个事务。注意,只是开启,而不是执行
  2. EXEC,告诉 Redis 开始执行事务
  3. DISCARD,告诉 Redis 取消事务
  4. WATCH,监视某一个键值对,它的作用是在事务执行之前如果监视的键值被修改,事务会被取消。

在介绍 Redis 事务之前,先来展开 Redis 命令队列的内部实现。

Redis 命令队列

Redis 允许一个客户端不间断执行多条命令:发送 MULTI 后,用户键入多条命令;再发送 EXEC 即可不间断执行之前输入的多条命令。因为,Redis 是单进程单线的工作模式,因此多条命令的执行是不会被中断的。

> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1

内部实现不难:Redis 服务器收到来自客户端的 MULTI 命令后,为客户端保存一个命令队列结构体,直到收到 EXEC 后才开始执行命令队列中的命令。

下面是命令队列的数据结构:

// 命令结构体,命令队列专用
/* Client MULTI/EXEC state */
typedef struct multiCmd {
    // 命令参数
    robj **argv;
    // 参数个数
    int argc;
    // 命令结构体,包含了与命令相关的参数,譬如命令执行函数
    // 如需更详细了解,参看redis.c 中的redisCommandTable 全局参数
    struct redisCommand *cmd;
} multiCmd;
// 命令队列结构体
typedef struct multiState {
    // 命令队列
    multiCmd *commands; /* Array of MULTI commands */
    // 命令的个数
    int count; /* Total number of MULTI commands */
    // 以下两个参数暂时没有用到,和主从复制有关
    int minreplicas; /* MINREPLICAS for synchronous replication */
    time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;

通由上面给出的 Redis 客户端操作,来看看 Redis 服务器的状态变化:

> MULTI
OK
> INCR foo
QUEUED
> INCR bar
QUEUED
> EXEC
1) (integer) 1
2) (integer) 1

processCommand() 函数中的一段代码可以窥探命令入队的操作:

// 执行命令
int processCommand(redisClient *c) {
    ......
    // 加入命令队列的情况
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
    // 命令入队
    queueMultiCommand(c);
    addReply(c,shared.queued);
    // 真正执行命令。
    // 注意,如果是设置了多命令模式,那么不是直接执行命令,而是让命令入队
    } else {
        call(c,REDIS_CALL_FULL);
    if (listLength(server.ready_keys))
        handleClientsBlockedOnLists();
    }
    return REDIS_OK;
}

键值的监视

稍后再展开事务执行和取消的部分。

Redis 的官方文档上说,WATCH 命令是为了让 Redis 拥有 check-and-set(CAS) 的特性。CAS 的意思是,一个客户端在修改某个值之前,要检测它是否更改;如果没有更改,修改操作才能成功。

一个不含 CAS 特性的例子:

含有 CAS 特性的例子:

在后一个例子中,client A 第一次尝试修改失败,因为 client B 修改了 score.client A 失败过后,再次尝试修改才成功。Redis 事务的 CAS 特性借助了键值的监视。

Redis 数据集结构体 redisDB 和客户端结构体 redisClient 都会保存键值监视的相关数据。

监视键值的过程:

// WATCH 命令执行函数
void watchCommand(redisClient *c) {
    int j;
    // WATCH 命令不能在MULTI 和EXEC 之间调用
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    // 监视所给出的键
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
        addReply(c,shared.ok);
    }
    // 监视键值函数
    /* Watch for the specified key */
void watchForKey(redisClient *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;
    // 是否已经监视该键值
    /* Check if we are already watching for this key */
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
    if (wk->db == c->db && equalStringObjects(key,wk->key))
        return; /* Key already watched */
    }
    // 获取监视该键值的客户端链表
    /* This key is not already watched in this DB. Let's add it */
    clients = dictFetchValue(c->db->watched_keys,key);
    // 如果不存在链表,需要新建一个
    if (!clients) {
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    // 尾插法。将客户端添加到链表尾部
    listAddNodeTail(clients,c);
    // 将监视键添加到redisClient.watched_keys 的尾部
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}

当客户端键值被修改的时候,监视该键值的所有客户端都会被标记为 REDISDIRTY-CAS,表示此该键值对被修改过,因此如果这个客户端已经进入到事务状态,它命令队列中的命令是不会被执行的。

touchWatchedKey() 是标记某键值被修改的函数,它一般不被 signalModifyKey() 函数包装。下面是 touchWatchedKey() 的实现。

// 标记键值对的客户端为REDIS_DIRTY_CAS,表示其所监视的数据已经被修改过
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
    // 获取监视key 的所有客户端
    if (dictSize(db->watched_keys) == 0) return;
        clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;
        // 标记监视key 的所有客户端REDIS_DIRTY_CAS
        /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
        /* Check if we are already watching for this key */
        listRewind(clients,&li);
    while((ln = listNext(&li))) {
        redisClient *c = listNodeValue(ln);
        // REDIS_DIRTY_CAS 更改的时候会设置此标记
        c->flags |= REDIS_DIRTY_CAS;
    }
}

redis 事务的执行与取消

当用户发出 EXEC 的时候,在它 MULTI 命令之后提交的所有命令都会被执行。从代码的实现来看,如果客户端监视的数据被修改,它会被标记 REDIS_DIRTY_CAS,会调用 discardTransaction() 从而取消该事务。特别的,用户开启一个事务后会提交多个命令,如果命令在入队过程中出现错误,譬如提交的命令本身不存在,参数错误和内存超额等,都会导致客户端被标记 REDIS_DIRTY_EXEC,被标记 REDIS_DIRTY_EXEC 会导致事务被取消。

因此总结一下:

  • REDIS_DIRTY_CAS 更改的时候会设置此标记
  • REDIS_DIRTY_EXEC 命令入队时出现错误,此标记会导致 EXEC 命令执行失败

下面是执行事务的过程:

// 执行事务内的所有命令
void execCommand(redisClient *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
    int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    // 必须设置多命令标记
    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
    // 停止执行事务命令的情况:
    // 1. 被监视的数据被修改
    // 2. 命令队列中的命令执行失败
    /* Check if we need to abort the EXEC because:
    * 1) Some WATCHed key was touched.
    * 2) There was a previous error while queueing commands.
    * A failed EXEC in the first case returns a multi bulk nil object
    * (technically it is not an error but a special behavior), while
    * in the second an EXECABORT error is returned. */
    if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
        addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
        shared.nullmultibulk);
        discardTransaction(c);
        goto handle_monitor;
    }
    // 执行队列中的所有命令
    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    // 保存当前的命令,一般为MULTI,在执行完所有的命令后会恢复。
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        // 命令队列中的命令被赋值给当前的命令
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;
        // 遇到包含写操作的命令需要将MULTI 命令写入AOF 文件
        /* Propagate a MULTI request once we encounter the first write op.
        * This way we'll deliver the MULTI/..../EXEC block as a whole and
        * both the AOF and the replication link will have the same consistency
        * and atomicity guarantees. */
    if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
        execCommandPropagateMulti(c);
        must_propagate = 1;
    }
    // 调用call() 执行
    call(c,REDIS_CALL_FULL);
    // 这几句是多余的
    /* Commands may alter argc/argv, restore mstate. */
    c->mstate.commands[j].argc = c->argc;
    c->mstate.commands[j].argv = c->argv;
    c->mstate.commands[j].cmd = c->cmd;
    }
    // 恢复当前的命令,一般为MULTI
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    // 事务已经执行完毕,清理与此事务相关的信息,如命令队列和客户端标记
    discardTransaction(c);
    /* Make sure the EXEC command will be propagated as well if MULTI
    * was already propagated. */
    if (must_propagate) server.dirty++;
    ......
}

如上所说,被监视的键值被修改或者命令入队出错都会导致事务被取消:

// 取消事务
void discardTransaction(redisClient *c) {
    // 清空命令队列
    freeClientMultiState(c);
    // 初始化命令队列
    initClientMultiState(c);
    // 取消标记flag
    c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);;
    unwatchAllKeys(c);
}

Redis 事务番外篇

你可能已经注意到「事务」这个词。在学习数据库原理的时候有提到过事务的 ACID,即原子性、一致性、隔离性、持久性。接下来,看看 Redis 事务是否支持 ACID。

原子性,即一个事务中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。Redis 事务不支持原子性,最明显的是 Redis 不支持回滚操作。一致性,在事务开始之前和事务结束以后,数据库的完整性没有被破坏。这一点,Redis 事务能够保证。

隔离性,当两个或者多个事务并发访问(此处访问指查询和修改的操作)数据库的同一数据时所表现出的相互关系。Redis 不存在多个事务的问题,因为 Redis 是单进程单线程的工作模式。

持久性,在事务完成以后,该事务对数据库所作的更改便持久地保存在数据库之中,并且是完全的。Redis 提供两种持久化的方式,即 RDB 和 AOF。RDB 持久化只备份当前内存中的数据集,事务执行完毕时,其数据还在内存中,并未立即写入到磁盘,所以 RDB 持久化不能保证 Redis 事务的持久性。再来讨论 AOF 持久化,我在《深入剖析 Redis AOF 持久化策略》中讨论过:Redis AOF 有后台执行和边服务边备份两种方式。后台执行和 RDB 持久化类似,只能保存当前内存中的数据集;边备份边服务的方式中,因为 Redis 只是每间隔 2s 才进行一次备份,因此它的持久性也是不完整的!

当然,我们可以自己修改源码保证 Redis 事务的持久性,这不难。

还有一个亮点,就是 check-and-set CAS。一个修改操作不断的判断X 值是否已经被修改,直到 X 值没有被其他操作修改,才设置新的值。Redis 借助 WATCH/MULTI 命令来实现 CAS 操作的。

实际操作中,多个线程尝试修改一个全局变量,通常我们会用锁,从读取这个变量的时候就开始锁住这个资源从而阻挡其他线程的修改,修改完毕后才释放锁,这是悲观锁的做法。相对应的有一种乐观锁,乐观锁假定其他用户企图修改你正在修改的对象的概率很小,直到提交变更的时候才加锁,读取和修改的情况都不加锁。一般情况下,不同客户端会访问修改不同的键值对,因此一般 check 一次就可以 set 了,而不需要重复 check 多次。

上一篇: 主从复制 下一篇: Redis 与 Lua 脚...