您当前的位置:首页 > 电脑百科 > 数据库 > Redis

Redis数据淘汰算法

时间:2019-08-13 11:26:33  来源:  作者:

众所周知,redis的所有数据都存储在内存中,但是内存是一种有限的资源,所以为了防止Redis无限制的使用内存,在启动Redis时可以通过配置项 maxmemory 来指定其最大能使用的内存容量。例如可以通过以下配置来设置Redis最大能使用 1G 内存:

maxmemory 1G

当Redis使用的内存超过配置的 maxmemory 时,便会触发数据淘汰策略。Redis提供了多种数据淘汰的策略,如下:

  • volatile-lru: 最近最少使用算法,从设置了过期时间的键中选择空转时间最长的键值对清除掉
  • volatile-lfu: 最近最不经常使用算法,从设置了过期时间的键中选择某段时间之内使用频次最小的键值对清除掉
  • volatile-ttl: 从设置了过期时间的键中选择过期时间最早的键值对清除
  • volatile-random: 从设置了过期时间的键中,随机选择键进行清除
  • allkeys-lru: 最近最少使用算法,从所有的键中选择空转时间最长的键值对清除
  • allkeys-lfu: 最近最不经常使用算法,从所有的键中选择某段时间之内使用频次最少的键值对清除
  • allkeys-random: 所有的键中,随机选择键进行删除
  • noeviction: 不做任何的清理工作,在redis的内存超过限制之后,所有的写入操作都会返回错误;但是读操作都能正常的进行

可以在启动Redis时,通过配置项 maxmemory_policy 来指定要使用的数据淘汰策略。例如要使用 volatile-lru 策略可以通过以下配置来指定:

maxmemory_policy volatile-lru

LRU算法

LRU是 Least Recently Used 的缩写,即最近最少使用,很多缓存系统都使用此算法作为淘汰策略。

最简单的实现方式就是把所有缓存通过一个链表连接起来,新创建的缓存添加到链表的头部,如果有缓存被访问了,就把缓存移动到链表的头部。由于被访问的缓存会移动到链表的头部,所以没有被访问的缓存会随着时间的推移移动的链表的尾部,淘汰数据时只需要从链表的尾部开始即可。下图展示了这个过程:

Redis数据淘汰算法

 

Redis的LRU算法

Redis使用了结构体 robj 来存储缓存对象,而 robj 结构有个名为 lru 的字段,用于记录缓存对象最后被访问的时间,Redis就是以 lru 字段的值作为淘汰依据。robj 结构如下:

typedef struct redisObject {
 ...
 unsigned lru:24;
 ...
} robj;

当缓存对象被访问时,便会更新此字段的值。代码如下:

robj *lookupKey(redisDb *db, robj *key, int flags) {
 dictEntry *de = dictFind(db->dict,key->ptr);
 if (de) {
 robj *val = dictGetVal(de);
 /* Update the access time for the ageing algorithm.
 * Don't do it if we have a saving child, as this will trigger
 * a copy on write madness. */
 if (server.rdb_child_pid == -1 &&
 server.aof_child_pid == -1 &&
 !(flags & LOOKUP_NOTOUCH))
 {
 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
 updateLFU(val);
 } else {
 val->lru = LRU_CLOCK(); // 更新lru字段的值
 }
 }
 return val;
 } else {
 return NULL;
 }
}

lookupKey() 函数用于查找key对应的缓存对象,所以当缓存对象被访问时便会调用此函数。

Redis数据淘汰

接下来我们分析一下当Redis内存使用超过配置的最大内存使用限制时的处理方式。

Redis在处理每一个命令时都会检查内存的使用是否超过了限制的最大值,处理命令是通过 processCommand() 函数进行的,检查内存使用情况的代码如下:

