Redis学习之事务

作者: lixin_karl | 来源:发表于2019-05-22 15:42 被阅读0次

    介绍

       Redis 事务可以一次执行多个命令, 并且带有以下两个重要的保证:

    • 批量操作在发送 EXEC 命令前被放入队列缓存。

    • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。

    事务过程不同的客户端是没有事务之间的影响的,一个事务从开始到执行会经历以下三个阶段:

    • 开始事务

    • 命令入队

    • 执行事务

    一、相关命令

    • Exec 执行所有事务块内的命令。

    • Watch 监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

    • Discard 取消事务,放弃执行事务块内的所有命令。

    • Unwatch 取消 WATCH 命令对所有 key 的监视。

    • Multi 标记一个事务块的开始。

    二、数据结构

    typedef struct client {
        ...
        list *watched_keys; /* 事务操作 监控的keys链表 值为watchedKey结构*/
        multiState mstate;  /* 事务状态 */
        ...
    }client;
    typedef struct multiState {
        multiCmd *commands;     /*事务命令队列 */
        int count;              /*命令总个数*/
        ...
    } multiState;
    其中、client.watched_keys里面放的Node都是数据时如下结构体
    //监视key
    typedef struct watchedKey {
        robj *key;//监视的key
        redisDb *db;//哪个db里面的
    } watchedKey;
    
    //redisDb结构
    typedef struct redisDb {
        ...
        dict *watched_keys;         /*数据库的监控key字典用来实现事务的 key -> client链表*/
        ...
    } redisDb;
    

    三、API

    void multiCommand(client *c);//事务开始
    void execCommand(client *c);//事务执行
    void discardCommand(client *c);//取消事务
    void watchCommand(client *c);//监控客户端的key
    void unwatchCommand(client *c);//取消监控客户端正在监控的key
    

    四、重要API解析

    • multiCommand

      void multiCommand(client *c) {//设置事务标志
          if (c->flags & CLIENT_MULTI) {
              addReplyError(c,"MULTI calls can not be nested");
              return;
          }
          c->flags |= CLIENT_MULTI;//设置事务标志位
          addReply(c,shared.ok);
      }
      
    • watchCommand

      /* 监视特定的key */
      void watchForKey(client *c, robj *key) {
          list *clients = NULL;
          listIter li;
          listNode *ln;
          watchedKey *wk;
          listRewind(c->watched_keys,&li);
          while((ln = listNext(&li))) {
              wk = listNodeValue(ln);
              if (wk->db == c->db && equalStringObjects(key,wk->key))
                  return; /* 如果这个key已经被监控了 直接返回*/
          }
          /* 否则加入监视 */
          clients = dictFetchValue(c->db->watched_keys,key);
          if (!clients) {
              clients = listCreate();
              dictAdd(c->db->watched_keys,key,clients);
              incrRefCount(key);
          }
          listAddNodeTail(clients,c);//加入到监视keys的链表
          wk = zmalloc(sizeof(*wk));
          wk->key = key;
          wk->db = c->db;
          incrRefCount(key);
          listAddNodeTail(c->watched_keys,wk);//加入到watched_keys中
      }
      
      /* 键如果被改变 事务执行会失败*/
      void touchWatchedKey(redisDb *db, robj *key) {
          list *clients;
          listIter li;
          listNode *ln;
      
          if (dictSize(db->watched_keys) == 0) return;
          clients = dictFetchValue(db->watched_keys, key);
          if (!clients) return;
      
          //如果key被改变了,那么监视这个key的所有client都要被设置CLIENT_DIRTY_CAS
          listRewind(clients,&li);
          while((ln = listNext(&li))) {
              client *c = listNodeValue(ln);
      
              c->flags |= CLIENT_DIRTY_CAS;
          }
      }
      
    • execCommand

      void execCommand(client *c) {//执行事务
          int j;
          robj **orig_argv;
          int orig_argc;
          struct redisCommand *orig_cmd;
          int must_propagate = 0; /* 需要将 MULTI/EXEC 同步到 AOF / slaves? */
          int was_master = server.masterhost == NULL;
      
          if (!(c->flags & CLIENT_MULTI)) {//如果并没有设置MULTI标志,无操作
              addReplyError(c,"EXEC without MULTI");
              return;
          }
          //如果事务期间被监视的key被修改了 停止事务,返回nil
          //入队的命令如果有误的话,执行事务error
          if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC)) {
              addReply(c, c->flags & CLIENT_DIRTY_EXEC ? shared.execaborterr :
                                                         shared.nullarray[c->resp]);
              discardTransaction(c);
              goto handle_monitor;
          }
          //如果对于从库有修改key的操作 取消事务
          if (!server.loading && server.masterhost && server.repl_slave_ro &&
              !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
          {
              addReplyError(c,
                  "Transaction contains write commands but instance "
                  "is now a read-only replica. EXEC aborted.");
              discardTransaction(c);
              goto handle_monitor;
          }
      
          /* 执行所有命令*/
          unwatchAllKeys(c); /* 取消此用户所有监视key 提升cpu效率 */
          orig_argv = c->argv;
          orig_argc = c->argc;
          orig_cmd = c->cmd;
          addReplyArrayLen(c,c->mstate.count);
          for (j = 0; j < c->mstate.count; j++) {
              c->argc = c->mstate.commands[j].argc;
              c->argv = c->mstate.commands[j].argv;
              c->cmd = c->mstate.commands[j].cmd;
              //如果有修改key的操作的话 需要同步到 aof 和 从库
              if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) {
                  execCommandPropagateMulti(c);
                  must_propagate = 1;
              }
              //执行命令
              call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
              /* 以上函数可能会改变argc argv. */
              c->mstate.commands[j].argc = c->argc;
              c->mstate.commands[j].argv = c->argv;
              c->mstate.commands[j].cmd = c->cmd;
          }
          c->argv = orig_argv;
          c->argc = orig_argc;
          c->cmd = orig_cmd;
          discardTransaction(c);
          /* 需要将 MULTI/EXEC 同步到 AOF / slaves */
          if (must_propagate) {
              int is_master = server.masterhost == NULL;
              server.dirty++;
              if (server.repl_backlog && was_master && !is_master) {
                  char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
                  feedReplicationBacklog(execcmd,strlen(execcmd));
              }
          }
      handle_monitor:
          if (listLength(server.monitors) && !server.loading)
              replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
      }
      

    参考

    相关文章

      网友评论

        本文标题:Redis学习之事务

        本文链接:https://www.haomeiwen.com/subject/xcjuzqtx.html