美文网首页
看!源码Redis之listpack(stream消息存储结构)

看!源码Redis之listpack(stream消息存储结构)

作者: starskye | 来源:发表于2023-01-15 18:40 被阅读0次

    此结构在Stream中扮演着存储消息的角色。

    //listpack 头部大小为 6 其中total bytes 为4个字节 numelement 为2个字节
    #define LP_HDR_SIZE 6
    //结束符
    #define LP_EOF 0xFF
    //设置total值
    //p自然是 listpack的指针 而他的前四个字节存储的便是其字节的总数
    //所以此处需要将一个四个字节的 int 赋值给四个byte 此处需要注意 存储方式为小端
    //因为将低位存储到了 0字节 依次类推
    #define lpSetTotalBytes(p,v) do { \
        (p)[0] = (v)&0xff; \
        (p)[1] = ((v)>>8)&0xff; \
        (p)[2] = ((v)>>16)&0xff; \
        (p)[3] = ((v)>>24)&0xff; \
    } while(0)
    //设置listpack的元素数
    //与上方相同 存储为小端
    #define lpSetNumElements(p,v) do { \
        (p)[4] = (v)&0xff; \
        (p)[5] = ((v)>>8)&0xff; \
    } while(0)
    //创建一个listpack
    unsigned char *lpNew(void) {
        //声明一个7字节的空间 其中+1 是因为 listpack有个结束符 0xFF占用一个字节
        unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
        //如果声明失败返回null
        if (lp == NULL) return NULL;
        //设置lp的total bytes初始化数据为 7
        lpSetTotalBytes(lp,LP_HDR_SIZE+1);
        //设置默认element数为 0
        lpSetNumElements(lp,0);
        //设置结束符为 0xFF
        lp[LP_HDR_SIZE] = LP_EOF;
        return lp;
    }
    //listpack中 entry的最大编码长度为 9
    #define LP_MAX_INT_ENCODING_LEN 9
    //listpack中 entry长度描述最大为5个字节
    #define LP_MAX_BACKLEN_SIZE 5
    //前插
    #define LP_BEFORE 0
    //后插
    #define LP_AFTER 1
    //替换
    #define LP_REPLACE 2
    //listpack 插入数据、删除数据、替换数据 根据参数调用来确定
    // lp listpack的指针 ele 新数据 size 数据长度 p 如果是插入则此数据为需要插如到的那个数据,如果是删除则为需要删除的数据
    // where 需要操作的位置LP_BEFORE(前插) 、 LP_AFTER(后插) 、LP_REPLACE(替换) newp 操作完数据的下一个数据指针
    //如果是删除 则ele为NULL
    unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) {
        //首先声明当前entry的 需要编码大小 和 长度描述大小 此处backlen用于后往前遍历 所以此处长度只包含 编码长度+数据长度
        unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
        unsigned char backlen[LP_MAX_BACKLEN_SIZE];
        //计算当前编码的长度
        uint64_t enclen;
    
        //如果ele为null说明需要删除 而此处的删除是将当前p占用的字节替换为空字节
        if (ele == NULL) where = LP_REPLACE;
        //如果是在之后插入 则将其转换为向前插入   这样做少些逻辑代码 值得学习
        if (where == LP_AFTER) {
            //首先跳转到下一个节点
            p = lpSkip(p);
            //设置where为before
            where = LP_BEFORE;
        }
    
        //计算当前p 在lp的偏移值
        unsigned long poff = p-lp;
    
        //根据ele 计算需要的编码类型 共 9中类型
        //1、 7 位的无符号整型 加上内容共占 1个字节 结构为 0xxx xxxx x是存储数据的地方 0标示位
        //2、 63长度以下的字符串  结构为 10LL LLLL L是存储字符串长度的地方 10 标识位 后续便是内容
        //3、 13位的有符号整型 加上内容共占 2个字节 结构为 110x xxxx xxxx xxxx x是存储数据的地方 110 标示位
        //4、 4095长度以下 63 长度以上的字符串 结构为 1110  LLLL LLLL LLLL L存储字符串长度的地方 1110 标识位 后续便是L计算出长度大小的内容
        //5、 4294967295 长度以下 4095 长度以上的字符串 结构为 1111 0000 后续四个字节存储 字符串长度 1111 0000 标示位
        //6、 16位整型 结构为 1111 0001 也是为标示位 后续两个字节为 数据值 共 3个字节大小
        //7、 24位整型 结构为 1111 0010 也是为标示位 后续三个字节为 数据值 共 4个字节大小
        //8、 32位整型 结构为 1111 0100 也是为标示位 后续四个字节为 数据值 共 5个字节大小
        //9、 64位整型 结构为 1111 1000 也是为标示位 后续八个字节为 数据值 共 9个字节大小 然而也是已知占用最多的类型所以intenc为9
        int enctype;
        if (ele) {
            //便是使用上方的规则计算当前ele的类型编码 存储到 intenc中 在根据编码的类型和数据获取对应的长度 enclen
            enctype = lpEncodeGetType(ele,size,intenc,&enclen);
        } else {
            //如果为NULL 则设置type为-1 长度为 0
            enctype = -1;
            enclen = 0;
        }
    
       //计算当前backlen需要占用的大小 根据enclen计算 规则如下
       //1、enclen <= 127  占用 1 个字节
       //2、enclen < 16383 占用 2 个字节
       //3、enclen < 2097151 占用 3 个字节
       //4、enclen < 268435455 占用4个字节
       //5、如果不满足上方的条件则占用五个字节 并且将enclen 写入到backlen中
        unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
    
        //获取当前的字节数
        uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
        //如果是替换则计算需要需要替换的字节数 即 encoding size + content size + backlen size 便是一个entry的总共大小
        uint32_t replaced_len  = 0;
        if (where == LP_REPLACE) {
            //解析当前p的 encoding size + content size
            replaced_len = lpCurrentEncodedSize(p);
            //根据获取到len 计算下backlen占用的size
            replaced_len += lpEncodeBacklen(NULL,replaced_len);
        }
        //计算一下最新listpack需要的字节数 当前字节数 + 新ele的字节数 - 需要替换元素的字节数
        uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
                                      - replaced_len;
        //获取到最新的字节数后 判断当前字节数是否超过了限制 即 4294967295 因为listpack的长度描述 total bytes 是四个字节 无符号的最大数便是4294967295
        //如果超过那么不在执行 返回NULL 因为程序出错
        if (new_listpack_bytes > UINT32_MAX) return NULL;
    
        //获取需要被替换的目标地址
        unsigned char *dst = lp + poff;
    
        //如果上方计算的最新大小超过了原有的大小
        if (new_listpack_bytes > old_listpack_bytes) {
            //那么遍进行realloc 重新分配内存空间
            if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
            //分配完成后再次计算需要操作的目标位置
            dst = lp + poff;
        }
    
       //判断当前位置 因为之前将after修改为了before所以此处只有 before和replace
        if (where == LP_BEFORE) {
            //如果是before操作 那么此处将 dst 位置的数据 移动到 dst+enclen+backlen_size的位置
            //这样做变空出了当前ele大小的位置方便后续操作
            memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
        } else {
            //如果是替换 那么计算当前元素大小和 原先元素大小的差 如果是正数 则往后移 如果是负数则向前移
            long lendiff = (enclen+backlen_size)-replaced_len;
            //dst + replacelen 便是 当前元素的下一个元素地址 然后在加上上方计算的差 这样便计算出 下一个元素需要位移的具体位置
            //然后设置需要位移的元素首地址 再次计算此元素的长度 总长度-当前替换元素的偏移-当前替换元素的大小 便是后面所有元素的占用大小
            memmove(dst+replaced_len+lendiff,
                    dst+replaced_len,
                    old_listpack_bytes-poff-replaced_len);
        }
    
        //如果当前listpack的大小小于了原先的大小 则需要丢弃后面空出来的字节
        //因为上方已经进行了插入/替换的操作所以此处尾部的字节数 必然是多余的
        if (new_listpack_bytes < old_listpack_bytes) {
            if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
            //根据偏移 再次计算当前元素的首地址  因为上方经过了插入 和 替换所以此处指针一定是新元素的地址
            dst = lp + poff;
        }
    
        //如果需要回传新元素的地址 则设置为dst
        if (newp) {
            *newp = dst;
            //如果是删除操作并且当前删除的元素是最后一个 那么 不存在新元素所以为NULL
            if (!ele && dst[0] == LP_EOF) *newp = NULL;
        }
        //如果ele不为空 要么是替换 要么是 添加 但是之前都是对原先内存的移动并没有赋值
        //所以此处是将新元素的具体数据赋值到内存中
        if (ele) {
            //如果是整型 那么直接copy即可
            if (enctype == LP_ENCODING_INT) {
                memcpy(dst,intenc,enclen);
            } else {
                //否则需要将字符串编码为指定的encoding 然后在复制
                //此方法的操作是根据size编码为上方的对应类型然后memcpy拷贝到dst中
                lpEncodeString(dst,ele,size);
            }
            //将dst偏移指定的长度
            dst += enclen;
            //将backlen也拷贝进dst中
            memcpy(dst,backlen,backlen_size);
            //dst back的长度
            dst += backlen_size;
        }
    
        //如果当前操作类型不是replace 或者 ele == null
        //都是需要对元素个数进行操作的 因为不是replace说明是插入 需要 ++ 如果ele为null 说明是删除需要 --
        if (where != LP_REPLACE || ele == NULL) {
            //获取当前元素个数
            uint32_t num_elements = lpGetNumElements(lp);
            //如果当前长度不是65535 即 short的最大值 无符号 那么就对num进行设置
            if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
                if (ele)
                    //如果存在ele说明是添加则 +1
                    lpSetNumElements(lp,num_elements+1);
                else
                    //如果不存在说明是删除则 --
                    lpSetNumElements(lp,num_elements-1);
            }
        }
    
        //将当前最新的bytes大小设置到lp中
        lpSetTotalBytes(lp,new_listpack_bytes);
        //到此处可以总结 下newp 如果是插入或者替换 那么此值为当前p的偏移首地址 如果是删除则为当前p的下一个元素的地址
        return lp;
    }
    //获取当前listpack的第一个元素
    unsigned char *lpFirst(unsigned char *lp) {
        //在listpack的首地址基础上跳过头部 便是第一个元素的首地址
        lp += LP_HDR_SIZE;
        //如果首地址的值是0xff 那么说明list为空 直接返回null
        if (lp[0] == LP_EOF) return NULL;
        //否则返回首地址
        return lp;
    }
    
    //获取当前listpack的最后一个元素
    unsigned char *lpLast(unsigned char *lp) {
        //根据当前listpack的首地址 加上 当前listpack的总字节数 便超过了当前listpack的内存空间 进行-1 回到EOF 0xff的地址
        unsigned char *p = lp+lpGetTotalBytes(lp)-1;
        //根据最后一个字节向前根据backlen算出前一个字节的长度 根据长度进行 p - backlen 便是最后一个元素的首地址
        return lpPrev(lp,p);
    }
    //根据当前元素的首地址 计算前一个元素的首地址
    unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
        //如果当前元素的地址 减去 listpack的地址 等于6 说明当前lp为空 则不进行计算
        if (p-lp == LP_HDR_SIZE) return NULL;
        //不为空则跳过当前元素的首地址 到达上一个元素的backlen的位置
        p--;
        //根据backlen向前计算 获取到元素的长度
        uint64_t prevlen = lpDecodeBacklen(p);
        //当前获取到的元素长度是编码的长度+数据长度 还缺少了backlen的长度所以此处根据上方解析出的backlen在计算下backlen的长度
        //从而获取到当前p 到上一个元素长度
        prevlen += lpEncodeBacklen(NULL,prevlen);
        //但是之前为了计算backlen所以p进行了--已经不是原来的那个位置 但是我们计算出的长度是根据原先的位置计算的
        //所以此处需要+1 才是上一个元素的真正位置
        return p-prevlen+1;
    }
    //如果需要获取下一个元素那么必须需要当前的元素
    //lp listpack的指针地址 p 需要查询下个元素的指针地址
    unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
        //此处暂时没有使用到了lp 但是在编译是会提示未使用 所以此处使用此方法欺骗编译器
        ((void) lp);
        //next使用的是skip跳过当前p
        p = lpSkip(p);
        //如果跳过的结果是EOF 结束符 那么返回null 否则返回跳过结果
        if (p[0] == LP_EOF) return NULL;
        return p;
    }
    //跳过当前p 获取到下一个元素的地址
    unsigned char *lpSkip(unsigned char *p) {
        //首先计算当前元素占用的长度
        unsigned long entrylen = lpCurrentEncodedSize(p);
        //根据长度计算出back的长度
        entrylen += lpEncodeBacklen(NULL,entrylen);
        //根据计算出的 backlen的长度 + entrylen的长度便是下一个元素的长度
        //所以此处p加上上方的计算结果便可
        p += entrylen;
        return p;
    }
    //获取当前元素的数据
    // p 为元素的首地址 count 如果元素存储的是整型并且复合之前编码的 那么count为返回值 但是此处返回值优势char* 所以需要一个参数做为结果
    //inbuff如果想获取到当前整型的buff数据 那么此值为返回值 如果需要inbuff 则count语义改成为写入到inbuff的字节个数
    unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
        //如果为整型 此处便是计算 count的临时变量
        int64_t val;
        //uval 根据char计算出的值  negstart存储的值是否为负数根据此值进行判别
        //negmax 如果是负数 需要此值将当前正数计算为负数
        uint64_t uval, negstart, negmax;
        //如果当前是无符号整型 那么直接计算uval即可
        if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
            negstart = UINT64_MAX;
            negmax = 0;
            //当前char & 一个0x7f 便是具体值
            uval = p[0] & 0x7f;
            //如果当前是六位的字符串 那么此处 p +1 就是数据体 因为encoding站1个字节 之前编码有详细说明过
        } else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
            *count = LP_ENCODING_6BIT_STR_LEN(p);
            return p+1;
            //如果当前是13位的有符号整型
        } else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
            //根据&0x1f 获取到高位值 在位移 8位 设置为高位数据 然后 或 | p1 低位值
            uval = ((p[0]&0x1f)<<8) | p[1];
            //他的最大值不能超过的 4096 如果超过那么便是负数
            negstart = (uint64_t)1<<12;
            //计算负数的基数 基数减去 uval -1 便是最终的结果
            negmax = 8191;
            //与13同理
        } else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
            uval = (uint64_t)p[1] |
                   (uint64_t)p[2]<<8;
            negstart = (uint64_t)1<<15;
            negmax = UINT16_MAX;
            //如果13同理
        } else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
            uval = (uint64_t)p[1] |
                   (uint64_t)p[2]<<8 |
                   (uint64_t)p[3]<<16;
            negstart = (uint64_t)1<<23;
            negmax = UINT32_MAX>>8;
            //与13同理
        } else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
            uval = (uint64_t)p[1] |
                   (uint64_t)p[2]<<8 |
                   (uint64_t)p[3]<<16 |
                   (uint64_t)p[4]<<24;
            negstart = (uint64_t)1<<31;
            negmax = UINT32_MAX;
            //与13同理
        } else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
            uval = (uint64_t)p[1] |
                   (uint64_t)p[2]<<8 |
                   (uint64_t)p[3]<<16 |
                   (uint64_t)p[4]<<24 |
                   (uint64_t)p[5]<<32 |
                   (uint64_t)p[6]<<40 |
                   (uint64_t)p[7]<<48 |
                   (uint64_t)p[8]<<56;
            negstart = (uint64_t)1<<63;
            negmax = UINT64_MAX;
            //如果是12位的字符串 那么需要p+2 encoding长度为 2
        } else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
            *count = LP_ENCODING_12BIT_STR_LEN(p);
            return p+2;
            //如果是32位的字符串 那么需要p+5 encoding长度为 5
        } else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
            *count = LP_ENCODING_32BIT_STR_LEN(p);
            return p+5;
        } else {
            //如果都不是那么设置默认值 方便调试
            uval = 12345678900000000ULL + p[0];
            negstart = UINT64_MAX;
            negmax = 0;
        }
    
        //如果当前值超过了限定的值 则说明 到达了负数
        if (uval >= negstart) {
            //之前设置的基数 减去当前值
            uval = negmax-uval;
            val = uval;
            //取反-1 就是当时存储的具体值
            val = -val-1;
        } else {
            //否则将读出的数据存储到val中
            val = uval;
        }
    
        //如果需要字符串的buff那么snpringf存储进去
        if (intbuf) {
            //此时count为写入到inbuff中的字节长度
            *count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",(long long)val);
            //然后返回intbuff
            return intbuf;
        } else {
            //否则设置count 此为具体value的值
            *count = val;
            return NULL;
        }
    }
    //追加元素 其实就是添加最后一个元素
    //lp listpack 指针 ele 当前元素的指针 size 当前元素的大小
    unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) {
        //首先获取当前元素的占用字节数
        uint64_t listpack_bytes = lpGetTotalBytes(lp);
        //当前lp首地址加上字节数 减去 1 就是 eof的地址
        unsigned char *eofptr = lp + listpack_bytes - 1;
        //只要在调用insert的时候 指定位before 设置需要插入的元素为eof便为append
        return lpInsert(lp,ele,size,eofptr,LP_BEFORE,NULL);
    }
    //元素的删除 之前在insert中讲解过 lp listpack 指针 p 元素指针 newp如果需要删除元素的下一个指针则不为空
    unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) {
        //设置插入值为NULL 类型为REPLACE 便是删除
        return lpInsert(lp,NULL,0,p,LP_REPLACE,newp);
    }
    
    

    相关文章

      网友评论

          本文标题:看!源码Redis之listpack(stream消息存储结构)

          本文链接:https://www.haomeiwen.com/subject/dlanphtx.html