美文网首页
t_string.c

t_string.c

作者: 生命就是个Bug | 来源:发表于2019-04-16 11:40 被阅读0次

    Redis的t_string.c是对于string数据结构的实现。
    底层是基于 sds 和 dict 实现的。

    首先,了解一下redisDb结构。

    /* Redis database representation. There are multiple databases identified
     * by integers from 0 (the default database) up to the max configured
     * database. The database number is the 'id' field in the structure. */
    typedef struct redisDb {
        // Regular keys stored in this database
        dict *dict;                 /* The keyspace for this DB */
        // Keys that have an expire time set
        dict *expires;              /* Timeout of keys with a timeout set */
        // Keys that clients are blocked on (e.g. BLPOP)
        dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
        // Blocked keys that have received a PUSH
        dict *ready_keys;           /* Blocked keys that received a PUSH */
        // Keys watched for MULTI/EXEC
        dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
        // Database id
        int id;                     /* Database ID */
        // Average TTL of the keys, kept for statistics only
        long long avg_ttl;          /* Average TTL, just for stats */
        // Key names to try to defragment gradually, one at a time
        list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
    } redisDb;
    

    Redis允许的最大字符串长度为512MB。

    /* Validate the length a string value would have after a write.
     *
     * Returns C_OK when 'size' is within the 512MB limit. Otherwise an error
     * is sent to the client and C_ERR is returned.
     *
     * A negative 'size' is rejected too: callers pass the result of unsigned
     * arithmetic such as offset+sdslen(value), and when that sum does not fit
     * in a long long it shows up here as a negative number. Letting it through
     * would allow an allocation far above the limit. */
    static int checkStringLength(client *c, long long size) {
        if (size < 0 || size > 512*1024*1024) {
            addReplyError(c,"string exceeds maximum allowed size (512MB)");
            return C_ERR;
        }
        return C_OK;
    }
    

    1. set类型命令的实现

    /* Shared implementation of the SET family of commands.
     *
     * c            - client issuing the command.
     * flags        - OBJ_SET_NX / OBJ_SET_XX conditions (or none).
     * key, val     - key to set and the value to store.
     * expire       - optional expire argument, NULL when no expire is wanted.
     * unit         - UNIT_SECONDS or UNIT_MILLISECONDS, the unit of 'expire'.
     * ok_reply     - reply sent on success, shared.ok when NULL.
     * abort_reply  - reply sent when the NX/XX condition is not met,
     *                shared.nullbulk when NULL.
     *
     * The expire value must be positive and, when given in seconds, small
     * enough that converting it to milliseconds does not overflow a signed
     * long long; otherwise an "invalid expire time" error is returned and
     * nothing is written. */
    void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
        long long milliseconds = 0; /* initialized to avoid any harmness warning */
    
        if (expire) {
            /* Parse the expire argument as an integer. */
            if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
                return;
            /* Reject non-positive values, and second values that would
             * overflow once multiplied by 1000 (signed overflow is undefined
             * behavior in C). */
            if (milliseconds <= 0 ||
                (unit == UNIT_SECONDS && milliseconds > LLONG_MAX/1000))
            {
                addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
                return;
            }
            /* Normalize to milliseconds. */
            if (unit == UNIT_SECONDS) milliseconds *= 1000;
        }
    
        /* NX: abort when the key exists. XX: abort when it does not. */
        if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
            (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
        {
            addReply(c, abort_reply ? abort_reply : shared.nullbulk);
            return;
        }
        /* Store the value and count the modification. */
        setKey(c->db,key,val);
        server.dirty++;
        /* Expire time is stored as an absolute time in milliseconds. */
        if (expire) setExpire(c,c->db,key,mstime()+milliseconds);
        /* Keyspace notifications: "set", plus "expire" when a TTL was given. */
        notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
        if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
            "expire",key,c->db->id);
        /* Reply to the client. */
        addReply(c, ok_reply ? ok_reply : shared.ok);
    }
    
    /* Return non-zero when 'opt' is exactly a two letter option: 'lower' or
     * 'upper' followed by 'x' or 'X'. Characters are tested left to right and
     * the test stops at the first mismatch. */
    static int setOptionIs(const char *opt, char lower, char upper) {
        if (opt[0] != lower && opt[0] != upper) return 0;
        if (opt[1] != 'x' && opt[1] != 'X') return 0;
        return opt[2] == '\0';
    }

    /* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>]
     *
     * Parses the options that follow the value, then hands over to
     * setGenericCommand(). NX and XX exclude each other, and so do EX and PX;
     * EX/PX also need a following argument. Anything else is answered with
     * shared.syntaxerr. */
    void setCommand(client *c) {
        robj *expire = NULL;
        int unit = UNIT_SECONDS;
        int flags = OBJ_SET_NO_FLAGS;
        int idx = 3; /* first argument after "SET key value" */

        while (idx < c->argc) {
            char *opt = c->argv[idx]->ptr;
            robj *following = (idx == c->argc-1) ? NULL : c->argv[idx+1];

            if (setOptionIs(opt,'n','N') && !(flags & OBJ_SET_XX)) {
                flags |= OBJ_SET_NX;
            } else if (setOptionIs(opt,'x','X') && !(flags & OBJ_SET_NX)) {
                flags |= OBJ_SET_XX;
            } else if (setOptionIs(opt,'e','E') &&
                       !(flags & OBJ_SET_PX) && following)
            {
                flags |= OBJ_SET_EX;
                unit = UNIT_SECONDS;
                expire = following;
                idx++; /* the expire value has been consumed */
            } else if (setOptionIs(opt,'p','P') &&
                       !(flags & OBJ_SET_EX) && following)
            {
                flags |= OBJ_SET_PX;
                unit = UNIT_MILLISECONDS;
                expire = following;
                idx++; /* the expire value has been consumed */
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
            idx++;
        }

        /* Pass the value through tryObjectEncoding() before storing it. */
        c->argv[2] = tryObjectEncoding(c->argv[2]);
        setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
    }
    
    /* SETNX key value
     *
     * Sets the key only when it does not exist: replies shared.cone when the
     * value was stored and shared.czero when the key was already there. */
    void setnxCommand(client *c) {
        robj *value = tryObjectEncoding(c->argv[2]);

        c->argv[2] = value;
        setGenericCommand(c,OBJ_SET_NX,c->argv[1],value,NULL,0,
                          shared.cone,shared.czero);
    }
    
    /* SETEX key seconds value
     *
     * Unconditional set with an expire time given in seconds (argv[2]). */
    void setexCommand(client *c) {
        robj *value = tryObjectEncoding(c->argv[3]);

        c->argv[3] = value;
        setGenericCommand(c,OBJ_SET_NO_FLAGS,c->argv[1],value,c->argv[2],
                          UNIT_SECONDS,NULL,NULL);
    }
    
    /* PSETEX key milliseconds value
     *
     * Unconditional set with an expire time given in milliseconds (argv[2]). */
    void psetexCommand(client *c) {
        robj *value = tryObjectEncoding(c->argv[3]);

        c->argv[3] = value;
        setGenericCommand(c,OBJ_SET_NO_FLAGS,c->argv[1],value,c->argv[2],
                          UNIT_MILLISECONDS,NULL,NULL);
    }
    

    setKey的实现如下:

    /* High level Set operation: bind 'key' to 'val' whether or not the key
     * is already present in 'db'.
     *
     * 1) The ref count of the value object is incremented.
     * 2) Clients WATCHing the destination key are notified.
     * 3) The expire time of the key is reset (the key is made persistent).
     *
     * All the new keys in the database should be created via this interface. */
    void setKey(redisDb *db, robj *key, robj *val) {
        int exists = (lookupKeyWrite(db,key) != NULL);

        /* Replace the current value, or insert a brand new key. */
        if (exists)
            dbOverwrite(db,key,val);
        else
            dbAdd(db,key,val);

        /* Neither dbAdd() nor dbOverwrite() takes a reference for us. */
        incrRefCount(val);
        /* Drop any expire set on the key. */
        removeExpire(db,key);
        signalModifiedKey(db,key);
    }
    

    dbAdd的实现如下:

    /* Insert a new key into the DB. Incrementing the reference counter of the
     * value, when needed, is the caller's job.
     *
     * The program is aborted if the key already exists. */
    void dbAdd(redisDb *db, robj *key, robj *val) {
        /* The dict gets its own duplicate of the key string. */
        sds keyname = sdsdup(key->ptr);
        int retval = dictAdd(db->dict, keyname, val);
        int is_list_or_zset;

        serverAssertWithInfo(NULL,key,retval == DICT_OK);

        /* A new list or sorted set is reported through signalKeyAsReady(). */
        is_list_or_zset = (val->type == OBJ_LIST || val->type == OBJ_ZSET);
        if (is_list_or_zset) {
            signalKeyAsReady(db, key);
        }

        /* With cluster enabled, also record the key via slotToKeyAdd(). */
        if (server.cluster_enabled) {
            slotToKeyAdd(key);
        }
    }
    

    dbOverwrite的实现如下:

    /* Overwrite an existing key with a new value. Incrementing the reference
     * count of the new value is up to the caller.
     * This function does not modify the expire time of the existing key.
     *
     * The program is aborted if the key was not already present. */
    void dbOverwrite(redisDb *db, robj *key, robj *val) {
        // Look the key up in the main dict of the db
        dictEntry *de = dictFind(db->dict,key->ptr);
    
        serverAssertWithInfo(NULL,key,de != NULL);
        // Keep a copy of the entry: it still refers to the old value after
        // the real entry is updated below
        dictEntry auxentry = *de;
        // The value currently stored for the key
        robj *old = dictGetVal(de);
        // Under an LFU maxmemory policy the new value inherits the 'lru'
        // field of the old one
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
            val->lru = old->lru;
        }
        // Store the new value in the real entry
        dictSetVal(db->dict, de, val);
        
        // With lazyfree_lazy_server_del set, the old object is handed to
        // freeObjAsync() and the copied entry's value is set to NULL, so the
        // dictFreeVal() below is applied to NULL instead of the old object
        if (server.lazyfree_lazy_server_del) {
            freeObjAsync(old);
            dictSetVal(db->dict, &auxentry, NULL);
        }
        // Release the value held by the copied entry
        dictFreeVal(db->dict, &auxentry);
    }
    

    2. get类型命令的实现

    /* Reply with the string value stored at argv[1].
     *
     * Returns C_OK when the value was sent, or when the key is missing (in
     * which case shared.nullbulk is the reply). Returns C_ERR after replying
     * shared.wrongtypeerr when the key holds a non-string value. */
    int getGenericCommand(client *c) {
        robj *val = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);

        /* Missing key: the lookup has already replied. */
        if (val == NULL) return C_OK;

        if (val->type != OBJ_STRING) {
            addReply(c,shared.wrongtypeerr);
            return C_ERR;
        }

        addReplyBulk(c,val);
        return C_OK;
    }
    
    /* GET command: delegates to getGenericCommand() and ignores its return
     * value. */
    void getCommand(client *c) {
        getGenericCommand(c);
    }
    
    /* GETSET key value
     *
     * Replies with the current value through getGenericCommand(), then stores
     * the new one. Nothing is written when the read step reports C_ERR. */
    void getsetCommand(client *c) {
        robj *newval;

        /* Read step. */
        if (getGenericCommand(c) == C_ERR) {
            return;
        }

        /* Write step. */
        newval = tryObjectEncoding(c->argv[2]);
        c->argv[2] = newval;
        setKey(c->db,c->argv[1],newval);
        notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[1],c->db->id);
        server.dirty++;
    }
    

    3. 字符串覆盖

    /* SETRANGE key offset value
     *
     * Writes 'value' into the string at 'key' starting at byte 'offset',
     * growing the string with zero bytes as needed. The reply is the length of
     * the string after the operation; with an empty value nothing is modified
     * and the current length (0 for a missing key) is returned. */
    void setrangeCommand(client *c) {
        robj *o;
        long offset;
        sds value = c->argv[3]->ptr;
        size_t vlen = sdslen(value);

        if (getLongFromObjectOrReply(c,c->argv[2],&offset,NULL) != C_OK)
            return;

        if (offset < 0) {
            addReplyError(c,"offset is out of range");
            return;
        }

        o = lookupKeyWrite(c->db,c->argv[1]);
        if (o != NULL) {
            /* Key exists, check type */
            if (checkType(c,o,OBJ_STRING))
                return;

            /* Return existing string length when setting nothing */
            if (vlen == 0) {
                addReplyLongLong(c,stringObjectLen(o));
                return;
            }

            /* Return when the resulting string exceeds allowed size */
            if (checkStringLength(c,offset+vlen) != C_OK)
                return;

            /* Create a copy when the object is shared or encoded. */
            o = dbUnshareStringValue(c->db,c->argv[1],o);
        } else {
            /* Return 0 when setting nothing on a non-existing string */
            if (vlen == 0) {
                addReply(c,shared.czero);
                return;
            }

            /* Return when the resulting string exceeds allowed size */
            if (checkStringLength(c,offset+vlen) != C_OK)
                return;

            /* Create the key with a string of the final length. */
            o = createObject(OBJ_STRING,sdsnewlen(NULL, offset+vlen));
            dbAdd(c->db,c->argv[1],o);
        }

        if (vlen > 0) {
            /* Make the string long enough (padding with zero bytes), then copy
             * the new bytes in place. */
            o->ptr = sdsgrowzero(o->ptr,offset+vlen);
            memcpy((char*)o->ptr+offset,value,vlen);
            signalModifiedKey(c->db,c->argv[1]);
            notifyKeyspaceEvent(NOTIFY_STRING,
                "setrange",c->argv[1],c->db->id);
            server.dirty++;
        }
        addReplyLongLong(c,sdslen(o->ptr));
    }
    
    /* GETRANGE key start end
     *
     * Replies with the bytes of the string between 'start' and 'end', both
     * inclusive. Negative indexes count from the end of the string. A missing
     * key or an empty range is answered with shared.emptybulk. */
    void getrangeCommand(client *c) {
        robj *o;
        long long start, end;
        char *data, digits[32];
        size_t len;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&start,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&end,NULL) != C_OK)
            return;

        o = lookupKeyReadOrReply(c,c->argv[1],shared.emptybulk);
        if (o == NULL) return;
        if (checkType(c,o,OBJ_STRING)) return;

        /* Integer encoded values are first rendered as text into a local
         * buffer; other values are read directly. */
        if (o->encoding != OBJ_ENCODING_INT) {
            data = o->ptr;
            len = sdslen(data);
        } else {
            data = digits;
            len = ll2string(digits,sizeof(digits),(long)o->ptr);
        }

        /* Two negative indexes in the wrong order select nothing. */
        if (start < 0 && end < 0 && start > end) {
            addReply(c,shared.emptybulk);
            return;
        }

        /* Turn negative indexes into offsets from the start, then clamp. */
        if (start < 0) start = len+start;
        if (end < 0) end = len+end;
        if (start < 0) start = 0;
        if (end < 0) end = 0;
        if ((unsigned long long)end >= len) end = len-1;

        /* Precondition: end >= 0 && end < len, so the only condition where
         * nothing can be returned is: start > end. */
        if (len != 0 && start <= end) {
            addReplyBulkCBuffer(c,(char*)data+start,end-start+1);
        } else {
            addReply(c,shared.emptybulk);
        }
    }
    

    4. 多个key的操作

    /* MGET key [key ...]
     *
     * Replies with one element per requested key: the value for keys holding
     * a string, shared.nullbulk for missing keys and for keys of another
     * type. */
    void mgetCommand(client *c) {
        int idx = 1;

        addReplyMultiBulkLen(c,c->argc-1);
        while (idx < c->argc) {
            robj *val = lookupKeyRead(c->db,c->argv[idx]);

            if (val != NULL && val->type == OBJ_STRING) {
                addReplyBulk(c,val);
            } else {
                addReply(c,shared.nullbulk);
            }
            idx++;
        }
    }
    
    /* Shared implementation of MSET and MSETNX.
     *
     * 'nx' is non-zero for MSETNX: in that case nothing is set, and
     * shared.czero is the reply, as soon as one of the keys already exists.
     * On success the reply is shared.cone for MSETNX and shared.ok for MSET. */
    void msetGenericCommand(client *c, int nx) {
        int i;
        int pairs = (c->argc-1)/2;

        /* Arguments after the command name must come as key/value pairs. */
        if ((c->argc % 2) == 0) {
            addReplyError(c,"wrong number of arguments for MSET");
            return;
        }

        /* Handle the NX flag. The MSETNX semantic is to return zero and don't
         * set anything if at least one key already exists. */
        for (i = 1; nx && i < c->argc; i += 2) {
            if (lookupKeyWrite(c->db,c->argv[i]) != NULL) {
                addReply(c, shared.czero);
                return;
            }
        }

        /* Store every pair, notifying a "set" event for each key. */
        for (i = 1; i < c->argc; i += 2) {
            robj *value = tryObjectEncoding(c->argv[i+1]);

            c->argv[i+1] = value;
            setKey(c->db,c->argv[i],value);
            notifyKeyspaceEvent(NOTIFY_STRING,"set",c->argv[i],c->db->id);
        }
        server.dirty += pairs;

        if (nx)
            addReply(c, shared.cone);
        else
            addReply(c, shared.ok);
    }
    
    /* MSET: msetGenericCommand() without the NX condition. */
    void msetCommand(client *c) {
        msetGenericCommand(c,0);
    }
    
    /* MSETNX: msetGenericCommand() with the NX condition enabled. */
    void msetnxCommand(client *c) {
        msetGenericCommand(c,1);
    }
    

    5. 增加减少key的值

    /* Shared implementation of INCR, DECR, INCRBY and DECRBY.
     *
     * Adds 'incr' to the integer stored at argv[1] and replies with the new
     * value as an integer reply. An error is returned, and nothing is
     * written, when the key holds a non-string value, when the value cannot be
     * parsed by getLongLongFromObjectOrReply(), or when the sum would not fit
     * in a long long. */
    void incrDecrCommand(client *c, long long incr) {
        long long value, oldvalue;
        robj *o, *result;
        int too_low, too_high, in_place;

        o = lookupKeyWrite(c->db,c->argv[1]);
        if (o != NULL && checkType(c,o,OBJ_STRING)) return;
        if (getLongLongFromObjectOrReply(c,o,&value,NULL) != C_OK) return;

        oldvalue = value;

        /* Check the range before adding: the sum itself must not overflow. */
        too_low = (incr < 0 && oldvalue < 0 && incr < (LLONG_MIN-oldvalue));
        too_high = (incr > 0 && oldvalue > 0 && incr > (LLONG_MAX-oldvalue));
        if (too_low || too_high) {
            addReplyError(c,"increment or decrement would overflow");
            return;
        }
        value += incr;

        /* The stored object is updated in place when it has a single
         * reference, is integer encoded, and the new value is outside the
         * [0, OBJ_SHARED_INTEGERS) range and fits in a long. */
        in_place = o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
                   (value < 0 || value >= OBJ_SHARED_INTEGERS) &&
                   value >= LONG_MIN && value <= LONG_MAX;

        if (in_place) {
            result = o;
            o->ptr = (void*)((long)value);
        } else {
            result = createStringObjectFromLongLongForValue(value);
            if (o)
                dbOverwrite(c->db,c->argv[1],result); /* replace old value */
            else
                dbAdd(c->db,c->argv[1],result);       /* create the key */
        }

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_STRING,"incrby",c->argv[1],c->db->id);
        server.dirty++;

        /* Reply: shared.colon, the value, then shared.crlf. */
        addReply(c,shared.colon);
        addReply(c,result);
        addReply(c,shared.crlf);
    }
    
    /* INCR: incrDecrCommand() with an increment of 1. */
    void incrCommand(client *c) {
        incrDecrCommand(c,1);
    }
    
    /* DECR: incrDecrCommand() with an increment of -1. */
    void decrCommand(client *c) {
        incrDecrCommand(c,-1);
    }
    
    /* INCRBY key increment
     *
     * Parses argv[2] as a long long and adds it to the key through
     * incrDecrCommand(). A parse failure is reported by
     * getLongLongFromObjectOrReply() and nothing else is done. */
    void incrbyCommand(client *c) {
        long long delta;
        int parsed = getLongLongFromObjectOrReply(c, c->argv[2], &delta, NULL);

        if (parsed == C_OK) {
            incrDecrCommand(c,delta);
        }
    }
    
    /* DECRBY key decrement
     *
     * Parses argv[2] as a long long and subtracts it from the key by calling
     * incrDecrCommand() with the negated amount.
     *
     * LLONG_MIN has no positive counterpart, so negating it would overflow
     * (undefined behavior in C). That single value is rejected with an error
     * before the negation takes place. */
    void decrbyCommand(client *c) {
        long long incr;
    
        if (getLongLongFromObjectOrReply(c, c->argv[2], &incr, NULL) != C_OK) return;
        /* Overflow check: -LLONG_MIN is not representable. */
        if (incr == LLONG_MIN) {
            addReplyError(c,"decrement would overflow");
            return;
        }
        incrDecrCommand(c,-incr);
    }
    
    /* INCRBYFLOAT key increment
     *
     * Adds the floating point 'increment' to the value stored at 'key' and
     * replies with the new value as a bulk string. A result that is NaN or
     * infinite is refused with an error and nothing is written. */
    void incrbyfloatCommand(client *c) {
        long double incr, value;
        robj *o, *result, *setname;

        o = lookupKeyWrite(c->db,c->argv[1]);
        if (o != NULL && checkType(c,o,OBJ_STRING)) return;

        /* Current value first, then the increment. */
        if (getLongDoubleFromObjectOrReply(c,o,&value,NULL) != C_OK) return;
        if (getLongDoubleFromObjectOrReply(c,c->argv[2],&incr,NULL) != C_OK)
            return;

        value += incr;
        if (isnan(value) || isinf(value)) {
            addReplyError(c,"increment would produce NaN or Infinity");
            return;
        }

        result = createStringObjectFromLongDouble(value,1);
        if (o != NULL) {
            dbOverwrite(c->db,c->argv[1],result);
        } else {
            dbAdd(c->db,c->argv[1],result);
        }
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_STRING,"incrbyfloat",c->argv[1],c->db->id);
        server.dirty++;
        addReplyBulk(c,result);

        /* Always replicate INCRBYFLOAT as a SET command with the final value
         * in order to make sure that differences in float precision or formatting
         * will not create differences in replicas or after an AOF restart. */
        setname = createStringObject("SET",3);
        rewriteClientCommandArgument(c,0,setname);
        decrRefCount(setname);
        rewriteClientCommandArgument(c,2,result);
    }
    

    6. 追加

    /* APPEND key value
     *
     * Appends 'value' to the string stored at 'key', creating the key with
     * 'value' as its content when it does not exist. The reply is the length
     * of the string after the operation. */
    void appendCommand(client *c) {
        size_t totlen;
        robj *o = lookupKeyWrite(c->db,c->argv[1]);

        if (o != NULL) {
            robj *append;

            /* Key exists, check type */
            if (checkType(c,o,OBJ_STRING))
                return;

            /* "append" is an argument, so always an sds */
            append = c->argv[2];
            /* Refuse the operation when the result would be too long. */
            totlen = stringObjectLen(o)+sdslen(append->ptr);
            if (checkStringLength(c,totlen) != C_OK)
                return;

            /* Append the value */
            o = dbUnshareStringValue(c->db,c->argv[1],o);
            o->ptr = sdscatlen(o->ptr,append->ptr,sdslen(append->ptr));
            totlen = sdslen(o->ptr);
        } else {
            /* Missing key: the argument itself becomes the value, so it gets
             * one more reference after being added to the db. */
            c->argv[2] = tryObjectEncoding(c->argv[2]);
            dbAdd(c->db,c->argv[1],c->argv[2]);
            incrRefCount(c->argv[2]);
            totlen = stringObjectLen(c->argv[2]);
        }

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_STRING,"append",c->argv[1],c->db->id);
        server.dirty++;
        addReplyLongLong(c,totlen);
    }
    

    7. 字符串长度

    
    /* STRLEN key
     *
     * Replies with the length of the string stored at 'key'. A missing key is
     * answered with shared.czero; a key of another type is handled by
     * checkType(). */
    void strlenCommand(client *c) {
        robj *val = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

        if (val == NULL) return;
        if (checkType(c,val,OBJ_STRING)) return;

        addReplyLongLong(c,stringObjectLen(val));
    }
    

    相关文章

      网友评论

          本文标题:t_string.c

          本文链接:https://www.haomeiwen.com/subject/jxspwqtx.html