美文网首页
Redis源码研究之事务

Redis源码研究之事务

作者: wenmingxing | 来源:发表于2018-04-30 20:37 被阅读31次

    本文主要说明Redis事务功能的实现。

    建议阅读:
    1、Redis事务的理论介绍见:Redis之事务实现

    I、上帝视角

    1、Redis通过一组命令来实现事务:
    · MULTI,开启一个事务;
    · EXEC,执行事务;
    · DISCARD,取消事务;
    · WATCH,监视某一个键值对,如果被修改则事务会被取消,这是一个乐观锁

    II、事务命令队列

    1、Redis服务器收到来自客户端的MULTI命令后,为客户端维护一个命令队列结构体,直到收到EXEC后,开始依次执行命令队列中的命令。

    // 命令队列结构体
    /*src/redis.h/multiState*/
    typedef struct multiState {
        // 命令队列,其实是链表
        multiCmd *commands; /* Array of MULTI commands */
    
        // 命令的个数
        int count; /* Total number of MULTI commands */
    
        // 以下两个参数暂时没有用到,和主从复制有关
        int minreplicas; /* MINREPLICAS for synchronous replication */
        time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
    } multiState;  
    

    每个命令节点为multiCmd,其结构为:

    /*保存事务命令*/
    /*src/redis.h/multiCmd*/
    /* Client MULTI/EXEC state */
    typedef struct multiCmd {
        // 命令参数
        robj **argv;
        // 参数个数
        int argc;
        // 命令结构体,包含了与命令相关的参数,譬如命令执行函数
        // 如需更详细了解,参看redis.c 中的redisCommandTable 全局参数
        struct redisCommand *cmd;
    } multiCmd;  
    
    

    2、processCommand函数中可以看到关于入队的操作:

    int processCommand(redisClient *c) {
        ......
        // 加入命令队列的情况
        /* Exec the command */
        if (c->flags & REDIS_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
        // 命令入队
        queueMultiCommand(c);
        addReply(c,shared.queued);
       
     // 真正执行命令。
        // 注意,如果是设置了多命令模式,那么不是直接执行命令,而是让命令入队
        } else {
            call(c,REDIS_CALL_FULL);
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
        }
        return REDIS_OK;
    }  
    
     /* 将一个新命令添加到事务队列中*/  
    /*src/multi.c/queueMultiCommand*/
    void queueMultiCommand(redisClient *c) {
        multiCmd *mc;
        int j;
    
        // 为新数组元素分配空间
        c->mstate.commands = zrealloc(c->mstate.commands,
                sizeof(multiCmd)*(c->mstate.count+1));
    
        // 指向新元素
        mc = c->mstate.commands+c->mstate.count;
    
        // 设置事务的命令、命令参数数量,以及命令的参数
        mc->cmd = c->cmd;
        mc->argc = c->argc;
        mc->argv = zmalloc(sizeof(robj*)*c->argc);
        memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
        for (j = 0; j < c->argc; j++)
            incrRefCount(mc->argv[j]);
    
        // 事务命令数量计数器增一
        c->mstate.count++;
    }
    

    III、事务的执行与取消

    1、当用户发出exec命令时,在MULTI之后添加的命令都会被执行,但是需要注意几点:
    · WATCH监视的键值对是否被修改,如果被修改,则会被标记为REDIS_DIRTY_CAS,调用discardTransaction取消事务;
    · 是否入队错误,客户端将会标记为REDIS_DIRTY_EXEC,也导致事务被取消;

    /*执行事务命令*/  
    /*src/multi.c/execCommand*/ 
    void execCommand(redisClient *c) {
       int j;
       robj **orig_argv;
       int orig_argc;
       struct redisCommand *orig_cmd;
       int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */
    
       // 客户端没有执行事务
       if (!(c->flags & REDIS_MULTI)) {
           addReplyError(c,"EXEC without MULTI");
           return;
       }
    
       /* Check if we need to abort the EXEC because:
        *
        * 检查是否需要阻止事务执行,因为:
        *
        * 1) Some WATCHed key was touched.
        *    有被监视的键已经被修改了
        *
        * 2) There was a previous error while queueing commands.
        *    命令在入队时发生错误
        *    (注意这个行为是 2.6.4 以后才修改的,之前是静默处理入队出错命令)
        *
        * A failed EXEC in the first case returns a multi bulk nil object
        * (technically it is not an error but a special behavior), while
        * in the second an EXECABORT error is returned. 
        *
        * 第一种情况返回多个批量回复的空对象
        * 而第二种情况则返回一个 EXECABORT 错误
        */
       if (c->flags & (REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC)) {
    
           addReply(c, c->flags & REDIS_DIRTY_EXEC ? shared.execaborterr :
                                                     shared.nullmultibulk);
    
           // 取消事务
           discardTransaction(c);
    
           goto handle_monitor;
       }
    
       /* Exec all the queued commands */
       // 已经可以保证安全性了,取消客户端对所有键的监视
       unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    
       // 因为事务中的命令在执行时可能会修改命令和命令的参数
       // 所以为了正确地传播命令,需要现备份这些命令和参数
       orig_argv = c->argv;
       orig_argc = c->argc;
       orig_cmd = c->cmd;
    
       addReplyMultiBulkLen(c,c->mstate.count);
    
       // 执行事务中的命令
       for (j = 0; j < c->mstate.count; j++) {
    
           // 因为 Redis 的命令必须在客户端的上下文中执行
           // 所以要将事务队列中的命令、命令参数等设置给客户端
           c->argc = c->mstate.commands[j].argc;
           c->argv = c->mstate.commands[j].argv;
           c->cmd = c->mstate.commands[j].cmd;
    
           /* Propagate a MULTI request once we encounter the first write op.
            *
            * 当遇上第一个写命令时,传播 MULTI 命令。
            *
            * This way we'll deliver the MULTI/..../EXEC block as a whole and
            * both the AOF and the replication link will have the same consistency
            * and atomicity guarantees. 
            *
            * 这可以确保服务器和 AOF 文件以及附属节点的数据一致性。
            */
           if (!must_propagate && !(c->cmd->flags & REDIS_CMD_READONLY)) {
    
               // 传播 MULTI 命令
               execCommandPropagateMulti(c);
    
               // 计数器,只发送一次
               must_propagate = 1;
           }
    
           // 执行命令
           call(c,REDIS_CALL_FULL);
    
           /* Commands may alter argc/argv, restore mstate. */
           // 因为执行后命令、命令参数可能会被改变
           // 比如 SPOP 会被改写为 SREM
           // 所以这里需要更新事务队列中的命令和参数
           // 确保附属节点和 AOF 的数据一致性
           c->mstate.commands[j].argc = c->argc;
           c->mstate.commands[j].argv = c->argv;
           c->mstate.commands[j].cmd = c->cmd;
       }
    
       // 还原命令、命令参数
       c->argv = orig_argv;
       c->argc = orig_argc;
       c->cmd = orig_cmd;
    
       // 清理事务状态
       discardTransaction(c);
    
       /* Make sure the EXEC command will be propagated as well if MULTI
        * was already propagated. */
       // 将服务器设为脏,确保 EXEC 命令也会被传播
       if (must_propagate) server.dirty++;
    
    handle_monitor:
       /* Send EXEC to clients waiting data from MONITOR. We do it here
        * since the natural order of commands execution is actually:
        * MUTLI, EXEC, ... commands inside transaction ...
        * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
        * table, and we do it here with correct ordering. */
       if (listLength(server.monitors) && !server.loading)
           replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
    }  
    

    2、取消事务

    函数discardTransaction为取消事务:

    /*取消事务*/
    /*src/multi.c/disacrdTransaction*/
    void discardTransaction(redisClient *c) {
        // 清空命令队列
        freeClientMultiState(c);
        // 初始化命令队列
        initClientMultiState(c);
        // 取消标记flag
        c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS|REDIS_DIRTY_EXEC);;
        unwatchAllKeys(c);
    }  
    

    IV、WATCH

    WATCH是一个乐观锁,让Redis拥有了check-and-set(CAS)特性。

    4.1 redisClient与redisDb中的数据结构

    redisClientredisDb结构体中维护了WATCH相关的数据结构:

    1、每个redisClient都维护一个链表,记录自己所被监视的key:

    /*src/redis.h/redisClient*/
    typedef struct redisClient {
        ......
        //正在被监视的键
        list *watched_keys;
        ......
    } redisClient;    
    

    2、每个redisDb都维护了一个watched_keys的字典,key为被监视的数据库键,value为一个链表,记录所有监视相应数据库键的客户端:

    /*src/redis.h/redisDb*/
    typedef struct redisDb {
        ......
        //正在被监视的键
        dict *watched_keys;
        ......
    } redisDb;    
    

    4.2 WATCH实现

    /*watch命令*/
    /*src/multi.c/watchCommand*/
    void watchCommand(redisClient *c) {
        int j;
    
        // 不能在事务开始后执行
        if (c->flags & REDIS_MULTI) {
            addReplyError(c,"WATCH inside MULTI is not allowed");
            return;
        }
    
        // 监视输入的任意个键
        for (j = 1; j < c->argc; j++)
            watchForKey(c,c->argv[j]);
    
        addReply(c,shared.ok);
    }
    
    

    其中主要调用了watchForKey函数,为真正的监视键值函数:

    /*监视特定键值,即为维护两个用于watch的数据结构*/  
    /*src/multi/watchForKey*/  
    
    void watchForKey(redisClient *c, robj *key) {
    
        list *clients = NULL;
        listIter li;
        listNode *ln;
        watchedKey *wk;
    
        /* Check if we are already watching for this key */
        // 检查 key 是否已经保存在 watched_keys 链表中,
        // 如果是的话,直接返回
        listRewind(c->watched_keys,&li);
        while((ln = listNext(&li))) {
            wk = listNodeValue(ln);
            if (wk->db == c->db && equalStringObjects(key,wk->key))
                return; /* Key already watched */
        }
    
        // 键不存在于 watched_keys ,添加它
    
        // 以下是一个 key 不存在于字典的例子:
        // before :
        // {
        //  'key-1' : [c1, c2, c3],
        //  'key-2' : [c1, c2],
        // }
        // after c-10086 WATCH key-1 and key-3:
        // {
        //  'key-1' : [c1, c2, c3, c-10086],
        //  'key-2' : [c1, c2],
        //  'key-3' : [c-10086]
        // }
    
        /* This key is not already watched in this DB. Let's add it */
        // 检查 key 是否存在于数据库的 watched_keys 字典中
        clients = dictFetchValue(c->db->watched_keys,key);
        // 如果不存在的话,添加它
        if (!clients) { 
            // 值为链表
            clients = listCreate();
            // 关联键值对到字典
            dictAdd(c->db->watched_keys,key,clients);
            incrRefCount(key);
        }
        // 将客户端添加到链表的末尾
        listAddNodeTail(clients,c);
    
        /* Add the new key to the list of keys watched by this client */
        // 将新 watchedKey 结构添加到客户端 watched_keys 链表的表尾
        // 以下是一个添加 watchedKey 结构的例子
        // before:
        // [
        //  {
        //   'key': 'key-1',
        //   'db' : 0
        //  }
        // ]
        // after client watch key-123321 in db 0:
        // [
        //  {
        //   'key': 'key-1',
        //   'db' : 0
        //  }
        //  ,
        //  {
        //   'key': 'key-123321',
        //   'db': 0
        //  }
        // ]
        wk = zmalloc(sizeof(*wk));
        wk->key = key;
        wk->db = c->db;
        incrRefCount(key);
        listAddNodeTail(c->watched_keys,wk);
    }
    
    4.3 标记某个key是否被修改
    /*src/multi.c/touchWatchedKey*/
    /* "Touch" a key, so that if this key is being WATCHed by some client the
     * next EXEC will fail. 
     *
     * “触碰”一个键,如果这个键正在被某个/某些客户端监视着,
     * 那么这个/这些客户端在执行 EXEC 时事务将失败。
     */
    void touchWatchedKey(redisDb *db, robj *key) {
        list *clients;
        listIter li;
        listNode *ln;
    
        // 字典为空,没有任何键被监视
        if (dictSize(db->watched_keys) == 0) return;
    
        // 获取所有监视这个键的客户端
        clients = dictFetchValue(db->watched_keys, key);
        if (!clients) return;
    
        /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
        /* Check if we are already watching for this key */
        // 遍历所有客户端,打开他们的 REDIS_DIRTY_CAS 标识
        listRewind(clients,&li);
        while((ln = listNext(&li))) {
            redisClient *c = listNodeValue(ln);
    
            c->flags |= REDIS_DIRTY_CAS;
        }
    }
    

    【参考】
    [1] 《Redis设计与实现》
    [2] 《Redis源码日志》

    相关文章

      网友评论

          本文标题:Redis源码研究之事务

          本文链接:https://www.haomeiwen.com/subject/plltrftx.html