int processCommand(client *c) {
 ...
 if (server.maxmemory && !server.lua_timedout) {
 int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
 if (server.current_client == NULL) return C_ERR;
 if (out_of_memory &&
 (c->cmd->flags & CMD_DENYOOM ||
 (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
 flagTransaction(c);
 addReply(c, shared.oomerr);
 return C_OK;
 }
 }
 ...
}

检查内存的使用情况主要通过 freeMemoryIfNeededAndSafe() 函数进行,而 freeMemoryIfNeededAndSafe() 函数最终会调用 freeMemoryIfNeeded() 函数进行处理,由于 freeMemoryIfNeeded() 函数比较庞大,所以我们分段来进行分析:

int freeMemoryIfNeeded(void) {
 ...
 size_t mem_reported, mem_tofree, mem_freed;
 mstime_t latency, eviction_latency;
 long long delta;
 int slaves = listLength(server.slaves);
 ...
 if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
 return C_OK;
 mem_freed = 0;
 if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
 goto cant_free;

freeMemoryIfNeeded() 函数首先会调用 getMaxmemoryState() 函数来获取Redis的内存使用情况,如果 getMaxmemoryState() 函数返回 C_OK,表示内存使用总量还没有超出限制,直接返回 C_OK 就可以了。如果 getMaxmemoryState() 函数不是返回 C_OK,表示内存使用总量已经超出限制,需要进行数据淘汰,需要淘汰数据的大小通过 mem_tofree 参数返回。

当然,如果配置的淘汰策略为 noeviction,表示不能进行数据淘汰,所以需要返回 C_ERR 表示有错误。

接着分析剩余的代码片段:

 latencyStartMonitor(latency);
 while (mem_freed < mem_tofree) {
 int j, k, i, keys_freed = 0;
 static unsigned int next_db = 0;
 sds bestkey = NULL;
 int bestdbid;
 redisDb *db;
 dict *dict;
 dictEntry *de;
 if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
 server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
 {
 struct evictionPoolEntry *pool = EvictionPoolLRU;
 while(bestkey == NULL) {
 unsigned long total_keys = 0, keys;
 for (i = 0; i < server.dbnum; i++) {
 db = server.db+i;
 dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
 db->dict : db->expires;
 if ((keys = dictSize(dict)) != 0) {
 evictionPoolPopulate(i, dict, db->dict, pool);
 total_keys += keys;
 }
 }
 if (!total_keys) break; /* No keys to evict. */
 for (k = EVPOOL_SIZE-1; k >= 0; k--) {
 if (pool[k].key == NULL) continue;
 bestdbid = pool[k].dbid;
 if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
 de = dictFind(server.db[pool[k].dbid].dict,
 pool[k].key);
 } else {
 de = dictFind(server.db[pool[k].dbid].expires,
 pool[k].key);
 }
 if (pool[k].key != pool[k].cached)
 sdsfree(pool[k].key);
 pool[k].key = NULL;
 pool[k].idle = 0;
 if (de) {
 bestkey = dictGetKey(de);
 break;
 } else {
 /* Ghost... Iterate again. */
 }
 }
 }
 }

如果内存使用总量超出限制,并且配置了淘汰策略,那么就开始数据淘汰过程。在上面的代码中,mem_tofree 变量表示要淘汰的数据总量,而 mem_freed 变量表示已经淘汰的数据总量。所以在 while 循环中的条件是 mem_freed < mem_tofree,表示淘汰的数据总量一定要达到 mem_tofree 为止。

前面介绍过,Redis的淘汰策略有很多中,所以进行数据淘汰时需要根据配置的策略进行。如果配置的淘汰策略是 LRU/LFU/TTL 的话,那么就进入 if 代码块。在 if 代码块里,首先调用 evictionPoolPopulate() 函数选择一些缓存对象样本放置到 EvictionPoolLRU 数组中。evictionPoolPopulate() 函数后面会进行分析,现在只需要知道 evictionPoolPopulate() 函数是选取一些缓存对象样本就可以了。

获取到缓存对象样本后,还需要从样本中获取最合适的缓存对象进行淘汰,因为在选择样本时会把最合适的缓存对象放置在 EvictionPoolLRU 数组的尾部,所以只需要从 EvictionPoolLRU 数组的尾部开始查找一个不为空的缓存对象即可。

 else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
 {
 for (i = 0; i < server.dbnum; i++) {
 j = (++next_db) % server.dbnum;
 db = server.db+j;
 dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
 db->dict : db->expires;
 if (dictSize(dict) != 0) {
 de = dictGetRandomKey(dict);
 bestkey = dictGetKey(de);
 bestdbid = j;
 break;
 }
 }
 }

如果使用随机淘汰策略,那么就进入 else if 代码块,这部分代码的逻辑很简单,如果配置的淘汰策略是 volatile-random,那么就从有过期时间的缓存对象中随机获取,否则就从所有的缓存对象中随机获取。

 if (bestkey) {
 db = server.db+bestdbid;
 robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
 propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
 delta = (long long) zmalloc_used_memory();
 latencyStartMonitor(eviction_latency);
 // 删除缓存对象
 if (server.lazyfree_lazy_eviction)
 dbAsyncDelete(db,keyobj);
 else
 dbSyncDelete(db,keyobj);
 latencyEndMonitor(eviction_latency);
 latencyAddSampleIfNeeded("eviction-del",eviction_latency);
 latencyRemoveNestedEvent(latency,eviction_latency);
 delta -= (long long) zmalloc_used_memory();
 mem_freed += delta;
 server.stat_evictedkeys++;
 notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
 keyobj, db->id);
 decrRefCount(keyobj);
 keys_freed++;
 if (slaves) flushSlavesOutputBuffers();
 if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
 if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
 mem_freed = mem_tofree;
 }
 }
 }

