The Deprecated VM Mechanism in Redis
Overview
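While reading the Redis 1.3 source code to study how Redis objects work, I noticed that the redisObject struct has a struct redisObjectVM vm field.
Early versions of Redis used it to support datasets larger than the server's RAM (that is, users could store more data than the machine has memory). To do this, Redis implemented a Virtual Memory module that moves in-memory values (the encoded data pointed to by a redisObject's ptr field) out to disk.
The feature was deprecated in Redis 2.4 and removed in 2.6. Interestingly, this version of Redis already did some of its work on multiple threads.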
The module is enabled through the vm_enabled field of redisServer. When VM is disabled, allocating a redisObject skips the trailing redisObjectVM part to save memory.
struct redisObjectVM {
off_t page; /* the page at witch the object is stored on disk */
off_t usedpages; /* number of pages used on disk */
time_t atime; /* Last access time */
} vm;
/* The actual Redis Object */
typedef struct redisObject {
void *ptr;
unsigned char type;
unsigned char encoding;
unsigned char storage; /* If this object is a key, where is the value?
* REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
unsigned char vtype; /* If this object is a key, and value is swapped out,
* this is the type of the swapped out object. */
int refcount;
/* VM fields, this are only allocated if VM is active, otherwise the
* object allocation function will just allocate
* sizeof(redisObjct) minus sizeof(redisObjectVM), so using
* Redis without VM active will not have any overhead. */
struct redisObjectVM vm;
} robj;
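The allocation trick described in the comment above (allocating the object minus sizeof(redisObjectVM) when VM is off) only works because vm is the last member of the struct. A minimal sketch under that assumption, with hypothetical names (obj, objVM, obj_alloc_size, create_obj). It uses offsetof(obj, vm) instead of the subtraction, which yields the same size or a slightly smaller one:

```c
#include <assert.h>
#include <stddef.h>     /* offsetof */
#include <stdlib.h>     /* malloc, free */
#include <string.h>     /* memset */
#include <sys/types.h>  /* off_t */
#include <time.h>       /* time_t, time */

/* Simplified copies of redisObjectVM / redisObject (hypothetical names). */
struct objVM {
    off_t page;       /* first swap page */
    off_t usedpages;  /* pages used on disk */
    time_t atime;     /* last access time */
};

typedef struct obj {
    void *ptr;
    unsigned char type, encoding, storage, vtype;
    int refcount;
    struct objVM vm;  /* must stay the LAST member for the trick to work */
} obj;

/* Full struct when VM is on, otherwise everything before the vm member. */
static size_t obj_alloc_size(int vm_enabled) {
    return vm_enabled ? sizeof(obj) : offsetof(obj, vm);
}

/* Allocate an object; with VM disabled, o->vm must never be touched. */
static obj *create_obj(int vm_enabled, void *ptr) {
    size_t size = obj_alloc_size(vm_enabled);
    obj *o = malloc(size);
    if (o == NULL) return NULL;
    memset(o, 0, size);
    o->ptr = ptr;
    o->refcount = 1;
    if (vm_enabled) o->vm.atime = time(NULL);  /* vm exists only in this case */
    return o;
}
```

The cost of the saving is discipline: any code path that reads o->vm on an object allocated with VM disabled would read past the end of the allocation, which is why the VM fields are only used when server.vm_enabled is set.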
Initializing the VM module
The VM module is initialized by vmInit. When initServer sets up the global redisServer state, it calls vmInit only if vm_enabled is set.
static void initServer() {
// ...
if (server.vm_enabled) vmInit();
}
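The module is disabled by default. With the default configuration, vmInit creates a /tmp/redis-pid.vm file (pid being the process ID), which is where in-memory data gets swapped out to.
The module tracks which pages are in use with a bitmap, unsigned char *vm_bitmap; the default page size is 256 bytes. (With the default settings, ftruncate is used to size the swap file at 25 GB.)
Next, it creates several lists (the job queues used below) and pthread mutexes.
It also creates a pipe that the IO threads use to notify the main thread.
Finally, it registers vmThreadedIOCompletedJob with the Redis event loop as the handler for the pipe's read end.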
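To make the bitmap concrete: with one bit per page, marking and scanning pages is plain bit arithmetic. Assuming the default of 104,857,600 pages (an assumption here, consistent with the 25 GB figure since 104,857,600 × 256 bytes is exactly 25 GiB), the bitmap itself takes 12.5 MiB. The sketch below uses hypothetical names (swapmap, mark_pages, find_contiguous_pages) and a simple first-fit scan; it is a simplified stand-in, not a copy of vmFindContiguousPages:

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical page map: one bit per swap page, 1 = used, 0 = free. */
typedef struct {
    unsigned char *bitmap;
    int64_t pages;
} swapmap;

static int swapmap_init(swapmap *m, int64_t pages) {
    m->pages = pages;
    m->bitmap = calloc((size_t)((pages + 7) / 8), 1);  /* all pages free */
    return m->bitmap != NULL ? 0 : -1;
}

static int page_used(const swapmap *m, int64_t page) {
    return (m->bitmap[page / 8] >> (page % 8)) & 1;
}

/* Set (used = 1) or clear (used = 0) `count` bits starting at `page`. */
static void mark_pages(swapmap *m, int64_t page, int64_t count, int used) {
    for (int64_t p = page; p < page + count; p++) {
        assert(p >= 0 && p < m->pages);
        unsigned char bit = (unsigned char)(1u << (p % 8));
        if (used) m->bitmap[p / 8] |= bit;
        else      m->bitmap[p / 8] &= (unsigned char)~bit;
    }
}

/* First-fit scan for `count` free pages in a row. On success stores the
 * first page in *first and returns 0; returns -1 if no run is long enough. */
static int find_contiguous_pages(const swapmap *m, int64_t count, int64_t *first) {
    assert(count > 0);
    int64_t run = 0;
    for (int64_t p = 0; p < m->pages; p++) {
        run = page_used(m, p) ? 0 : run + 1;
        if (run == count) {
            *first = p - count + 1;
            return 0;
        }
    }
    return -1;
}
```

A value needing n pages must occupy n adjacent pages, because it is later written and read back with a single seek; that is why a failed contiguous search makes the swap attempt give up rather than split the value.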
IO jobs
#define REDIS_IOJOB_LOAD 0 /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1 /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2 /* Swap from memory to disk */
typedef struct iojob {
int type; /* Request type, REDIS_IOJOB_* */
redisDb *db;/* Redis database */
robj *key; /* This I/O request is about swapping this key */
robj *val; /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
* field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
off_t page; /* Swap page where to read/write the object */
off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
int canceled; /* True if this command was canceled by blocking side of VM */
pthread_t thread; /* ID of the thread processing this entry */
} iojob;
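The three job types form a small pipeline: a swap-out travels through the IO threads twice (PREPARE_SWAP to measure the value, then DO_SWAP to write it), while a load needs a single trip. The sketch below models only the decisions the main thread makes when a job comes back. The job-type values match the defines above; the storage-state names and complete_job are hypothetical:

```c
#include <assert.h>

/* Same values as REDIS_IOJOB_LOAD / PREPARE_SWAP / DO_SWAP. */
enum { JOB_LOAD = 0, JOB_PREPARE_SWAP = 1, JOB_DO_SWAP = 2 };
/* Hypothetical storage states of a key. */
enum { ST_MEMORY, ST_SWAPPING, ST_SWAPPED };

/* Returns the key's new storage state. Sets *requeue when the job must go
 * back to the IO threads, rebranded from PREPARE_SWAP to DO_SWAP. */
static int complete_job(int *type, int can_swap, int found_pages, int *requeue) {
    *requeue = 0;
    switch (*type) {
    case JOB_LOAD:
        return ST_MEMORY;                  /* value is back in memory */
    case JOB_PREPARE_SWAP:
        if (!can_swap || !found_pages)
            return ST_MEMORY;              /* undo: the value stays in RAM */
        *type = JOB_DO_SWAP;
        *requeue = 1;
        return ST_SWAPPING;                /* still on its way to disk */
    case JOB_DO_SWAP:
        return ST_SWAPPED;                 /* value freed, data on disk */
    }
    assert(0 && "unknown job type");
    return -1;
}
```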
VM handling in the main thread
The main thread post-processes completed VM jobs in vmThreadedIOCompletedJob.
static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
int mask)
{
char buf[1];
int retval, processed = 0, toprocess = -1, trytoswap = 1;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
/* For every byte we read in the read side of the pipe, there is one
* I/O job completed to process. */
    // Keep reading from the pipe (non-blocking)
while((retval = read(fd,buf,1)) == 1) {
iojob *j;
listNode *ln;
robj *key;
struct dictEntry *de;
redisLog(REDIS_DEBUG,"Processing I/O completed job");
/* Get the processed element (the oldest one) */
        // Take the lock before touching the queue
lockThreadedIO();
assert(listLength(server.io_processed) != 0);
        // Handle only 1% of the io_processed queue per call, and at least one job
if (toprocess == -1) {
toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
if (toprocess <= 0) toprocess = 1;
}
        // Remove the job from the io_processed queue
ln = listFirst(server.io_processed);
j = ln->value;
listDelNode(server.io_processed,ln);
unlockThreadedIO();
/* If this job is marked as canceled, just ignore it */
if (j->canceled) {
freeIOJob(j);
continue;
}
/* Post process it in the main thread, as there are things we
* can do just here to avoid race conditions and/or invasive locks */
redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
de = dictFind(j->db->dict,j->key);
assert(de != NULL);
key = dictGetEntryKey(de);
if (j->type == REDIS_IOJOB_LOAD) {
redisDb *db;
            // Mark the key in the db as REDIS_VM_MEMORY, free its pages
            // in the VM bitmap, count the swap-in and free the iojob
/* Key loaded, bring it at home */
key->storage = REDIS_VM_MEMORY;
key->vm.atime = server.unixtime;
vmMarkPagesFree(key->vm.page,key->vm.usedpages);
redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
(unsigned char*) key->ptr);
server.vm_stats_swapped_objects--;
server.vm_stats_swapins++;
dictGetEntryVal(de) = j->val;
incrRefCount(j->val);
db = j->db;
freeIOJob(j);
/* Handle clients waiting for this key to be loaded. */
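            // For every client waiting on this key (tracked in the db's io_keys dict),
            // remove the key from the client's own io_keys list. Once a client's io_keys
            // is empty, i.e. it waits for no more keys to be loaded from disk,
            // it is added to server.io_ready_clients.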
handleClientsBlockedOnSwappedKey(db,key);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
/* Now we know the amount of pages required to swap this object.
* Let's find some space for it, and queue this task again
* rebranded as REDIS_IOJOB_DO_SWAP. */
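            // If the value cannot be swapped out, drop the job and mark the key as in memory again.
            // This happens either when a child process is running or when no contiguous pages are free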
if (!vmCanSwapOut() ||
vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
{
/* Ooops... no space or we can't swap as there is
* a fork()ed Redis trying to save stuff on disk. */
freeIOJob(j);
key->storage = REDIS_VM_MEMORY; /* undo operation */
} else {
/* Note that we need to mark this pages as used now,
* if the job will be canceled, we'll mark them as freed
* again. */
                // Mark the pages as used in the VM bitmap, change the job type
                // to DO_SWAP (write to disk) and push it onto io_newjobs
vmMarkPagesUsed(j->page,j->pages);
j->type = REDIS_IOJOB_DO_SWAP;
lockThreadedIO();
queueIOJob(j);
unlockThreadedIO();
}
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
robj *val;
/* Key swapped. We can finally free some memory. */
if (key->storage != REDIS_VM_SWAPPING) {
printf("key->storage: %d\n",key->storage);
printf("key->name: %s\n",(char*)key->ptr);
printf("key->refcount: %d\n",key->refcount);
printf("val: %p\n",(void*)j->val);
printf("val->type: %d\n",j->val->type);
printf("val->ptr: %s\n",(char*)j->val->ptr);
}
            // Mark the key as swapped out to disk
redisAssert(key->storage == REDIS_VM_SWAPPING);
val = dictGetEntryVal(de);
key->vm.page = j->page;
key->vm.usedpages = j->pages;
key->storage = REDIS_VM_SWAPPED;
key->vtype = j->val->type;
decrRefCount(val); /* Deallocate the object from memory. */
dictGetEntryVal(de) = NULL;
redisLog(REDIS_DEBUG,
"VM: object %s swapped out at %lld (%lld pages) (threaded)",
(unsigned char*) key->ptr,
(unsigned long long) j->page, (unsigned long long) j->pages);
server.vm_stats_swapped_objects++;
server.vm_stats_swapouts++;
freeIOJob(j);
/* Put a few more swap requests in queue if we are still
* out of memory */
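            // If swapping is allowed (no child process), the previous attempt in this call
            // did not fail (trytoswap), and memory use is still above vm_max_memory
            // (1 GB by default), queue more swap jobs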
if (trytoswap && vmCanSwapOut() &&
zmalloc_used_memory() > server.vm_max_memory)
{
int more = 1;
while(more) {
lockThreadedIO();
                    // While there are fewer new IO jobs than vm_max_threads, keep calling
                    // vmSwapOneObjectThreaded, which picks a key from the db and
                    // wraps it in an IO job queued through queueIOJob
more = listLength(server.io_newjobs) <
(unsigned) server.vm_max_threads;
unlockThreadedIO();
/* Don't waste CPU time if swappable objects are rare. */
if (vmSwapOneObjectThreaded() == REDIS_ERR) {
trytoswap = 0;
break;
}
}
}
}
        // After toprocess jobs, return; the remaining jobs are handled the next time the pipe is readable
processed++;
if (processed == toprocess) return;
}
if (retval < 0 && errno != EAGAIN) {
redisLog(REDIS_WARNING,
"WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
strerror(errno));
}
}
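The batch size in vmThreadedIOCompletedJob is a percentage of the completed-job queue with a floor of one. Assuming REDIS_MAX_COMPLETED_JOBS_PROCESSED is 1, as the comment about 1% suggests, the calculation reduces to the hypothetical helper below:

```c
#include <assert.h>

/* Assumed value: process 1% of the completed jobs per call. */
#define MAX_COMPLETED_JOBS_PROCESSED 1

/* Jobs handled by one call: a percentage of the queue, never fewer than one. */
static int jobs_to_process(unsigned long queued) {
    assert(queued > 0);  /* the handler only runs after a job completed */
    unsigned long n = queued * MAX_COMPLETED_JOBS_PROCESSED / 100;
    return n == 0 ? 1 : (int)n;
}
```

With 250 completed jobs queued, one call handles 2 of them; with 50 queued it still handles 1. Capping the batch keeps a burst of completions from stalling client commands, and the unread bytes left in the pipe make the handler fire again on the next loop iteration.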
IO thread logic
IO threads run IOThreadEntryPoint.
The function is essentially a single while(1) loop.
static void *IOThreadEntryPoint(void *arg) {
iojob *j;
listNode *ln;
REDIS_NOTUSED(arg);
    // Detach the thread so its resources are released when it returns
pthread_detach(pthread_self());
while(1) {
/* Get a new job to process */
        // Take the lock before touching the queues
lockThreadedIO();
if (listLength(server.io_newjobs) == 0) {
            // No jobs left in io_newjobs: the thread exits
/* No new jobs in queue, exit. */
redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
(long) pthread_self());
server.io_active_threads--;
unlockThreadedIO();
return NULL;
}
        // Move a job from io_newjobs to io_processing
ln = listFirst(server.io_newjobs);
j = ln->value;
listDelNode(server.io_newjobs,ln);
/* Add the job in the processing queue */
j->thread = pthread_self();
listAddNodeTail(server.io_processing,j);
ln = listLast(server.io_processing); /* We use ln later to remove it */
        // Release the lock
unlockThreadedIO();
redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
(long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
/* Process the Job */
if (j->type == REDIS_IOJOB_LOAD) {
            // Seek to the job's page in the swap file, then load the value into memory with rdbLoadObject
j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
} else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
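            // A rather clever trick:
            // open /dev/null and serialize val into it in RDB format (nothing reaches a real device),
            // then get the byte count with ftello and divide it by the page size (rounding up) to get the page count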
FILE *fp = fopen("/dev/null","w+");
j->pages = rdbSavedObjectPages(j->val,fp);
fclose(fp);
} else if (j->type == REDIS_IOJOB_DO_SWAP) {
            // Serialize val in RDB format into the swap file
if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
j->canceled = 1;
}
        // Take the lock again
/* Done: insert the job into the processed queue */
redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
(long) pthread_self(), (void*)j, (char*)j->key->ptr);
lockThreadedIO();
        // Move the job from io_processing to io_processed
listDelNode(server.io_processing,ln);
listAddNodeTail(server.io_processed,j);
        // Release the lock
unlockThreadedIO();
        // Write one byte to the pipe so the main thread's read handler runs (notifies the main thread)
/* Signal the main thread there is new stuff to process */
assert(write(server.io_ready_pipe_write,"x",1) == 1);
}
return NULL; /* never reached */
}
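The hand-off between an IO thread and the main thread combines a mutex-protected queue with a pipe used as a doorbell: one byte written per completed job, one byte read per job processed. A reduced sketch of that pattern with hypothetical names (worker, collect_one, a fixed-size array instead of Redis's list). Unlike Redis, it reads the pipe in blocking mode to keep the example short:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <unistd.h>

#define MAX_DONE 64

/* Hypothetical stand-ins for io_mutex, io_processed and the ready pipe. */
static pthread_mutex_t io_mutex = PTHREAD_MUTEX_INITIALIZER;
static int done_jobs[MAX_DONE];     /* completed job ids, oldest first */
static int done_head = 0, done_tail = 0;
static int ready_pipe[2];           /* [0] = read end, [1] = write end */

/* IO-thread side: publish the finished job under the lock,
 * then write one byte to wake the main thread. */
static void *worker(void *arg) {
    int job_id = *(const int *)arg;
    pthread_mutex_lock(&io_mutex);
    assert(done_tail < MAX_DONE);
    done_jobs[done_tail++] = job_id;
    pthread_mutex_unlock(&io_mutex);
    ssize_t n = write(ready_pipe[1], "x", 1);
    assert(n == 1);
    (void)n;
    return NULL;
}

/* Main-thread side: each byte read stands for one completed job. */
static int collect_one(void) {
    char buf[1];
    if (read(ready_pipe[0], buf, 1) != 1) return -1;  /* blocking read */
    pthread_mutex_lock(&io_mutex);
    int job = done_jobs[done_head++];                 /* pop the oldest job */
    pthread_mutex_unlock(&io_mutex);
    return job;
}
```

The order matters: the job is queued before the byte is written, so whenever the main thread has read a byte, at least one job is guaranteed to be waiting in the queue.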
queueIOJob
This function appends an iojob to the io_newjobs queue and, if fewer IO threads are running than the maximum, spawns a new one.
static void queueIOJob(iojob *j) {
redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
(void*)j, j->type, (char*)j->key->ptr);
listAddNodeTail(server.io_newjobs,j);
if (server.io_active_threads < server.vm_max_threads)
spawnIOThread();
}
static void spawnIOThread(void) {
pthread_t thread;
sigset_t mask, omask;
int err;
sigemptyset(&mask);
sigaddset(&mask,SIGCHLD);
sigaddset(&mask,SIGHUP);
sigaddset(&mask,SIGPIPE);
pthread_sigmask(SIG_SETMASK, &mask, &omask);
while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
strerror(err));
usleep(1000000);
}
pthread_sigmask(SIG_SETMASK, &omask, NULL);
server.io_active_threads++;
}
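spawnIOThread blocks SIGCHLD, SIGHUP and SIGPIPE only around pthread_create. A new thread inherits the signal mask of the thread that creates it, so the IO thread starts with those signals blocked, while the main thread gets its old mask back right away. A sketch with hypothetical names (probe, spawn_with_blocked_signals) that checks the inherited mask from inside the new thread:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <signal.h>

/* Set by the new thread; read only after pthread_join. */
static int child_blocks_sigpipe = -1;

/* Hypothetical thread body: report whether SIGPIPE is blocked here. */
static void *probe(void *arg) {
    (void)arg;
    sigset_t cur;
    pthread_sigmask(SIG_BLOCK, NULL, &cur);  /* set == NULL: query only */
    child_blocks_sigpipe = sigismember(&cur, SIGPIPE);
    return NULL;
}

/* Same shape as spawnIOThread: block, create (mask is inherited), restore. */
static int spawn_with_blocked_signals(pthread_t *tid) {
    sigset_t mask, omask;
    sigemptyset(&mask);
    sigaddset(&mask, SIGCHLD);
    sigaddset(&mask, SIGHUP);
    sigaddset(&mask, SIGPIPE);
    pthread_sigmask(SIG_SETMASK, &mask, &omask);
    int err = pthread_create(tid, NULL, probe, NULL);
    pthread_sigmask(SIG_SETMASK, &omask, NULL);  /* caller's mask is back */
    return err;
}
```

Unlike spawnIOThread, the sketch returns the pthread_create error instead of retrying once per second. The effect is the same in both: those signals are delivered to the main thread, which has them unblocked, rather than interrupting an IO thread.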
Executing client commands
static int processCommand(redisClient *c) {
struct redisCommand *cmd;
/* Free some memory if needed (maxmemory setting) */
    // When maxmemory is reached, try to free an object from objfreelist, or delete a randomly sampled key that has an expire set
if (server.maxmemory) freeMemoryIfNeeded();
// ...
/* Now lookup the command and check ASAP about trivial error conditions
* such wrong arity, bad command name and so forth. */
cmd = lookupCommand(c->argv[0]->ptr);
// ...
/* Exec the command */
if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
queueMultiCommand(c,cmd);
addReply(c,shared.queued);
} else {
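        // If any key the command touches has been swapped to disk, block the client
        // (remove its read event from the event loop) until all its keys are loaded into memory.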
if (server.vm_enabled && server.vm_max_threads > 0 &&
blockClientOnSwappedKeys(cmd,c)) return 1;
        // Otherwise run the command right away
call(c,cmd);
}
/* Prepare the client for the next command */
resetClient(c);
return 1;
}
static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) {
int j, last;
if (cmd->vm_preload_proc != NULL) {
cmd->vm_preload_proc(c);
} else {
if (cmd->vm_firstkey == 0) return 0;
last = cmd->vm_lastkey;
if (last < 0) last = c->argc+last;
for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep)
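            // If the key is in memory, return immediately.
            // If the key is in an IO queue (being swapped out), try to cancel that job.
            // If the key is swapped out, create a load job and queue it via queueIOJob,
            // add the key to the client's io_keys wait list, and add the client to the db's io_keys entry for that key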
waitForSwappedKey(c,c->argv[j]);
}
/* If the client was blocked for at least one key, mark it as blocked. */
if (listLength(c->io_keys)) {
c->flags |= REDIS_IO_WAIT;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
server.vm_blocked_clients++;
return 1;
} else {
return 0;
}
}
Note: once every key the client needs has been loaded into memory, the client is added to the server.io_ready_clients queue.
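When a command has no vm_preload_proc, blockClientOnSwappedKeys derives the key positions from vm_firstkey, vm_lastkey and vm_keystep, where a negative last index counts back from argc. A sketch of just that index calculation; struct keyspec and key_positions are hypothetical, and the MSET-style spec in the test (first key 1, last -1, step 2) is an illustration rather than a quote from Redis's command table:

```c
#include <assert.h>

/* Hypothetical reduction of a command-table entry to the three key fields. */
struct keyspec { int firstkey, lastkey, keystep; };

/* Fill out[] with the argv indexes that hold keys; return how many.
 * A negative lastkey counts from the end, like `argc + last` above. */
static int key_positions(const struct keyspec *ks, int argc, int *out, int max) {
    int n = 0;
    if (ks->firstkey == 0) return 0;  /* the command takes no keys */
    assert(ks->keystep > 0);
    int last = ks->lastkey < 0 ? argc + ks->lastkey : ks->lastkey;
    for (int j = ks->firstkey; j <= last && n < max; j += ks->keystep)
        out[n++] = j;
    return n;
}
```

For MSET k1 v1 k2 v2, argc is 5, so the last index is 5 - 1 = 4 and the step of 2 visits argv[1] and argv[3], skipping the values.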
Waking up waiting clients
Waiting clients are resumed in beforeSleep, which the event loop calls before each iteration.
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
            // Process all waiting clients!
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
static void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
    // Walk every ready client and run its command
if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li;
listNode *ln;
listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
redisClient *c = ln->value;
struct redisCommand *cmd;
/* Resume the client. */
listDelNode(server.io_ready_clients,ln);
c->flags &= (~REDIS_IO_WAIT);
server.vm_blocked_clients--;
aeCreateFileEvent(server.el, c->fd, AE_READABLE,
readQueryFromClient, c);
cmd = lookupCommand(c->argv[0]->ptr);
assert(cmd != NULL);
            // Re-run the client's command
call(c,cmd);
resetClient(c);
/* There may be more data to process in the input buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0)
processInputBuffer(c);
}
}
}
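Taken together, blocking and waking act like a counter per client: each loaded key decrements it, a client whose count reaches zero moves to the ready list, and beforeSleep drains that list before the loop sleeps again. A minimal single-threaded model with hypothetical names (client, key_loaded, before_sleep) in place of the io_keys lists:

```c
#include <assert.h>

#define MAX_CLIENTS 8

/* Hypothetical model: a client waits on some number of swapped keys. */
typedef struct {
    int waiting_keys;   /* keys still being loaded */
    int blocked;        /* stands in for the REDIS_IO_WAIT flag */
    int commands_run;   /* how often its command was executed */
} client;

static client *ready[MAX_CLIENTS];  /* stands in for io_ready_clients */
static int ready_len = 0;

/* One key this client waited for has been loaded
 * (the role handleClientsBlockedOnSwappedKey plays). */
static void key_loaded(client *c) {
    assert(c->waiting_keys > 0);
    if (--c->waiting_keys == 0) {
        assert(ready_len < MAX_CLIENTS);
        ready[ready_len++] = c;
    }
}

/* The beforeSleep step: unblock every ready client and run its command. */
static void before_sleep(void) {
    for (int i = 0; i < ready_len; i++) {
        ready[i]->blocked = 0;
        ready[i]->commands_run++;
    }
    ready_len = 0;
}
```

A client blocked on two keys is untouched by the first load and only runs its command on the pass after the second key arrives.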
Periodic check
serverCron runs every 100 ms.
static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
// ...
/* Swap a few keys on disk if we are over the memory limit and VM
* is enbled. Try to free objects from the free list first. */
if (vmCanSwapOut()) {
while (server.vm_enabled && zmalloc_used_memory() >
server.vm_max_memory)
{
int retval;
            // Try to free an object from objfreelist
if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
            // With no IO threads allowed, write a key to the VM file synchronously;
            // otherwise queue an IO job through queueIOJob for the IO threads
retval = (server.vm_max_threads == 0) ?
vmSwapOneObjectBlocking() :
vmSwapOneObjectThreaded();
if (retval == REDIS_ERR && !(loops % 300) &&
zmalloc_used_memory() >
(server.vm_max_memory+server.vm_max_memory/10))
{
                // Memory exceeds the limit by more than 10%: log a warning (at most once every 300 cron loops)
redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
}
/* Note that when using threade I/O we free just one object,
* because anyway when the I/O thread in charge to swap this
* object out will finish, the handler of completed jobs
* will try to swap more objects if we are still out of memory. */
if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
}
}
// ...
    return 100; // run again in 100 ms
}
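The loop in serverCron behaves differently in its two modes: with vm_max_threads == 0 each blocking swap frees memory immediately, so the loop keeps going, while in threaded mode it queues a single job and leaves the rest to the completion handler. A toy simulation with made-up byte counts and hypothetical names (sim, cron_swap, over_limit_warning):

```c
#include <assert.h>

/* Hypothetical model of the memory state seen by one serverCron tick. */
typedef struct {
    long used, max;       /* bytes in use and vm-max-memory */
    int freelist;         /* objects waiting on objfreelist */
    long freelist_gain;   /* bytes returned per freed freelist object */
    int swappable;        /* values that could still be swapped out */
    long swap_gain;       /* bytes returned per swapped value */
    int threaded;         /* 1 when vm_max_threads > 0 */
    int swaps_started;    /* swaps started during this tick */
} sim;

static void cron_swap(sim *s) {
    while (s->used > s->max) {
        if (s->freelist > 0) {            /* like tryFreeOneObjectFromFreelist */
            s->freelist--;
            s->used -= s->freelist_gain;
            continue;
        }
        if (s->swappable == 0) break;     /* like REDIS_ERR: nothing to swap */
        s->swappable--;
        s->swaps_started++;
        if (s->threaded) break;           /* one queued job per tick */
        s->used -= s->swap_gain;          /* blocking swap frees memory now */
    }
}

/* The warning needs a failed swap, usage more than 10% over the limit,
 * and a cron loop counter divisible by 300. */
static int over_limit_warning(int swap_failed, long used, long max, long loops) {
    return swap_failed && loops % 300 == 0 && used > max + max / 10;
}
```

In the threaded case memory only drops when DO_SWAP completes, which is why vmThreadedIOCompletedJob checks vm_max_memory again and queues more work.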
Neat tricks
- Using /dev/null to quickly compute how many pages a value needs on disk.
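The /dev/null trick lets the existing RDB serializer measure a value without allocating a buffer: write into a sink that discards everything, ask the stream for its position, and round up to whole pages. A sketch assuming a hypothetical serializer callback in place of rdbSaveObject. The example values stay below a typical stdio buffer size, so the count comes from bytes ftello sees as still buffered; how the position behaves after a flush to /dev/null is not checked here:

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdio.h>
#include <sys/types.h>

/* Hypothetical serializer signature standing in for rdbSaveObject:
 * returns 0 on success, -1 on error. */
typedef int (*serialize_fn)(FILE *fp, const void *value);

/* Serialize into /dev/null, read the stream position with ftello,
 * and round the byte count up to whole pages. Returns -1 on error. */
static off_t pages_needed(serialize_fn write_fn, const void *value, off_t page_size) {
    assert(page_size > 0);
    FILE *fp = fopen("/dev/null", "w+");
    if (fp == NULL) return -1;
    if (write_fn(fp, value) != 0) {
        fclose(fp);
        return -1;
    }
    off_t bytes = ftello(fp);  /* position == bytes written so far */
    fclose(fp);
    if (bytes < 0) return -1;
    return (bytes + page_size - 1) / page_size;  /* ceiling division */
}

/* Example serializer: dumps a raw byte range. */
struct blob { const char *data; size_t len; };

static int write_blob(FILE *fp, const void *value) {
    const struct blob *b = value;
    return fwrite(b->data, 1, b->len, fp) == b->len ? 0 : -1;
}
```

With 256-byte pages, a 300-byte value needs 2 pages and a 256-byte value exactly 1. The appeal of the approach is that size measurement and the real DO_SWAP write go through the same serializer, so the two cannot disagree on the format.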