美文网首页
Reids 字典

Reids 字典

作者: PPchair | 来源:发表于2018-04-08 13:43 被阅读0次

    Reids 字典dict

    1、字典结构

    typedef struct dict {
        dictType *type; //不同的key类型的 val的处理方法
        void *privdata;
        dictht ht[2];
        long rehashidx; /* rehashing not in progress if rehashidx == -1 */
        unsigned long iterators; /* number of iterators currently running */
    } dict;
    typedef struct dictht {
        dictEntry **table; // 数组
        unsigned long size;
        unsigned long sizemask; //计算
        unsigned long used;
    } dictht;
    

    包含两个哈希表dictht ht[2],用于rehash,rehash包含两个:dictExpand扩展和dictResize

    dictExpand:扩展,当达到一定的条件,倍数扩展哈希表。(_dictExpandIfNeeded内实现)

    dictResize:到一定条件缩小(htNeedsResize内实现)。

    底层都是用的rehash过程。rehash过程是一个渐进式过程,分别在get,add,del等操作过程中去分片rehash。分别片大小是1(index)。

    后台还会有定时任务去渐进式rehash,分片大小是100。

    rehashidx:-1表示未rehash,其他值表示正在rehash的idx值,在渐进式rehash过程中,用来记录rehash进度。

    2、哈希算法

    也就是hash key的算法。

    #define  dictHashKey(d, key) (d)->type->hashFunction(key) //计算key的hash值 具体见siphash.c内的代码 比较复杂
    

    hash表的sizemask

    始终等于hash表size-1,见代码dictExpand.

     if (ht->used > size)
            return DICT_ERR;
    
        _dictInit(&n, ht->type, ht->privdata);
        n.size = realsize;
        n.sizemask = realsize-1; //计算sizemask
        n.table = calloc(realsize,sizeof(dictEntry*));
    

    3、rehash

    随着哈希表的装载的k v键值对越来越多,hash表的冲突和查找,写入效率都会大大的打折扣,这是我们需要在适宜的时候扩展整个hash表,也就是ht下的table数组大小,减少冲突,提高效率,当然如果有效的使用率过小,也会进行resize反向操作。一般的rehash扩展过程大小是2的倍数扩展的。

    扩展过程如下:

    rehash_1.png rehash_2.png

    3.1图 rehash前

    3.2reset ht size,初始化ht[1],size是ht[0]的倍数

    rehash_3.png

    3.3rehash过程

    rehash_4.png

    3.4将ht[0]= ht[1],ht[1] =null;完毕

    4、rehash代码

    从add入手。

    //dict.c
    /* Add the key to the DB. It's up to the caller to increment the reference
     * counter of the value if needed.
     *
     * The program is aborted if the key already exists. */
     //添加key value调用
    void dbAdd(redisDb *db, robj *key, robj *val) {
        sds copy = sdsdup(key->ptr); //sds redis字符串
        int retval = dictAdd(db->dict, copy, val); //字典添加
    
        serverAssertWithInfo(NULL,key,retval == DICT_OK);
        if (val->type == OBJ_LIST) signalListAsReady(db, key);
        if (server.cluster_enabled) slotToKeyAdd(key);
     }
    
    //dictAdd
    /* Add an element to the target hash table */
    int dictAdd(dict *d, void *key, void *val)
    {
        dictEntry *entry = dictAddRaw(d,key,NULL); //添加key
    
        if (!entry) return DICT_ERR;
        dictSetVal(d, entry, val);//添加val
        return DICT_OK;
    }
    
    //dictAddRaw 中的如果判断正在rehash则进行单步dictRehash过程。
    static void _dictRehashStep(dict *d) {
        if (d->iterators == 0) dictRehash(d,1);
    }
    
    //dictRehash,n表示是rehash片大小
    int dictRehash(dict *d, int n) {
        int empty_visits = n*10; /* Max number of empty buckets to visit. */
        if (!dictIsRehashing(d)) return 0;
    
        while(n-- && d->ht[0].used != 0) { //rehash 片知道n减少到0
            dictEntry *de, *nextde;
    
            /* 如果 被rehash size小于rehashIdx索引,则不能rehash*/
            assert(d->ht[0].size > (unsigned long)d->rehashidx);
            while(d->ht[0].table[d->rehashidx] == NULL) {
                d->rehashidx++;
                if (--empty_visits == 0) return 1;
            }
            de = d->ht[0].table[d->rehashidx];
            /* Move all the keys in this bucket from the old to the new hash HT */
            //rehash de(idx指定的)
            while(de) {
                unsigned int h;
    
                nextde = de->next;
                /* Get the index in the new hash table */
                h = dictHashKey(d, de->key) & d->ht[1].sizemask;
                de->next = d->ht[1].table[h];
                d->ht[1].table[h] = de;
                d->ht[0].used--;
                d->ht[1].used++;
                de = nextde;
            }
            d->ht[0].table[d->rehashidx] = NULL;
            d->rehashidx++;
        }
    
        /* Check if we already rehashed the whole table... */
        if (d->ht[0].used == 0) { //rehash完毕,ht[0] = ht[1],设置rehashidx = -1;
            zfree(d->ht[0].table);
            d->ht[0] = d->ht[1];
            _dictReset(&d->ht[1]);
            d->rehashidx = -1;
            return 0;
        }
    
        /* More to rehash... */
        return 1;
    }
    
    /* Expand or create the hash table */ 
    /* 这里先申请一个size大小的ht[1],依据并设置rehashidx =0,表示正在rehash*/
    int dictExpand(dict *d, unsigned long size)
    {
        dictht n; /* the new hash table */
        unsigned long realsize = _dictNextPower(size);
    
        /* the size is invalid if it is smaller than the number of
         * elements already inside the hash table */
        if (dictIsRehashing(d) || d->ht[0].used > size)
            return DICT_ERR;
    
        /* Rehashing to the same table size is not useful. */
        if (realsize == d->ht[0].size) return DICT_ERR;
    
        /* Allocate the new hash table and initialize all pointers to NULL */
        n.size = realsize;
        n.sizemask = realsize-1;
        n.table = zcalloc(realsize*sizeof(dictEntry*));
        n.used = 0;
    
        /* Is this the first initialization? If so it's not really a rehashing
         * we just set the first hash table so that it can accept keys. */
        if (d->ht[0].table == NULL) {
            d->ht[0] = n;
            return DICT_OK;
        }
    
        /* Prepare a second hash table for incremental rehashing */
        d->ht[1] = n;
        d->rehashidx = 0;//修改rehashidx
        return DICT_OK;
    }
    
    static int dict_can_resize = 1;
    static unsigned int dict_force_resize_ratio = 5;
    /* Expand the hash table if needed */ 
    /*判断是否需要expand*/
    static int _dictExpandIfNeeded(dict *d)
    {
        /* Incremental rehashing already in progress. Return. */
        if (dictIsRehashing(d)) return DICT_OK;
    
        /* If the hash table is empty expand it to the initial size. */
        /**如果是空的,first time则必须扩展 */
        if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE); //初始大小
    
        /* If we reached the 1:1 ratio, and we are allowed to resize the hash
         * table (global setting) or we should avoid it but the ratio between
         * elements/buckets is over the "safe" threshold, we resize doubling
         * the number of buckets. */
        //达到条件 used>= size 使用的到到申请的大小size 且可以resize 或者
        if (d->ht[0].used >= d->ht[0].size &&
            (dict_can_resize ||
             d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) //到达比率
        {
            return dictExpand(d, d->ht[0].used*2);
        }
        return DICT_OK;
    }
    //
    * Returns the index of a free slot that can be populated with
     * a hash entry for the given 'key'.
     * If the key already exists, -1 is returned
     * and the optional output parameter may be filled.
     *
     * Note that if we are in the process of rehashing the hash table, the
     * index is always returned in the context of the second (new) hash table. */
    static int _dictKeyIndex(dict *d, const void *key, unsigned int hash, dictEntry **existing)
    {
        unsigned int idx, table;
        dictEntry *he;
        if (existing) *existing = NULL;
    
        /* Expand the hash table if needed */
        if (_dictExpandIfNeeded(d) == DICT_ERR) //错误返回
            return -1;
        for (table = 0; table <= 1; table++) {
            idx = hash & d->ht[table].sizemask; //计算哦
            /* Search if this slot does not already contain the given key */
            he = d->ht[table].table[idx];
            while(he) {
                if (key==he->key || dictCompareKeys(d, key, he->key)) {
                    if (existing) *existing = he;
                    return -1;
                }
                he = he->next;
            }
            if (!dictIsRehashing(d)) break;
        }
        return idx;
    }
    //添加key value
    dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
    {
        int index;
        dictEntry *entry;
        dictht *ht;
    
        if (dictIsRehashing(d)) _dictRehashStep(d);// 渐进式rehash
    
        /* Get the index of the new element, or -1 if
         * the element already exists. */
        //计算key在table中的索引值 dictHashKey 就是调用宏指令
        if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
            return NULL;
    
        /* Allocate the memory and store the new entry.
         * Insert the element in top, with the assumption that in a database
         * system it is more likely that recently added entries are accessed
         * more frequently. */
        //判断是否rehash,添加的话必须在ht[1]添加
        ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
        entry = zmalloc(sizeof(*entry));
        entry->next = ht->table[index];
        ht->table[index] = entry;
        ht->used++;
    
        /* Set the hash entry fields. */
        dictSetKey(d, entry, key);
        return entry;
    }
    

    定时rehash

    //dict.c
    long long timeInMilliseconds(void) {
        struct timeval tv;
    
        gettimeofday(&tv,NULL);
        return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000);
    }
    
    /* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
    //定时 片大小100
    int dictRehashMilliseconds(dict *d, int ms) {
        long long start = timeInMilliseconds();
        int rehashes = 0;
    
        while(dictRehash(d,100)) { //片大小100
            rehashes += 100;
            if (timeInMilliseconds()-start > ms) break; //判断是否超过指定时间
        }
        return rehashes;
    }
    
    //server.c
    //指定db渐进式rehash
    int incrementallyRehash(int dbid) {
        /* Keys dictionary */
        if (dictIsRehashing(server.db[dbid].dict)) {
            dictRehashMilliseconds(server.db[dbid].dict,1);
            return 1; /* already used our millisecond for this loop... */
        }
        /* Expires */
        if (dictIsRehashing(server.db[dbid].expires)) {
            dictRehashMilliseconds(server.db[dbid].expires,1);
            return 1; /* already used our millisecond for this loop... */
        }
        return 0;
    }
    
    
    
    /* This function handles 'background' operations we are required to do
     * incrementally in Redis databases, such as active key expiring, resizing,
     * rehashing. */
    void databasesCron(void) {
         // 省略......
    
            /* Resize */
            for (j = 0; j < dbs_per_call; j++) { //resize
                tryResizeHashTables(resize_db % server.dbnum); // resize
                resize_db++;
            }
    
            /* Rehash */
            if (server.activerehashing) {  //rehash
                for (j = 0; j < dbs_per_call; j++) {
                    int work_done = incrementallyRehash(rehash_db);//渐进式
                    if (work_done) {
                        /* If the function did some work, stop here, we'll do
                         * more at the next cron loop. */
                        break;
                    } else {
                        /* If this db didn't need rehash, we'll try the next one. */
                        rehash_db++;
                        rehash_db %= server.dbnum;
                    }
                }
            }
        }
    }
    
    

    4、resize

    和expand相反,删除扩展的哈希表 减少内存,但最终使用的expand代码

    
    ///* Hash table parameters */ service.h 
    #define HASHTABLE_MIN_FILL        10      /* Minimal hash table fill 10% */
    
    ///dict.c
    /* Resize the table to the minimal size that contains all the elements,
     * but with the invariant of a USED/BUCKETS ratio near to <= 1 */
    int dictResize(dict *d)
    {
        int minimal;
    
        if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
        minimal = d->ht[0].used;
        if (minimal < DICT_HT_INITIAL_SIZE)
            minimal = DICT_HT_INITIAL_SIZE;
        return dictExpand(d, minimal);
    }
    
    //server.c
    int htNeedsResize(dict *dict) {
        long long size, used;
    
        size = dictSlots(dict);
        used = dictSize(dict);
        return (size > DICT_HT_INITIAL_SIZE && //大于初始size
                (used*100/size < HASHTABLE_MIN_FILL)); //使用率小于10%
    }
    /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
     * we resize the hash table to save memory */
    void tryResizeHashTables(int dbid) {
        if (htNeedsResize(server.db[dbid].dict))
            dictResize(server.db[dbid].dict);
        if (htNeedsResize(server.db[dbid].expires))
            dictResize(server.db[dbid].expires);
    }
    

    总结:整个rehash过程是很简单的,

    1.在add,update,get等方法中会持续的rehash

    2.有定时任务持续rehash

    3.在get等方法中,如果字典在rehash过程中,如果ht[0]找不到,就在ht[1]找;

    4.add会在新的hash表中插入

    相关文章

      网友评论

          本文标题:Reids 字典

          本文链接:https://www.haomeiwen.com/subject/fxwphftx.html