如果找到要淘汰的缓存对象,那么就开始释放缓存对象所占用的内存空间。除了需要释放缓存对象占用的内存空间外,还需要进行一些其他的操作,比如把淘汰的缓存对象同步到从服务器和把淘汰的缓存对象追加到 AOF文件 中等。

当条件 mem_freed < mem_tofree 为假时便会退出 while 循环,说明Redis的内存使用总量已经小于最大的内存使用限制,freeMemoryIfNeeded() 函数便会返回 C_OK 表示成功执行。

淘汰数据样本采集

前面说了,当使用非随机淘汰策略时需要进行数据采样(volatile-lru/volatile-lfu/volatile-ttl/allkeys-lru/allkeys-lfu),数据采样通过 evictionPoolPopulate() 函数进行,由于此函数比较庞大,所以对代码分段分析:

void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
 int j, k, count;
 dictEntry *samples[server.maxmemory_samples];
 count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);

evictionPoolPopulate() 函数首先调用 dictGetSomeKeys() 函数从缓存对象集合中获取一些样本,并保存在 samples 数组中。

 for (j = 0; j < count; j++) {
 unsigned long long idle;
 sds key;
 robj *o;
 dictEntry *de;
 de = samples[j];
 key = dictGetKey(de);
 if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
 if (sampledict != keydict) de = dictFind(keydict, key);
 o = dictGetVal(de);
 }
 if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
 idle = estimateObjectIdleTime(o);
 } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
 idle = 255-LFUDecrAndReturn(o);
 } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
 idle = ULLONG_MAX - (long)dictGetVal(de);
 } else {
 serverPanic("Unknown eviction policy in evictionPoolPopulate()");
 }

上面的代码主要是获取样本缓存对象的排序权值 idel,如果使用 LRU淘汰算法,那么就调用 estimateObjectIdleTime() 函数获取排序权值,estimateObjectIdleTime() 函数用于获取缓存对象有多长时间没有被访问。排序按照 idle 的值升序排序,就是说 idle 的值越大,就排到越后。

 k = 0;
 while (k < EVPOOL_SIZE &&
 pool[k].key &&
 pool[k].idle < idle) k++;
 if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
 continue;
 } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
 } else {
 if (pool[EVPOOL_SIZE-1].key == NULL) {
 sds cached = pool[EVPOOL_SIZE-1].cached;
 memmove(pool+k+1,pool+k,
 sizeof(pool[0])*(EVPOOL_SIZE-k-1));
 pool[k].cached = cached;
 } else {
 k--;
 sds cached = pool[0].cached;
 if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
 memmove(pool,pool+1,sizeof(pool[0])*k);
 pool[k].cached = cached;
 }
 }
 int klen = sdslen(key);
 if (klen > EVPOOL_CACHED_SDS_SIZE) {
 pool[k].key = sdsdup(key);
 } else {
 memcpy(pool[k].cached,key,klen+1);
 sdssetlen(pool[k].cached,klen);
 pool[k].key = pool[k].cached;
 }
 pool[k].idle = idle;
 pool[k].dbid = dbid;
 }
}

上面这段代码的作用是:根据 idle 的值找到当前缓存对象所在 EvictionPoolLRU 数组的位置,然后把缓存对象保存到 EvictionPoolLRU 数组中。以下插图解释了数据采样的过程:

