
Redis 日志机制简介(三):RDB 日志
Redis 日志机制简介(三):RDB 日志
唯有以更扎实的态度,推进工作,继续深究原理,才可以建立起一个更为开放繁荣的数据库内核生态来,才可以真正树立起相互学习,相互促进之精神来,才可以真正团结起各方面的力量来。
Redis RDB 日志的作用,以及基本实践
想要理解 Redis 的作用吗?让我们引述文档 https://redis.io/docs/latest/operate/oss_and_stack/management/persistence/ (Redis persistence, Redis 持久化) 中的有关内容:
持久化的策略便是将内存中的数据同步于磁盘之上,比如固态存储(solid-state disk,SSD)。而 Redis 提供了一组持久化的选项,他们包括:
- RDB (Redis Database) Redis RDB 持久化被用于让你的数据集合实现某个特定时刻的点到点快照恢复
- AOF (Append Only File) AOF 持久化将所有针对于 Redis 服务端的写入数据操作持久化。这些操作可以在服务端启动时展开回放工作,由此实现对于数据集合的重建。这些指令使用与 Redis 通信协议(请参考 https://redis.io/docs/latest/develop/reference/protocol-spec/)相同的格式展开记录。
- 无可持久化 你可以选择完全屏蔽可持久化。这在某些场合被用于缓存的时候。
- RDB + AOF 你同样可以选择 AOF 结合 RDB 的混合持久化策略。
在这个基础上,我们引述 Redis Save 指令的功能(参考文档 https://redis.io/docs/latest/commands/save/):
SAVE 指令将会执行数据集合的同步保存,用 RDB 文件的格式生成 Redis 实例内全部数据集合的同步快照。
结合 save.json
(可以参考文章 https://datapromoto.atomgit.com/explore/journalism/detail/358934497930121216 ,Redis 的指令表实现机制简介,辅助理解)中的内容:
将数据库同步到磁盘上。
结合着对应的实现,我们就可以对 RDB 建立一个基本的理解:
/*
代码来源于 rdb.c
*/
void saveCommand(client *c) {
/*
以 CHILD_TYPE_ 打头的一系列常量,代表了 Redis 服务端的运行状态
CHILD_TYPE_RDB 代表 Redis 已经在使用 RDB 机制在后台展开备份工作
*/
if (server.child_type == CHILD_TYPE_RDB) {
addReplyError(c,"Background save already in progress");
return;
}
/* 更新 Redis 使用 RDB 存储备份的次数 */
server.stat_rdb_saves++;
rdbSaveInfo rsi, *rsiptr;
/* 获取 RDB 日志有关的存储信息 */
rsiptr = rdbPopulateSaveInfo(&rsi);
/* 根据存储函数的反馈结果,选择对应的回复 */
if (rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE) == C_OK) {
addReply(c,shared.ok);
} else {
addReplyErrorObject(c,shared.err);
}
}
而在实际之中,当我们执行 SAVE
指令时,客户端将会提示
(客户端反馈的 OK
,即对应代码中的 shared.ok,关于这个对象的具体内涵,请参考 createSharedObjects
中的有关实现)
(服务端的对应反馈,请参考 rdbSave
中的最后部分,这也是我们接下来要着重解读的内容)
深入 Redis RDB 日志的原理
现在,我们已经建立了对于 RDB 日志的基本了解,但是距离我们真正理解其原理,尚且有一段距离,因为我们依旧对于 Redis 管理资源的方式,乃至于整体的架构,尚且不是非常清晰,因此首先,让我们对这两部分内容,建立一个对应的理解:
Redis 组织资源的方式
越是完备的组织,其层级的理念,乃至于资源管理的方法也就越清晰,阅读 Redis RDB,实际上也是经历一种直观地如何资源管理的美妙体验,非常让人惊叹。
Redis Database
Redis 服务端的最高层级中,Redis Database 是最高一级的组织资源的单位,他们使用 id
来做标识,并可以通过 SELECT
指令来做切换(具体请参考 https://redis.io/docs/latest/commands/select/ ,Redis SELECT 文档,里面直接指出,尽管所有的数据库的数据均存储于同一个 RDB/AOF 文件内,但是数据库为其提供了一种命名隔离的方式,即命名空间)
对应的描述结构如下所示:
typedef struct redisDb {
kvstore *keys; /* 为当前 Redis Database 分配的 kv space */
kvstore *expires; /* 已经超时的数据键 */
ebuckets hexpires; /* 存储已经过时的 set 对象,每一则哈西值对应一则 TTL(或者是下一个要过期的最小字段) */
dict *blocking_keys; /* 正在让客户端等待数据中的数据键(BLPOP) */
dict *blocking_keys_unblock_on_nokey; /* 让客户端等待数据中的数据键,当数据键被删除时,他们不应当被阻塞(XREADEDGROUP) */
/* 这个数据部分是 *blocking_keys 的子部 */
dict *ready_keys; /* 接收到 PUSH 的被阻塞的数据键 *
dict *watched_keys; /* 为 MULTI/EXEC CAS 准备的被监视中的数据键 */
int id; /* 数据库的 ID */
long long avg_ttl; /* 为状态观察准备的平均 TTL 数据 */
unsigned long expires_cursor; /* 为活跃中的过期循环准备的游标变量 */
list *defrag_later; /* 计划逐步展开数据清理的数据键名称列表
} redisDb;
同时,为促进理解,我们补充一些对应的知识:
Redis 动态哈希表
哈希表,实际上便是一种简单的 “Key-Value” 结构,其如图所示:
(一般哈希表的形式便是如此所示)
而为了处理“元素碰撞”(即因为哈希函数乃至于哈希表容量,使得多个数据键对应到同一个数据存储位置)的问题,一般我们会设立“哈系桶”,如图所示:
(在这种情况下面,会把单个的存储位置转变为链表,并存储对应的数据键,这样即使发生了碰撞的情况,通过直接对键进行比较,也可以提取出正确的数据来,这是一种用空间换来的胜利)
至于动态哈希表的思想,它来源于文章《Dynamic Hash Tables》,为了解决哈希表容量扩张的问题而设计出来,因为在哈希表存储对象数目极大的时候,拓展哈希表所需要做的重分布工作(一般而言,我们计算一个数据键所需要存储到的数据存储区域的时候,会结合着表的容量来进行,而重分布时,因为数据表容量改变,因此每一个元素都需要做调整,这就使得整体的成本十分高昂)的代价很高,因此动态哈希表做了细化,将哈希表转变为如下的结构:
这样,当我们存储一个元素的时候,首先就先判断其位于哪一个“区域”,再将其输送到对应的哈希表里面(哈希表和之前我们所讨论的结构是一样的)。
区别在于,在做重分布的时候,我们只需要自区域着手,再针对内部的数据表调整就行(一般而言,添加一个区域即可)。这样就可以避免过度地对整个大表做调整(引述我在《浅谈 PostgreSQL 动态哈希表》中的比喻:“这就像转移大量文件时,我们往往倾向于先分类打包为不同的压缩包以后,再做处理文件的转移工作一样”)。
而对应到具体的实现,让我们参考下面的几个结构:
/* 描述了一个链表,对应动态哈希表中最底层的部分 */
struct dictEntry {
/* 存储数据键 */
void *key;
/* 用于存储数据,共同体的结构可以方便地使用各种对应的数据结构展开解析工作 */
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next; /* Next entry in the same hash bucket. */
};
/* 描述了一张完整的动态哈希表 */
struct dict {
/* 里面定义了一组自定义函数,用于自定义化哈希键的存储等 */
dictType *type;
/*
这里是问题的核心
设立的两张表,一张为存储数据而设计,另外一张针对扩容时的备份
*/
dictEntry **ht_table[2];
unsigned long ht_used[2];
/* 是否正在做重分布? -1 代表没有 */
long rehashidx;
/* 重分布是否被阻塞? 0 代表有 */
unsigned pauserehash : 15; /* If >0 rehashing is paused */
unsigned useStoredKeyApi : 1;
/* 存储数据表的容量,即 (size = 1<<exp) */
signed char ht_size_exp[2];
/* 是否运行做二次容量调整? */
int16_t pauseAutoResize;
void *metadata[];
};
对应的几个常用 API 则如下所示:
/* 设置数据键 */
void dictSetKey(dict *d, dictEntry* de, void *key);
/* 设置数据值 */
void dictSetVal(dict *d, dictEntry *de, void *val);
/* 查找对应的数据对 */
dictEntry * dictFind(dict *d, const void *key);
/* 查找对应的数据 */
void *dictFetchValue(dict *d, const void *key);
/* 对数据做替代工作 */
int dictReplace(dict *d, void *key, void *val);
/* 删除数据对 */
int dictDelete(dict *d, const void *key);
/* 其它请读者主动参考 src/dict.c */
Redis redisObject 简介
Redis redisObject 用于以一种统一协调的方式,描述存储的键值对,如下所示:
struct redisObject {
/* 资源对象的所属类型,在 RDB 文件中,均为 OBJ_MODULE */
unsigned type:4;
/* 资源的编码方式,是字符串?还是整数? */
unsigned encoding:4;
unsigned lru:LRU_BITS;
/*
使用何种 LRU 算法?
LRU time (参考全局 lru_clock)
LFU data (最低有效的8位频率,或者最具有标志性的 16 字节访问时间
*/
int refcount; /* 资源的引用计数 */
void *ptr; /* 存储的数据,至于数据应当以什么样的方式应对,当参考前面的描述部分了解 */
};
Redis 组织资源的方式:小结
就这样,我们对于 Redis 组织资源的办法建立了基本的了解,无论是最基层的对象,乃至于最顶层的结构都是,可以发现,他们实际上就是一个按照 Database ID 划分到多个小的“动态哈希表”中的 kv 存储结构,而资源的具体管理办法,就是具体情况具体分析,像指令一样,结合着具体的场景分析即可。
RDB 日志具体实践过程:以 rdbSave 函数为入口
现在,我们再次将目光聚焦于 rdbSave 函数之上,它是 Redis 保存资源的具体实践过程,其代码参考如下(出于分析的缘由,我仅保留关键部分的代码):
int rdbSave(int req, char *filename, rdbSaveInfo *rsi, int rdbflags) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
/* 更新事件池,关于事件池,这是我们后面将会着重分析的内容 */
startSaving(rdbflags);
/* RDB 文件临时名称 */
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
/* 核心处理代码被封装为这个函数 */
if (rdbSaveInternal(req,tmpfile,rsi,rdbflags) != C_OK) {
stopSaving(0);
return C_ERR;
}
/* 将完成后的临时文件更名为正式的文件,并同步数据 */
if (rename(tmpfile,filename) == -1) {
// ...
return C_ERR;
}
if (fsyncFileDir(filename) != 0) {
// ...
return C_ERR;
}
/* 向用户反馈... */
serverLog(LL_NOTICE,"DB saved on disk");
// ...
return C_OK;
}
真正核心的代码逻辑:自动态哈希表中提取数据,调用具体数据处理函数,写入数据到文件
static int rdbSaveInternal(int req, const char *filename, rdbSaveInfo *rsi, int rdbflags) {
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
rio rdb;
int error = 0;
int saved_errno;
char *err_op; /* For a detailed log */
FILE *fp = fopen(filename,"w");
if (!fp) {
// ...
return C_ERR;
}
/* 初始化文件 */
rioInitWithFile(&rdb,fp);
if (server.rdb_save_incremental_fsync) {
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (!(rdbflags & RDBFLAGS_KEEP_CACHE)) rioSetReclaimCache(&rdb,1);
}
/* 核心代码 */
if (rdbSaveRio(req,&rdb,&error,rdbflags,rsi) == C_ERR) {
errno = error;
err_op = "rdbSaveRio";
goto werr;
}
/* 同步内存数据到文件 */
if (fflush(fp)) { err_op = "fflush"; goto werr; }
if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }
if (!(rdbflags & RDBFLAGS_KEEP_CACHE) && reclaimFilePageCache(fileno(fp), 0, 0) == -1) {
serverLog(LL_NOTICE,"Unable to reclaim cache after saving RDB: %s", strerror(errno));
}
if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }
return C_OK;
werr:
saved_errno = errno;
serverLog(LL_WARNING,"Write error while saving DB to the disk(%s): %s", err_op, strerror(errno));
if (fp) fclose(fp);
unlink(filename);
errno = saved_errno;
return C_ERR;
}
int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
// ...
/* 遍历所有的 Redis Database */
if (!(req & SLAVE_REQ_RDB_EXCLUDE_DATA)) {
for (j = 0; j < server.dbnum; j++) {
if (rdbSaveDb(rdb, j, rdbflags, &key_counter) == -1) goto werr;
}
}
// ...
}
/* 这个函数同时被应用于处理 AOF 日志,因此非常复杂 */
ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
dictEntry *de;
ssize_t written = 0;
ssize_t res;
kvstoreIterator *kvs_it = NULL;
static long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
redisDb *db = server.db + dbid;
unsigned long long int db_size = kvstoreSize(db->keys);
if (db_size == 0) return 0;
// ...
kvs_it = kvstoreIteratorInit(db->keys);
int last_slot = -1;
/* 遍历动态哈希表 */
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
// ...
/* 提取资源 */
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
size_t rdb_bytes_before_key = rdb->processed_bytes;
initStaticStringObject(key,keystr);
/* 是否过期? */
expire = getExpire(db,&key);
/* 保存资源 */
if ((res = rdbSaveKeyValuePair(rdb, &key, o, expire, dbid)) < 0) goto werr;
written += res;
// ...
}
kvstoreIteratorRelease(kvs_it);
return written;
werr:
if (kvs_it) kvstoreIteratorRelease(kvs_it);
return -1;
}
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
// 更新 LRU 信息
/* 保存键值与类型,其中根据资源类型的不同,有各自的处理逻辑 */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key,dbid) == -1) return -1;
// ...
return 1;
}
基本上大致逻辑便是如此,不过功夫全在于细节之处,欢迎读者继续挖掘,哈哈。
写在最后
感谢所有曾经指导过我的人,感谢@红发哥,袁国铭老师,魏波老师,王其达老师,任娇老师,牛世继老师,张凯老师,吴洋老师,@Makio,@可可,@轩轩,@大炮,@于雨,我们将会继续前进,为着建设一个更为繁荣的数据库内核生态而前进。