redis dict
redisDb(server.h/redisDb)
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *dict; / The keyspace for this DB ,值value存储 space key val space*/
dict *expires; / Timeout of keys with a timeout set,带超时的key space */
dict *blocking_keys; / Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; / Blocked keys that received a PUSH */
dict *watched_keys; / WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats 超时的avg ttl*/
} redisDb;
reids服务可配置多个数据库,默认0-15,16个。
dict(dict.h/dict)
typedef struct dict {
dictType *type; //不同的key类型的 val的处理方法
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
} dict;
注意每个dict 包含字典dictht,他们用于rehashidx,一般情况下用第一个ht[0]。
dicht(dict.h/dicht)
/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
dictEntry **table; // 数组
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;
dictEntry是个连续数据空间,size是指数组大小,used是有key val的数量。
dictEntry(dict.h/dictEntry)
typedef struct dictEntry {
void *key;
union { //这是union联合体,不同的val有不同值,比如字符串,指针等,在过期键中,只使用了s64来存储失效时间
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; //链表
} dictEntry;
dictEntry是个键值对,key,v,next:相同索引不同key用链表结构。
在redisDb中expire中的dictEnrty key value中value是 union的s64。
redisDb实例
redis_dict.pngredis 过期key删除
存储
redisDb内的expires中存储的就是有有效期的key space。每个key value中value就是存储的过期时间,就是过期的时间戳。redis检查过期key根据的是这个戳和当前系统时间的差值。
reids_expire_save.png删除策略
1.定时
在设置键的过期时间时,创建一个定时事件,当过期时间到达时,由事件处理器自动执行键的删除操作。
优点:保证及时删除
缺点:每个key占用一个事件或线程,CPU密集型。占用大量CPU时间。
2.惰性
get时候检查过期时间。
优点:对CPU友好。
缺点:占用内存。溢出。
3.定期删除
1+2方案。redis采取的策略。
代码:
周期检查(4.版本 最新src版本)
// position db.c propagateExpire
/*
* Propagate expires into slaves and the AOF file.
* When a key expires in the master, a DEL operation for this key is sent
* to all the slaves and the AOF file if enabled.
*
* This way the key expiry is centralized in one place, and since both
* AOF and the master->slave link guarantee operation ordering, everything
* will be consistent even if we allow write operations against expiring
* keys. */
//AOF ,salve
void propagateExpire(redisDb *db, robj *key, int lazy) {
robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del;
argv[1] = key;
incrRefCount(argv[0]);
incrRefCount(argv[1]);
if (server.aof_state != AOF_OFF)
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
// position expire.c activeExpireCycleTryExpire
/* Helper function for the activeExpireCycle() function.
* This function will try to expire the key that is stored in the hash table
* entry 'de' of the 'expires' hash table of a Redis database.
*
* If the key is found to be expired, it is removed from the database and
* 1 is returned. Otherwise no operation is performed and 0 is returned.
*
* When a key is expired, server.stat_expiredkeys is incremented.
*
* The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */
//试图删除过期key
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
sds key = dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
if (server.lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj); //异步删除 4.0新加
else
dbSyncDelete(db,keyobj); //同步删除
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
decrRefCount(keyobj); //减少引用计数,内存引用为0的对象才会被清空内存
server.stat_expiredkeys++;
return 1;
} else {
return 0;
}
}
// position expire.c activeExpireCycle
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise
* it will get more aggressive to avoid that too much memory is used by
* keys that can be removed from the keyspace.
*
* No more than CRON_DBS_PER_CALL databases are tested at every
* iteration.
*
* This kind of call is used when Redis detects that timelimit_exit is
* true, so there is more work to do, and we do it more incrementally from
* the beforeSleep() function of the event loop.
*
* Expire cycle type:
*
* If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
* "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
* microseconds, and is not repeated again before the same amount of time.
*
* If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
* executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
//这是周期检查的入口
//type = ACTIVE_EXPIRE_CYCLE_FAST 限制执行时间不能超过 EXPIRE_FAST_CYCLE_DURATION
//type = ACTIVE_EXPIRE_CYCLE_SLOW 限制执行时间是REDIS_HZ周期的一个比例
void activeExpireCycle(int type) {
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Last DB tested. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
//注意上面三个静态类型的使用
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
/* When clients are paused the dataset should be static not just from the
* POV of clients not being able to write, but also from the POV of
* expires and evictions of keys not being performed. */
//判断是否需要暂停,条件:禁止写操作 包括客户端写 和 失效检测,
if (clientsArePaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exited
* for time limt. Also don't repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit) return;
if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
last_fast_cycle = start;
//检测是不是上次的ACTIVE_EXPIRE_CYCLE_FAST检测是否查过time limit 或者是否在同样的时间周期内重复
}
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
//不要超过数据库的数量
//如果hit time limit 则要scan所有数据库的过期key
/* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
* per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
//计算当前的function timeLimit。执行时间
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */
//循环检查每个db
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
int expired;
redisDb *db = server.db+(current_db % server.dbnum);
/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
//检查过期key数量,如果为0则下一个
slots = dictSlots(db->expires);
now = mstime();
/* When there are less than 1% filled slots getting random
* keys is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
//限制比例,如果过期比例太小,name获取随机失效key的成本比较高 就放弃
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
//对当前db进行循环 随机获取过期key,然后判断是否过期,过期了就删除,同时计算过期ttl值,做db的avg_ttl计算
while (num--) {
dictEntry *de;
long long ttl;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
/* We want the averageTTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
}
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
//判断是否scan all dbs,如果一周了,切到了时间 就退出。等待下一个周期
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
break;
}
}
/* We don't repeat the cycle if there are less than 25% of keys
* found expired in the current DB. */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
}
elapsed = ustime()-start;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
}
dbSyncDelete 同步删除
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
//1.删除过期key
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
//2.删除值空间
}
//很简单直接调用dictDelte来删除
dbAsyncDelete 异步删除
//position lazyfree.c dbAsyncDelete
/* Delete a key, value, and associated expiration entry if any, from the DB.
* If there are enough allocations to free the value object may be put into
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
//删除过期key
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->dict,key->ptr);
//如果一个val是在非常小的,在一定值下,同步删除,如果非常大,超过配置数量,则异步删除
if (de) {
robj *val = dictGetVal(de);
size_t free_effort = lazyfreeGetFreeEffort(val);
/* If releasing the object is too much work, let's put it into the
* lazy free list. */
if (free_effort > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->dict,de,NULL);
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
//如果上面要再异步删除 dictFreeUnlinkedEntry里面就直接返回
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
return 0;
}
}
//position dict dictFreeUnlinkedEntry
/* You need to call this function to really free the entry after a call
* to dictUnlink(). It's safe to call this function with 'he' = NULL. */
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
dictUnlink dictDelete
//position dict.c dictUnlink
dictEntry *dictUnlink(dict *ht, const void *key) {
return dictGenericDelete(ht,key,1);
}
/* Remove an element, returning DICT_OK on success or DICT_ERR if the
* element was not found. */
int dictDelete(dict *ht, const void *key) {
return dictGenericDelete(ht,key,0) ? DICT_OK : DICT_ERR;
}
//两者不相同的地方是返回值,第一个返回被删除的键值对 第二个直接删除 返回是否成功
/* Search and remove an element. This is an helper function for
* dictDelete() and dictUnlink(), please check the top comment
* of those functions. */
static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
unsigned int h, idx;
dictEntry *he, *prevHe;
int table;
if (d->ht[0].used == 0 && d->ht[1].used == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
//在table数组中找
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
prevHe = NULL;
//删除后要线性链表操作
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
/* Unlink the element from the list */
if (prevHe)
prevHe->next = he->next;
else
d->ht[table].table[idx] = he->next;
if (!nofree) { //是否删除
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
d->ht[table].used--;
return he;
}
prevHe = he;
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return NULL; /* not found */
}
get 之类检查 在需要的时候
所有命令在读取或写入数据库之前,程序都会调用 expireIfNeeded
对输入键进行检查, 并将过期键删除.
int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
mstime_t now;
if (when < 0) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
/* If we are in the context of a Lua script, we claim that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when;
/* Return when this key has not expired */
if (now <= when) return 0;
/* Delete the key */
server.stat_expiredkeys++;
propagateExpire(db,key,server.lazyfree_lazy_expire);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",key,db->id);
return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
dbSyncDelete(db,key);
}
lazyfree
在删除对象时只是进行逻辑删除,然后把对象丢给后台,让后台线程去执行真正的destruct,避免由于对象体积过大而造成阻塞
代码已经讲过。
线程 后台线程 bio/bioProcessBackgroundJobs
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
/* Check that the type is within the right interval. */
if (type >= BIO_NUM_OPS) {
serverLog(LL_WARNING,
"Warning: bio thread started with wrong type %lu",type);
return NULL;
}
/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
serverLog(LL_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue. */
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
if (type == BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else if (type == BIO_LAZY_FREE) { //对异步过期的执行
/* What we free changes depending on what arguments are set:
* arg1 -> free the object at pointer.
* arg2 & arg3 -> free two dictionaries (a Redis DB).
* only arg3 -> free the skiplist. */
if (job->arg1)
lazyfreeFreeObjectFromBioThread(job->arg1); //key vale删除判断在这里
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3); //数据库删除
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread(job->arg3); //暂未定
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Unblock threads blocked on bioWaitStepOfType() if any. */
pthread_cond_broadcast(&bio_step_cond[type]);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
总:
键的删除 不仅仅只有在key过期或主动删除时才执行,当内存溢出,key rename等时都会的。
网友评论