Redis数据淘汰算法

 

所以 EvictionPoolLRU 数组的最后一个元素便是最优的淘汰缓存对象。

从上面的分析可知,淘汰数据时只是从样本中找到最优的淘汰缓存对象,并不是从所有缓存对象集合中查找。由于前面介绍的 LRU算法 需要维护一个LRU链表,而维护一个LRU链表的成本比较大,所以Redis才出此下策。



Tags:Redis 算法   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
众所周知,Redis的所有数据都存储在内存中,但是内存是一种有限的资源,所以为了防止Redis无限制的使用内存,在启动Redis时可以通过配置项 maxmemory 来指定其最大能使用的内存容量...【详细内容】
2019-08-13  Tags: Redis 算法  点击:(187)  评论:(0)  加入收藏
▌简易百科推荐
来源: my.oschina.net/xiaomu0082/blog/2990388首先说下问题现象:内网sandbox环境API持续1周出现应用卡死,所有api无响应现象刚开始当测试抱怨环境响应慢的时候 ,我们重启一下应...【详细内容】
2021-12-08  Java识堂    Tags:Redis   点击:(18)  评论:(0)  加入收藏
我不知道为什么你会选择对特定数量的“错误”(或警告)如此具体。听起来您正在寻找将要发布到 Yahoo! 的某些文章的内容。 Insider (N Foos to Blah for the BlahBlah)。那说:...【详细内容】
2021-12-07  富集云科技有限公司    Tags:Redis   点击:(14)  评论:(0)  加入收藏
目录 一、背景 二、步骤 0.理论支持 1、获取数据 2、结果 3、分析数据并评估大小 三、关于repl-backlog-size 一、背景 repl-backlog-size控制这个环形缓冲区. ​ 主从断...【详细内容】
2021-11-05  弈秋的美好生活    Tags:redis   点击:(41)  评论:(0)  加入收藏
Redis 性能测试是通过同时执行多个命令实现的。1,Redis-benchmarkRedis性能命令:redis性能命令格式: redis-benchmark [option] [option value] redis 性能测试工具可选参数如...【详细内容】
2021-11-02  川石信息    Tags:Redis   点击:(41)  评论:(0)  加入收藏
1 概述数据结构和内部编码 无传统关系型数据库的 Table 模型schema 所对应的db仅以编号区分。同一 db 内,key 作为顶层模型,它的值是扁平化的。即 db 就是key的命名空间。 key...【详细内容】
2021-11-01  JavaEdge    Tags:Redis   点击:(28)  评论:(0)  加入收藏
普通java中使用引用Java redis 驱动,即可连接:import redis.clients.jedis.Jedis; public class RedisTestJava { public static void main(String[] args) { //连...【详细内容】
2021-10-13  faesuite    Tags:Redis   点击:(34)  评论:(0)  加入收藏
Redis常用的数据结构有 string list set zset hashstringstring 是 Redis 的基本的数据类型,一个 key 对应一个 value。string 类型是二进制安全的,Redis的string可以包含任...【详细内容】
2021-10-12  语霖    Tags:Redis   点击:(36)  评论:(0)  加入收藏
列表类型可以存储一组按插入顺序排序的字符串,它非常灵活,支持在两端插入、弹出数据,可以充当栈和队列的角色。> LPUSH fruit apple(integer) 1> RPUSH fruit banana(integer)...【详细内容】
2021-09-17  深夜敲代码    Tags:Redis   点击:(54)  评论:(0)  加入收藏
Redis持久化意义 是做灾难恢复,数据恢复,也可以归类到高可用的一个环节里面去,比如你的redis整个挂了,然后redis就不可用了,你要做的事情是让redis变得可用,尽快变得可用 大量的请...【详细内容】
2021-08-12  小李说IT    Tags:Redis   点击:(77)  评论:(0)  加入收藏
当查询Redis中没有的数据时,该查询会下沉到数据库层,同时数据库层也没有该数据,当这种情况大量出现或被恶意攻击时,接口的访问全部透过Redis访问数据库,而数据库中也没有这些数据...【详细内容】
2021-07-30  随便t    Tags:缓存穿透   点击:(91)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条