美文网首页
redis源码阅读

redis源码阅读

作者: 147258_d8b2 | 来源:发表于2020-07-24 22:20 被阅读0次

    redis源码阅读


    1.redis集群

    redis集群储存数据使用哈希槽,类似于hash环。将key通过crc16编码映射为0-16384其中的一个值。并将该key储存到相应的服务器上
    
    集群储存示意图
     redis 客户端命令这里不详细解释
    

    客户端API特征

    • 支持redis集群:

      • 连接redis集群,并发送命令.
    • 支持多key命令:

      • 支持 MSET, MGETDEL命令.
    • 支持管道:

      • 支持管道,并且支持多key命令.
    • 支持异步:

      • 用户可以使用异步模式.

    2.客户端hiredis-vip

    hiredis-vip代码建立在原有的hiredis之上,其实现了对redis集群的访问
    hiredis-vip的源码比hiredis多了下面文件(只带.c):
        * hircluster.c     ----redis集群
        * adlist.c         ----C模仿list结构
        * command.c        ----传输命令
        * crc16.c          ----crc16编码
        * hiarray.c        ----C模仿array结构
        * hiutil.c         ----工具
    只介绍hircluster.c和command.c,其他都是工具。
    
    2.1.1结构体

    结构体cluster_node:服务器节点信息

        typedef struct cluster_node  
        {
            sds name;
            sds addr;
            sds host;
            int port;
            uint8_t role;     /* 主服务 OR 备用服务 */
            uint8_t myself; 
            redisContext *con;
            redisAsyncContext *acon;
            struct hilist *slots;
            struct hilist *slaves;
            int failure_count;
            void *data;   
            struct hiarray *migrating; 
            struct hiarray *importing;  
        }cluster_node;
    

    结构体redisClusterContext:与redis集群连接的上下文

        typedef struct redisClusterContext {
            int err; 
            char errstr[128]; 
            sds ip;
            int port;
            int flags;
            enum redisConnectionType connection_type;
            struct timeval *connect_timeout;
            struct timeval *timeout;
            struct hiarray *slots;
            struct dict *nodes;
            cluster_node *table[REDIS_CLUSTER_SLOTS];
            uint64_t route_version;
            int max_redirect_count;
            int retry_count;
            struct hilist *requests;
            int need_update_route;
            int64_t update_route_time;
        } redisClusterContext;
    

    结构体redisClusterAsyncContext:与redis集群异步连接

        typedef struct redisClusterAsyncContext {
            redisClusterContext *cc;
            int err;
            char errstr[128];
            void *data;
            void *adapter;
            adapterAttachFn *attach_fn;
            redisDisconnectCallback *onDisconnect;
            redisConnectCallback *onConnect;
        } redisClusterAsyncContext;
    

    结构体之间的关系


    结构体关系图
    2.1.2 集群请求

    实现hiredis-vip的redis集群请求分为三步

    1. 集群结构体初始化
    /*初始化结构体*/
    redisClusterContext *redisClusterContextInit(void);
    /*初始化结构体(调用redisClusterContextInit),添加集群节点(调用redisClusterSetOptionAddNodes),更新集群节点hash槽信息(调用cluster_update_route)*/
    redisClusterContext *redisClusterConnect(const char *addrs, int flags);
    redisClusterContext *redisClusterConnectWithTimeout(const char *addrs, 
    const struct timeval tv, int flags);
    redisClusterContext *redisClusterConnectNonBlock(const char *addrs, int flags);
    
    
    1. 集群请求的相关配置
    int redisClusterSetOptionAddNode(redisClusterContext *cc, const char *addr);//填充redisClusterContext的nodes
    int redisClusterSetOptionAddNodes(redisClusterContext *cc, const char *addrs);//填充redisClusterContext的nodes
    int redisClusterSetOptionConnectBlock(redisClusterContext *cc);//填充redisClusterContext的flag
    int redisClusterSetOptionConnectNonBlock(redisClusterContext *cc);//填充redisClusterContext的flag
    int redisClusterSetOptionParseSlaves(redisClusterContext *cc);//填充redisClusterContext的flag
    int redisClusterSetOptionParseOpenSlots(redisClusterContext *cc);//填充redisClusterContext的flag
    int redisClusterSetOptionRouteUseSlots(redisClusterContext *cc);//填充redisClusterContext的flag
    int redisClusterSetOptionConnectTimeout(redisClusterContext *cc, const struct timeval tv);//填充redisClusterContext的connect_timeout
    int redisClusterSetOptionTimeout(redisClusterContext *cc, const struct timeval tv);//填充redisClusterContext的timeout
    int redisClusterSetOptionMaxRedirect(redisClusterContext *cc,  int max_redirect_count);//填充redisClusterContext的max_redirect_count
    
    1. 建立和发送请求
      1. 格式化命令,如:SET foo bar1 转为 *3\r\n$3\r\nSET\r\n$3\r\nfoo\r\n$4\r\nbar1\r\n。*3表示3个参数,$3表示该参数占3个字符,这也是最后客户端发送给服务端的消息
      2. 创建command结构体,并将格式化的命令填充该结构体。根据key使用crc16编码计算hash值。并通过hash值找到node。
      3. 根据node创建redisContext结构体,建立socket连接,并将连接放在redisContext结构体,将格式化的命令放入结构体的缓冲区。
      4. 发送并接收请求。
    /*下面3个函数是格式化命令*/
    void *redisClustervCommand(redisClusterContext *cc, const char *format, va_list ap);
    void *redisClusterCommand(redisClusterContext *cc, const char *format, ...);
    void *redisClusterCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen);
    /*创建command结构体,计算key的hash值,填充该结构体*/
    void *redisClusterFormattedCommand(redisClusterContext *cc, char *cmd, int len);
    /*下面四个函数将命令放入该结构体的缓冲区。*/
    int redisClusterAppendFormattedCommand(redisClusterContext *cc, char *cmd, int len);
    int redisClustervAppendCommand(redisClusterContext *cc, const char *format, va_list ap);
    int redisClusterAppendCommand(redisClusterContext *cc, const char *format, ...);
    int redisClusterAppendCommandArgv(redisClusterContext *cc, int argc, const char **argv, const size_t *argvlen);
    /*根据node创建上下文也就是redisContext结构体。再此函数中建立socket连接,并将连接放在redisContext结构体*/
    redisContext *ctx_get_by_node(redisClusterContext *cc, struct cluster_node *node);
    /*执行命令,调用上面的ctx_get_by_node函数*/
    int redisGetReply(redisContext *c, void **reply);
    /*
     * 更新集群的node配置。
     */
    int cluster_update_route(redisClusterContext *cc);
    struct dict *parse_cluster_nodes(redisClusterContext *cc, char *str, int str_len, int flags);
    struct dict *parse_cluster_slots(redisClusterContext *cc, redisReply *reply, int flags);
    

    2.1.2 异步接口

    异步接口结构体

    typedef struct redisClusterAsyncContext {
        redisClusterContext *cc;
        int err;
        char errstr[128]; 
        void *data;
        void *adapter;
        adapterAttachFn *attach_fn;
        redisDisconnectCallback *onDisconnect;
        redisConnectCallback *onConnect;
    } redisClusterAsyncContext;
    

    异步方法

    /*初始化redis集群结构体redisClusterContext(调用redisClusterConnectNonBlock),初始化异步结构体*/
    redisClusterAsyncContext *redisClusterAsyncConnect(const char *addrs, int flags);
    
    /*设置回掉连接和断开连接的函数*/
    int redisClusterAsyncSetConnectCallback(redisClusterAsyncContext *acc, redisConnectCallback *fn);
    int redisClusterAsyncSetDisconnectCallback(redisClusterAsyncContext *acc, redisDisconnectCallback *fn);
    /*下面四个函数将命令放入该结构体的缓冲区。调用actx_get_by_node来获取redisContext结构体*/
    int redisClusterAsyncFormattedCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, char *cmd, int len);
    int redisClustervAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, va_list ap);
    int redisClusterAsyncCommand(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, const char *format, ...);
    int redisClusterAsyncCommandArgv(redisClusterAsyncContext *acc, redisClusterCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen);
    /*断开连接*/
    void redisClusterAsyncDisconnect(redisClusterAsyncContext *acc);
    void redisClusterAsyncFree(redisClusterAsyncContext *acc);
    /*异步根据node获取redisAsyncContext上下文*/
    redisAsyncContext *actx_get_by_node(redisClusterAsyncContext *acc, cluster_node *node);
    

    3.hiredis


    4.redis

    ae事件处理

    redis实现了自己的事件库,这个事件库被实现在ae.c
    理解redis事件库怎样工作的最好方式是明白redis如何使用它。

    Event Loop 初始化

    initServer 方法定义在 redis.c ,该方法用于初始化全局变量serverredisServer 类型结构体)。server变量中一个最重要的变量是el (aeEventLoop 类型结构体)。<--!server.el包含了ae事件!>

    initServer 方法通过调用 aeCreateEventLoop 方法初始化server.elaeEventLoop结构体定义如下:

    typedef struct aeEventLoop
    {
        int maxfd;/*最大句柄*/
        long long timeEventNextId;/*定时事件的个数,*/
        aeFileEvent events[AE_SETSIZE]; /*已经注册的句柄事件*/
        aeFiredEvent fired[AE_SETSIZE]; /*已经触发的句柄事件*/
        aeTimeEvent *timeEventHead; /*定时事件链表*/
        int stop;/*循环是否结束*/
        void *apidata; 
        aeBeforeSleepProc *beforesleep;/*指针函数,每次sleep事件前调用的方法*/
        aeBeforeSleepProc *aftersleep;/*指针函数,每次sleep事件后调用的方法*/
    } aeEventLoop;
    /*注:1.aftersleep在原文中没有,但是在redis5代码中出现。其中sleep指阻塞或者等待,每次循环时都会等待一段时间(该时间由触发最短时间事件的时间决定)。下文会介绍。
    2.redis5支持的IO复用:epoll,select,evport,kqueue。本文中以epoll为准
    3.apidata储存epoll句柄和epoll_event结构体数组,epoll_
    */
    

    aeCreateEventLoop

    aeCreateEventLoop 首先分配内存给 aeEventLoop 结构体,然后调用 ae_epoll.c:aeApiCreateae_epoll.c:aeApiCreate会使用合适的IO复用接口

    aeApiCreate 分配内存给 aeApiStateaeApiState 有两个变量:epfd用于储存 epoll_create 返回的 epoll句柄, events数组变量(struct epoll_event类型 )。event的索引为socket句柄

    aeCreateTimeEvent

    aeCreateTimeEvent 接受如下参数:

    • eventLoop: 这是 server.el
    • milliseconds: 定时时间
    • proc: 函数指针,定时事件的函数。
    • clientData: 大部分为 NULL.
    • finalizerProc:指向从定时事件列表中删除定时事件之前必须调用的函数的指针。
      initServer 调用aeCreateTimeEvent 添加定时事件到 server.eltimeEventHeadtimeEventHead 是双向链表。原文调用如下:
    aeCreateTimeEvent(server.el /*eventLoop*/, 1 /*milliseconds*/, serverCron /*proc*/, NULL /*clientData*/, NULL /*finalizerProc*/);
    
    

    redis.c:serverCron 里面包含了多个定时事件。

    aeCreateFileEvent

    The essence of aeCreateFileEvent function is to execute epoll_ctl system call which adds a watch for EPOLLIN event on the listening descriptor create by anetTcpServer and associate it with the epoll descriptor created by a call to aeCreateEventLoop

    aeCreateFileEvent函数的本质是执行epoll_ctl系统调用。aeCreateFileEvent方法使用epoll监控socket句柄,并为其配置mask表示socket句柄的读写方式。

    initServer将以下参数传递给aeCreateFileEvent

    • server.el:由aeCreateEventLoop创建的ae事件句柄。
    • server.fd:需要监听的句柄。同时也作为一个索引从eventLoop->events获取write和read回掉函数。
    • AE_READABLE:表示监听句柄的模式
    • acceptHandler:句柄事件响应的回到函数。该函数指针存储在eventLoop->events[server.fd]->rfileProc

    这样就完成了Redis事件循环的初始化。

    Event Loop Processing

    main 将会调用aeMain 进行事件的循环。aeMain的循环在主进程,所以想要移植ae事件的同学注意aeMain一定要在代码的最后。

    aeMain 使用while循环调用 ae.c:aeProcessEvents 。在while会不停的判断是否有aftersleep函数,如果有就执行。ae.c:aeProcessEvents 循环同时处理句柄事件和定时事件。

    aeProcessEvents

    1. ae.c:aeProcessEvents通过aeSearchNearestTimer找到最近执行的定时事件。并获取该时间间隔。
    2. 将最短时间间隔赋值给tvp。如果最短时间间隔为负,将tvp设置为0,如redis中第一次的定时时间为1毫秒。
    3. 调用aeApiPoll函数。aeApiPoll函数实际调用 epoll_wait来sleep,sleep时间为tvp。之后aeApiPoll函数将有响应的socket句柄放入eventLoop->fired
    4. 查看是否有aftersleep函数,有则执行。
    5. 查看eventLoop->fired中是否有需要处理的句柄,有则处理
    6. 执行processTimeEventsprocessTimeEvents用于处理定时事件
    7. processTimeEvents中遍历链表执行定时任务。如果定时任务被贴上删除标记则删除。

    eventLoop->fired 数组元素内容:

    • fd: 已经触发socket事件的句柄
    • mask: 事件描述,可读或者可写

    我们以客户端请求连接来说明那些函数被调用了。

    1. 客户端向服务器请求一个连接。
    2. aeApiPoll将会监测到服务器句柄触发可读。将服务器句柄放入 eventLoop->fired
    3. ae事件检测到eventLoop->fired有触发,调用acceptHandler
    4. acceptHandler 实际执行 accept ,获得客户端句柄。调用aeCreateFileEvent将客户端句柄放入ae事件中。如下
    if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
        readQueryFromClient, c) == AE_ERR) {
        freeClient(c);
        return NULL;
    }
    
    

    processTimeEvents

    ae.processTimeEvents 遍历链表eventLoop->timeEventHead。如果达到时间就执行定时事件。每次执行一次完定时事件,都会用 ae.c:aeAddMilliSeconds重新设置下一次执行时间。在redis中,所有的定时事件都在serverCron中。通过配置文件hz 10设置频率。默认1000/10秒执行一次。

    持久化

    rdb的优势

    • RDB是Redis数据的非常紧凑的单文件时间点表示。RDB文件非常适合备份。例如每天备份一次。这使你可以在灾难情况下轻松还原数据集的不同版本。
    • RDB非常适合灾难恢复,因为它可以将单个压缩文件传输到远程数据中心,也可以传输到Amazon S3上。
    • RDB最大限度地提高了Redis的性能,因为Redis通过fork的方式生成一个子进程,子进程执行rdb储存工作。父实例将永远不会执行磁盘I/O等操作。
    • 与AOF相比,RDB允许大型数据集更快地重启。
      RDB的缺点
    • 如果是着重强调数据完整性和安全性,rdb并不是一个好方案,因为他常常是分钟以上的时间进行备份。
    • RDB需要经常使用fork()才能使用子进程将其持久化在磁盘上。如果数据集很大,Fork()可能很耗时,并且如果数据集很大且CPU性能不佳,则可能导致Redis停止为客户端服务几毫秒甚至一秒钟。
      AOF的优势
    • 使用AOF Redis更加持久:您可以使用不同的fsync策略:完全没有fsync,每秒fsync,每个查询都fsync。使用默认策略fsync时,每秒的写入性能仍然很好,fsync是使用后台线程执行的,并且在没有进行fsync的情况下,主线程将尽力执行写入操作。
    • AOF日志是仅追加的日志。类似于linux的history功能。即使由于某种原因(磁盘已满或其他原因)以半写命令结束日志,redis-check-aof工具也可以轻松修复它。
    • Redis的aof文件太大时,Redis可以在后台自动重写AOF。重写是完全安全的,因为Redis继续追加到旧文件时,会生成一个全新的文件,一旦准备好第二个文件,Redis就会切换两者并开始追加到新的那一个。
    • AOF以易于理解和解析的格式包含所有操作的日志。您甚至可以轻松导出AOF文件。例如,即使您使用FLUSHALL命令刷新了所有错误文件,如果在此期间未执行任何日志重写操作,您仍然可以保存数据集,只是停止服务器,删除最新命令并重新启动Redis。
      AOF的缺点
    • 对于同一数据集,AOF文件通常大于等数据量的RDB文件。
      根据确切的fsync策略,AOF可能比RDB慢。通常,在将fsync设置为每秒的情况下,性能仍然很高,并且在禁用fsync的情况下,即使在高负载下,它也应与RDB一样快。即使在巨大的写负载的情况下,RDB仍然能够提供有关最大延迟的更多保证。
    • 过去,我们在特定命令中遇到过罕见的错误(例如,其中有一个涉及阻止命令,例如BRPOPLPUSH),导致生成的AOF在重新加载时无法完全重现相同的数据集。这种错误很少见,我们在测试套件中进行了测试,可以自动创建随机的复杂数据集,然后重新加载它们以检查一切正常,但是对于RDB持久性来说,这种错误几乎是不可能的。为了更清楚地说明这一点:Redis AOF像MySQL或MongoDB一样,以增量方式更新现有状态,而RDB快照一次又一次地创建所有内容,从概念上讲更健壮。但是-1)应该注意的是,每次Redis重写AOF时,都会从数据集中包含的实际数据开始重新创建AOF,与始终附加AOF文件(或重写为读取旧AOF而不是读取内存中的数据)相比,提高了对错误的抵抗力。2)我们从未收到用户关于在现实世界中检测到的AOF损坏的单个报告。
      rdb源码解读
      rdb在出现在两个地方。1.程序结束之前。2.serverCron定时器操作。这里我们主要看serverCron。
    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
        //省略...
        if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 || ldbPendingChildren())
        {
            //省略...
        } else {
            for (j = 0; j < server.saveparamslen; j++) {
                //轮流判断save规则,符合规则就执行持久化。规则见redis.conf
                struct saveparam *sp = server.saveparams+j;
                if (server.dirty >= sp->changes &&
                    server.unixtime-server.lastsave > sp->seconds &&
                    (server.unixtime-server.lastbgsave_try >
                     CONFIG_BGSAVE_RETRY_DELAY ||
                     server.lastbgsave_status == C_OK))
                {
                    serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
                        sp->changes, (int)sp->seconds);
                    rdbSaveInfo rsi, *rsiptr;
                    rsiptr = rdbPopulateSaveInfo(&rsi);
                    rdbSaveBackground(server.rdb_filename,rsiptr);
                    break;
                }
            }
            //省略...
        }
        //省略...
        return 1000/server.hz;
    }
    //转到rdbSaveBackground函数
    int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
         //省略...
        openChildInfoPipe();
        //fork子进程进行持久化
        if ((childpid = fork()) == 0) {
            int retval;
    
            /* Child */
            closeListeningSockets(0);
            redisSetProcTitle("redis-rdb-bgsave");
            retval = rdbSave(filename,rsi);
            if (retval == C_OK) {
                size_t private_dirty = zmalloc_get_private_dirty(-1);
    
                if (private_dirty) {
                    serverLog(LL_NOTICE,
                        "RDB: %zu MB of memory used by copy-on-write",
                        private_dirty/(1024*1024));
                }
    
                server.child_info_data.cow_size = private_dirty;
                sendChildInfo(CHILD_INFO_TYPE_RDB);
            }
            exitFromChild((retval == C_OK) ? 0 : 1);
        } else {
            /* Parent */
            //省略...
        }
        //省略...
        return C_OK; 
    }
    //转到rdbSave
    int rdbSave(char *filename, rdbSaveInfo *rsi) {
        //省略...
        if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
            errno = error;
            goto werr;
        }
        //省略...
    }
    //转到rdbSaveRio,这里执行内存持久化
    int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
        dictIterator *di = NULL;
        dictEntry *de;
        char magic[10];
        int j;
        uint64_t cksum;
        size_t processed = 0;
    
        if (server.rdb_checksum)
            rdb->update_cksum = rioGenericUpdateChecksum;
        //添加文件头部
        snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
        if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
        //添加其他变量,创建时间,内存占用,bit值
        if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
        //循环数据库
        for (j = 0; j < server.dbnum; j++) {
            redisDb *db = server.db+j;
            dict *d = db->dict;
            if (dictSize(d) == 0) continue;
            di = dictGetSafeIterator(d);
            //储存TYPE为数据库编号,表示后面一个字节指数据库号
            if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
            //储存数据库号
            if (rdbSaveLen(rdb,j) == -1) goto werr;
    
            uint64_t db_size, expires_size;
            db_size = dictSize(db->dict);
            expires_size = dictSize(db->expires);
            //储存数据库的一些信息
            if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
            if (rdbSaveLen(rdb,db_size) == -1) goto werr;
            if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
    
            /* 循环字典 */
            while((de = dictNext(di)) != NULL) {
                sds keystr = dictGetKey(de);
                robj key, *o = dictGetVal(de);
                long long expire;
    
                initStaticStringObject(key,keystr);
                expire = getExpire(db,&key);
                //储存key value值
                if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
                //....
            }
            dictReleaseIterator(di);
            di = NULL; /* So that we don't release it again on error. */
        }
        //脚本缓存
        if (rsi && dictSize(server.lua_scripts)) {
            di = dictGetIterator(server.lua_scripts);
            while((de = dictNext(di)) != NULL) {
                robj *body = dictGetVal(de);
                if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                    goto werr;
            }
            dictReleaseIterator(di);
            di = NULL; /* So that we don't release it again on error. */
        }
        //结束字符
        if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
        //校验位,crc16编码
        cksum = rdb->cksum;
        memrev64ifbe(&cksum);
        if (rioWrite(rdb,&cksum,8) == 0) goto werr;
        return C_OK;
    
    werr:
        if (error) *error = errno;
        if (di) dictReleaseIterator(di);
        return C_ERR;
    }
    //转到rdbSaveKeyValuePair
    int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
        int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
        int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
    
        //储存过期时间
        if (expiretime != -1) {
            if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
            if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
        }
    
        //储存lru信息,https://www.cnblogs.com/hapjin/archive/2019/06/07/10933405.html
        if (savelru) {
            uint64_t idletime = estimateObjectIdleTime(val);
            idletime /= 1000; /* Using seconds is enough and requires less space.*/
            if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
            if (rdbSaveLen(rdb,idletime) == -1) return -1;
        }
    
        //储存lfu信息
        if (savelfu) {
            uint8_t buf[1];
            buf[0] = LFUDecrAndReturn(val);
            /* We can encode this in exactly two bytes: the opcode and an 8
             * bit counter, since the frequency is logarithmic with a 0-255 range.
             * Note that we do not store the halving time because to reset it
             * a single time when loading does not affect the frequency much. */
            if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
            if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
        }
        //储存类型
        if (rdbSaveObjectType(rdb,val) == -1) return -1;
        //储存key,value。格式为[len][date]。当value是"100","-17"这中能被编码成整形时,就储存整形形式
        if (rdbSaveStringObject(rdb,key) == -1) return -1;
        if (rdbSaveObject(rdb,val,key) == -1) return -1;
        return 1;
    }
    //savekey相当于save string。
    //转到rdbSaveObject,储存内容。
    //最后调用write持久化。
    ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
        ssize_t n = 0, nwritten = 0;
        if (o->type == OBJ_STRING) {
            /* Save a string value */
            if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
            nwritten += n;
        } else if (o->type == OBJ_LIST) {
            /* Save a list value */
            if (o->encoding == OBJ_ENCODING_QUICKLIST) {
                quicklist *ql = o->ptr;
                quicklistNode *node = ql->head;
    
                if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
                nwritten += n;
    
                while(node) {
                    if (quicklistNodeIsCompressed(node)) {
                        void *data;
                        size_t compress_len = quicklistGetLzf(node, &data);
                        if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
                        nwritten += n;
                    } else {
                        if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
                        nwritten += n;
                    }
                    node = node->next;
                }
            } else {
                serverPanic("Unknown list encoding");
            }
        } else if (o->type == OBJ_SET) {
            /* Save a set value */
            if (o->encoding == OBJ_ENCODING_HT) {
                dict *set = o->ptr;
                dictIterator *di = dictGetIterator(set);
                dictEntry *de;
    
                if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) {
                    dictReleaseIterator(di);
                    return -1;
                }
                nwritten += n;
    
                while((de = dictNext(di)) != NULL) {
                    sds ele = dictGetKey(de);
                    if ((n = rdbSaveRawString(rdb,(unsigned char*)ele,sdslen(ele)))
                        == -1)
                    {
                        dictReleaseIterator(di);
                        return -1;
                    }
                    nwritten += n;
                }
                dictReleaseIterator(di);
            } else if (o->encoding == OBJ_ENCODING_INTSET) {
                size_t l = intsetBlobLen((intset*)o->ptr);
    
                if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
                nwritten += n;
            } else {
                serverPanic("Unknown set encoding");
            }
        } else if (o->type == OBJ_ZSET) {
            /* Save a sorted set value */
            if (o->encoding == OBJ_ENCODING_ZIPLIST) {
                size_t l = ziplistBlobLen((unsigned char*)o->ptr);
    
                if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
                nwritten += n;
            } else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
                zset *zs = o->ptr;
                zskiplist *zsl = zs->zsl;
    
                if ((n = rdbSaveLen(rdb,zsl->length)) == -1) return -1;
                nwritten += n;
    
                /* We save the skiplist elements from the greatest to the smallest
                 * (that's trivial since the elements are already ordered in the
                 * skiplist): this improves the load process, since the next loaded
                 * element will always be the smaller, so adding to the skiplist
                 * will always immediately stop at the head, making the insertion
                 * O(1) instead of O(log(N)). */
                zskiplistNode *zn = zsl->tail;
                while (zn != NULL) {
                    if ((n = rdbSaveRawString(rdb,
                        (unsigned char*)zn->ele,sdslen(zn->ele))) == -1)
                    {
                        return -1;
                    }
                    nwritten += n;
                    if ((n = rdbSaveBinaryDoubleValue(rdb,zn->score)) == -1)
                        return -1;
                    nwritten += n;
                    zn = zn->backward;
                }
            } else {
                serverPanic("Unknown sorted set encoding");
            }
        } else if (o->type == OBJ_HASH) {
            /* Save a hash value */
            if (o->encoding == OBJ_ENCODING_ZIPLIST) {
                size_t l = ziplistBlobLen((unsigned char*)o->ptr);
    
                if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
                nwritten += n;
    
            } else if (o->encoding == OBJ_ENCODING_HT) {
                dictIterator *di = dictGetIterator(o->ptr);
                dictEntry *de;
    
                if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) {
                    dictReleaseIterator(di);
                    return -1;
                }
                nwritten += n;
    
                while((de = dictNext(di)) != NULL) {
                    sds field = dictGetKey(de);
                    sds value = dictGetVal(de);
    
                    if ((n = rdbSaveRawString(rdb,(unsigned char*)field,
                            sdslen(field))) == -1)
                    {
                        dictReleaseIterator(di);
                        return -1;
                    }
                    nwritten += n;
                    if ((n = rdbSaveRawString(rdb,(unsigned char*)value,
                            sdslen(value))) == -1)
                    {
                        dictReleaseIterator(di);
                        return -1;
                    }
                    nwritten += n;
                }
                dictReleaseIterator(di);
            } else {
                serverPanic("Unknown hash encoding");
            }
        } else if (o->type == OBJ_STREAM) {
            /* Store how many listpacks we have inside the radix tree. */
            stream *s = o->ptr;
            rax *rax = s->rax;
            if ((n = rdbSaveLen(rdb,raxSize(rax))) == -1) return -1;
            nwritten += n;
    
            /* Serialize all the listpacks inside the radix tree as they are,
             * when loading back, we'll use the first entry of each listpack
             * to insert it back into the radix tree. */
            raxIterator ri;
            raxStart(&ri,rax);
            raxSeek(&ri,"^",NULL,0);
            while (raxNext(&ri)) {
                unsigned char *lp = ri.data;
                size_t lp_bytes = lpBytes(lp);
                if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1) return -1;
                nwritten += n;
                if ((n = rdbSaveRawString(rdb,lp,lp_bytes)) == -1) return -1;
                nwritten += n;
            }
            raxStop(&ri);
    
            /* Save the number of elements inside the stream. We cannot obtain
             * this easily later, since our macro nodes should be checked for
             * number of items: not a great CPU / space tradeoff. */
            if ((n = rdbSaveLen(rdb,s->length)) == -1) return -1;
            nwritten += n;
            /* Save the last entry ID. */
            if ((n = rdbSaveLen(rdb,s->last_id.ms)) == -1) return -1;
            nwritten += n;
            if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
            nwritten += n;
    
            /* The consumer groups and their clients are part of the stream
             * type, so serialize every consumer group. */
    
            /* Save the number of groups. */
            size_t num_cgroups = s->cgroups ? raxSize(s->cgroups) : 0;
            if ((n = rdbSaveLen(rdb,num_cgroups)) == -1) return -1;
            nwritten += n;
    
            if (num_cgroups) {
                /* Serialize each consumer group. */
                raxStart(&ri,s->cgroups);
                raxSeek(&ri,"^",NULL,0);
                while(raxNext(&ri)) {
                    streamCG *cg = ri.data;
    
                    /* Save the group name. */
                    if ((n = rdbSaveRawString(rdb,ri.key,ri.key_len)) == -1)
                        return -1;
                    nwritten += n;
    
                    /* Last ID. */
                    if ((n = rdbSaveLen(rdb,cg->last_id.ms)) == -1) return -1;
                    nwritten += n;
                    if ((n = rdbSaveLen(rdb,cg->last_id.seq)) == -1) return -1;
                    nwritten += n;
    
                    /* Save the global PEL. */
                    if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) return -1;
                    nwritten += n;
    
                    /* Save the consumers of this group. */
                    if ((n = rdbSaveStreamConsumers(rdb,cg)) == -1) return -1;
                    nwritten += n;
                }
                raxStop(&ri);
            }
        } else if (o->type == OBJ_MODULE) {
            /* Save a module-specific value. */
            RedisModuleIO io;
            moduleValue *mv = o->ptr;
            moduleType *mt = mv->type;
            moduleInitIOContext(io,mt,rdb,key);
    
            /* Write the "module" identifier as prefix, so that we'll be able
             * to call the right module during loading. */
            int retval = rdbSaveLen(rdb,mt->id);
            if (retval == -1) return -1;
            io.bytes += retval;
    
            /* Then write the module-specific representation + EOF marker. */
            mt->rdb_save(&io,mv->value);
            retval = rdbSaveLen(rdb,RDB_MODULE_OPCODE_EOF);
            if (retval == -1) return -1;
            io.bytes += retval;
    
            if (io.ctx) {
                moduleFreeContext(io.ctx);
                zfree(io.ctx);
            }
            return io.error ? -1 : (ssize_t)io.bytes;
        } else {
            serverPanic("Unknown object type");
        }
        return nwritten;
    }
    //优化处理1.提前分配好rio内存空间
    

    集群

    数据结构

    哨兵模式

    模块

    相关文章

      网友评论

          本文标题:redis源码阅读

          本文链接:https://www.haomeiwen.com/subject/imzsrctx.html