Redis中废弃的VM机制

发布时间 2023-05-22 00:51:52作者: Lht1

Redis中废弃的VM机制

概述

在翻redis1.3版本的源码研究Redis对象机制时,看到redisObject结构体中存在一个struct redisObjectVM vm的字段。

是早期版本的redis为了提高实现大于服务器内存存储量的数据库支持(即用户存入的数据量可以大于服务器的内存容量),把内存的值(也就是redisObject中的ptr字段编码后的数据)放到磁盘上。实现了一个Virtual Memory模块。

这一功能在2.6版本后就被⚠️废弃了。但其实这一版本的Redis已经有多线程操作了。

这一模块通过redisServer中的vm_enabled字段来启用,如果没有启用VM模块,则在分配redisObject的内存区域时,不会分配redisObjectVM(省内存)。

struct redisObjectVM {
    off_t page;         /* the page at witch the object is stored on disk */
    off_t usedpages;    /* number of pages used on disk */
    time_t atime;       /* Last access time */
} vm;

/* The actual Redis Object */
typedef struct redisObject {
    void *ptr;
    unsigned char type;
    unsigned char encoding;
    unsigned char storage;  /* If this object is a key, where is the value?
                             * REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char vtype; /* If this object is a key, and value is swapped out,
                          * this is the type of the swapped out object. */
    int refcount;
    /* VM fields, this are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObjct) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. */
    struct redisObjectVM vm;
} robj;

VM模块的初始化

vm模块初始化方法是vmInit,在调用initServer初始化全局变量redisServer会根据vm_enabled来决定是否调用该方法。

static void initServer() {
    // ...
    if (server.vm_enabled) vmInit();
}

这一模块是默认不启用,在默认初始化配置中,vmInit会创建一个/tmp/redis-pid.vm文件。

这个文件就作为内存数据换入到磁盘的存储位置。

vm模块中通过一个unsigned char *vm_bitmap位图结构来记录每一页的使用情况,默认每一页大小是256字节。

(如果按照默认大小来创建vm文件,会通过ftruncate函数来生成一个25G的文件)

vm相关字段默认值

然后,创建一些list和线程互斥锁?。

同时初始化一个管道描述符,用来IO线程和主线程进行通信。

最后会向redis的事件循环中添加一个vmThreadedIOCompletedJob事件。

io任务

#define REDIS_IOJOB_LOAD 0          /* Load from disk to memory */
#define REDIS_IOJOB_PREPARE_SWAP 1  /* Compute needed pages */
#define REDIS_IOJOB_DO_SWAP 2       /* Swap from memory to disk */
typedef struct iojob {
    int type;   /* Request type, REDIS_IOJOB_* */
    redisDb *db;/* Redis database */
    robj *key;  /* This I/O request is about swapping this key */
    robj *val;  /* the value to swap for REDIS_IOREQ_*_SWAP, otherwise this
                 * field is populated by the I/O thread for REDIS_IOREQ_LOAD. */
    off_t page; /* Swap page where to read/write the object */
    off_t pages; /* Swap pages needed to save object. PREPARE_SWAP return val */
    int canceled; /* True if this command was canceled by blocking side of VM */
    pthread_t thread; /* ID of the thread processing this entry */
} iojob;

主线程VM处理逻辑

主线程VM的处理方法:vmThreadedIOCompletedJob

static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
            int mask)
{
    char buf[1];
    int retval, processed = 0, toprocess = -1, trytoswap = 1;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);

    /* For every byte we read in the read side of the pipe, there is one
     * I/O job completed to process. */
    // 不断从管道中读取数据(非阻塞)
    while((retval = read(fd,buf,1)) == 1) {
        iojob *j;
        listNode *ln;
        robj *key;
        struct dictEntry *de;

        redisLog(REDIS_DEBUG,"Processing I/O completed job");

        /* Get the processed element (the oldest one) */
      	// 持有锁操作队列
        lockThreadedIO();
        assert(listLength(server.io_processed) != 0);
      	// 这里每次只获取io_processed队列的1%任务进行处理, 最少处理一个任务
        if (toprocess == -1) {
            toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
            if (toprocess <= 0) toprocess = 1;
        }
      	// 从io_processed队列删除任务
        ln = listFirst(server.io_processed);
        j = ln->value;
        listDelNode(server.io_processed,ln);
        unlockThreadedIO();
        /* If this job is marked as canceled, just ignore it */
        if (j->canceled) {
            freeIOJob(j);
            continue;
        }
        /* Post process it in the main thread, as there are things we
         * can do just here to avoid race conditions and/or invasive locks */
        redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
        de = dictFind(j->db->dict,j->key);
        assert(de != NULL);
        key = dictGetEntryKey(de);
        if (j->type == REDIS_IOJOB_LOAD) {
            redisDb *db;
	    // 更新db中的key类型为REDIS_VM_MEMORY
            // 标记vm位图为free状态, 统计换入次数, 释放iojob对象
            /* Key loaded, bring it at home */
            key->storage = REDIS_VM_MEMORY;
            key->vm.atime = server.unixtime;
            vmMarkPagesFree(key->vm.page,key->vm.usedpages);
            redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
                (unsigned char*) key->ptr);
            server.vm_stats_swapped_objects--;
            server.vm_stats_swapins++;
            dictGetEntryVal(de) = j->val;
            incrRefCount(j->val);
            db = j->db;
            freeIOJob(j);
            /* Handle clients waiting for this key to be loaded. */
            // 这里是把db中io_keys字典中等待该key的客户端中的io_keys列表中移除该key,
            // 如果客户端io_keys为空, 也就是不需要等待其他key被换入磁盘,
            // 那么就添加server.io_ready_clients中去去处理.
            handleClientsBlockedOnSwappedKey(db,key);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            /* Now we know the amount of pages required to swap this object.
             * Let's find some space for it, and queue this task again
             * rebranded as REDIS_IOJOB_DO_SWAP. */
            // 如果不可以换出就删掉该任务并标记key为内存状态
            // 不能换出有两种情况, 一种就是有子进程在执行, 另一种就是没找到连续的页面
            if (!vmCanSwapOut() ||
                vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
            {
                /* Ooops... no space or we can't swap as there is
                 * a fork()ed Redis trying to save stuff on disk. */
                freeIOJob(j);
                key->storage = REDIS_VM_MEMORY; /* undo operation */
            } else {
                /* Note that we need to mark this pages as used now,
                 * if the job will be canceled, we'll mark them as freed
                 * again. */
              	// 标记vm位图为已使用
              	// 把任务类型改成换入磁盘, 并把任务丢入io_newjobs队列
                vmMarkPagesUsed(j->page,j->pages);
                j->type = REDIS_IOJOB_DO_SWAP;
                lockThreadedIO();
                queueIOJob(j);
                unlockThreadedIO();
            }
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            robj *val;

            /* Key swapped. We can finally free some memory. */
            if (key->storage != REDIS_VM_SWAPPING) {
                printf("key->storage: %d\n",key->storage);
                printf("key->name: %s\n",(char*)key->ptr);
                printf("key->refcount: %d\n",key->refcount);
                printf("val: %p\n",(void*)j->val);
                printf("val->type: %d\n",j->val->type);
                printf("val->ptr: %s\n",(char*)j->val->ptr);
            }
            // 修改key类型为已换出到磁盘
            redisAssert(key->storage == REDIS_VM_SWAPPING);
            val = dictGetEntryVal(de);
            key->vm.page = j->page;
            key->vm.usedpages = j->pages;
            key->storage = REDIS_VM_SWAPPED;
            key->vtype = j->val->type;
            decrRefCount(val); /* Deallocate the object from memory. */
            dictGetEntryVal(de) = NULL;
            redisLog(REDIS_DEBUG,
                "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                (unsigned char*) key->ptr,
                (unsigned long long) j->page, (unsigned long long) j->pages);
            server.vm_stats_swapped_objects++;
            server.vm_stats_swapouts++;
            freeIOJob(j);
            /* Put a few more swap requests in queue if we are still
             * out of memory */
            // 如果可以换出到磁盘(没子进程)且(第一次执行或者上次执行换出没有错误)且
            // 当前内存超过限制(默认1G) 那么就
            if (trytoswap && vmCanSwapOut() &&
                zmalloc_used_memory() > server.vm_max_memory)
            {
                int more = 1;
                while(more) {
                    lockThreadedIO();
                    // 如果当前IO任务数量小于最大线程数, 就会不断调用
                    // vmSwapOneObjectThreaded从db列表中捞一个key写入磁盘,
                    // 也就是把key包装为IO任务调用queueIOJob方法入队
                    more = listLength(server.io_newjobs) <
                            (unsigned) server.vm_max_threads;
                    unlockThreadedIO();
                    /* Don't waste CPU time if swappable objects are rare. */
                    if (vmSwapOneObjectThreaded() == REDIS_ERR) {
                        trytoswap = 0;
                        break;
                    }
                }
            }
        }
      	// 执行完成指定次数就退出方法, 等待下一次IO事件触发此方法执行
        processed++;
        if (processed == toprocess) return;
    }
    if (retval < 0 && errno != EAGAIN) {
        redisLog(REDIS_WARNING,
            "WARNING: read(2) error in vmThreadedIOCompletedJob() %s",
            strerror(errno));
    }
}

IO线程处理逻辑

IO线程的处理方法:IOThreadEntryPoint

这方法主要是就一个while(1)不断循环处理。

static void *IOThreadEntryPoint(void *arg) {
    iojob *j;
    listNode *ln;
    REDIS_NOTUSED(arg);

    // detach自己, 当方法退出后释放线程资源
    pthread_detach(pthread_self());
    while(1) {
        /* Get a new job to process */
      	// 持有锁操作队列
        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0) {
            // 当io_newjobs队列中没有任务时, 关闭退出线程
            /* No new jobs in queue, exit. */
            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
                (long) pthread_self());
            server.io_active_threads--;
            unlockThreadedIO();
            return NULL;
        }
      	// 从io_newjobs队列中取任务并放到io_processing队列
        ln = listFirst(server.io_newjobs);
        j = ln->value;
        listDelNode(server.io_newjobs,ln);
        /* Add the job in the processing queue */
        j->thread = pthread_self();
        listAddNodeTail(server.io_processing,j);
        ln = listLast(server.io_processing); /* We use ln later to remove it */
      	// 释放锁
        unlockThreadedIO();
        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);

        /* Process the Job */
        if (j->type == REDIS_IOJOB_LOAD) {
            // 打开vm文件, fseek定位到第page页后, 使用rdbLoadObject加载数据到内存
            j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
          	// 这个操作有点骚
          	// 打开/dev/null文件, 把val编码成rdb格式写入文件(不调用任何硬件操作)
		// 通过ftello获取文件大小来处于每页大小获取写入磁盘需要占用的页面数
          	FILE *fp = fopen("/dev/null","w+");
            j->pages = rdbSavedObjectPages(j->val,fp);
            fclose(fp);
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            // 把val编码成rdb格式写入vm文件
            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                j->canceled = 1;
        }
				
      	// 获取线程锁
        /* Done: insert the job into the processed queue */
        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
        lockThreadedIO();
      	// 从io_processing队列中移入io_processed队列
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
        // 释放锁
        unlockThreadedIO();

      	// 写入管道来触发主线程IO事件的执行(通知主线程)
        /* Signal the main thread there is new stuff to process */
        assert(write(server.io_ready_pipe_write,"x",1) == 1);
    }
    return NULL; /* never reached */
}

queueIOJob方法

这个方法就是把iojob放入io_newjobs队列,并判断当前io线程数是否小于最大线程数,如果小于就创建一个线程。

static void queueIOJob(iojob *j) {
    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
        (void*)j, j->type, (char*)j->key->ptr);
    listAddNodeTail(server.io_newjobs,j);
    if (server.io_active_threads < server.vm_max_threads)
        spawnIOThread();
}

static void spawnIOThread(void) {
    pthread_t thread;
    sigset_t mask, omask;
    int err;

    sigemptyset(&mask);
    sigaddset(&mask,SIGCHLD);
    sigaddset(&mask,SIGHUP);
    sigaddset(&mask,SIGPIPE);
    pthread_sigmask(SIG_SETMASK, &mask, &omask);
    while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
        redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
            strerror(err));
        usleep(1000000);
    }
    pthread_sigmask(SIG_SETMASK, &omask, NULL);
    server.io_active_threads++;
}

客户端执行命令

static int processCommand(redisClient *c) {
    struct redisCommand *cmd;

    /* Free some memory if needed (maxmemory setting) */
    // 如果达到最大内存, 就从尝试objfreelist释放一个对象或者从expires中随机获取过期的key释放
    if (server.maxmemory) freeMemoryIfNeeded();

    // ...

    /* Now lookup the command and check ASAP about trivial error conditions
     * such wrong arity, bad command name and so forth. */
    cmd = lookupCommand(c->argv[0]->ptr);
		
    // ...
  
    /* Exec the command */
    if (c->flags & REDIS_MULTI && cmd->proc != execCommand && cmd->proc != discardCommand) {
        queueMultiCommand(c,cmd);
        addReply(c,shared.queued);
    } else {
        // 如果存在命令设计到的key被交换到磁盘, 那么就会阻塞客户端(从事件循环中删除该事件)
      	// 等待所有key被加载到内存中.
      	if (server.vm_enabled && server.vm_max_threads > 0 &&
            blockClientOnSwappedKeys(cmd,c)) return 1;
      	// 否则直接运行命令  
      	call(c,cmd);
    }

    /* Prepare the client for the next command */
    resetClient(c);
    return 1;
}

static int blockClientOnSwappedKeys(struct redisCommand *cmd, redisClient *c) {
    int j, last;

    if (cmd->vm_preload_proc != NULL) {
        cmd->vm_preload_proc(c);
    } else {
        if (cmd->vm_firstkey == 0) return 0;
        last = cmd->vm_lastkey;
        if (last < 0) last = c->argc+last;
        for (j = cmd->vm_firstkey; j <= last; j += cmd->vm_keystep)
            // 如果key内存就直接返回
            // 如果key正在io队列中, 则尝试撤销任务
            // 如果key已换出, 则把创建io任务调用queueIOJob入队, 
            // 并把key加入客户端的io_keys等待列表, 同时把客户端加入服务端指定key的io_keys列表
            waitForSwappedKey(c,c->argv[j]);
    }

    /* If the client was blocked for at least one key, mark it as blocked. */
    if (listLength(c->io_keys)) {
        c->flags |= REDIS_IO_WAIT;
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
        server.vm_blocked_clients++;
        return 1;
    } else {
        return 0;
    }
}

注意:如果客户端所有需要的key已经全部加载到内存,会添加客户端到server.io_ready_clients队列中。

唤醒等待的客户端

通过事件循环中的beforSleep方法唤醒

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            // 处理所有等待的客户端!
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

static void beforeSleep(struct aeEventLoop *eventLoop) {
    REDIS_NOTUSED(eventLoop);
		
    // 遍历所有等待的客户端并执行命令
    if (server.vm_enabled && listLength(server.io_ready_clients)) {
        listIter li;
        listNode *ln;

        listRewind(server.io_ready_clients,&li);
        while((ln = listNext(&li))) {
            redisClient *c = ln->value;
            struct redisCommand *cmd;

            /* Resume the client. */
            listDelNode(server.io_ready_clients,ln);
            c->flags &= (~REDIS_IO_WAIT);
            server.vm_blocked_clients--;
            aeCreateFileEvent(server.el, c->fd, AE_READABLE,
                readQueryFromClient, c);
            cmd = lookupCommand(c->argv[0]->ptr);
            assert(cmd != NULL);
            // 重新调用客户端命令
            call(c,cmd);
            resetClient(c);
            /* There may be more data to process in the input buffer. */
            if (c->querybuf && sdslen(c->querybuf) > 0)
                processInputBuffer(c);
        }
    }
}

定时检查

每100ms检查一次。

static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
		// ...
  
    /* Swap a few keys on disk if we are over the memory limit and VM
     * is enbled. Try to free objects from the free list first. */
    if (vmCanSwapOut()) {
        while (server.vm_enabled && zmalloc_used_memory() >
                server.vm_max_memory)
        {
            int retval;
	    // 尝试释放objfreelist中对象
            if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
            // 如果不允许有IO线程, 则立即把key写入vm文件,
            // 否则调用queueIOJob把io任务入队给io线程处理
            retval = (server.vm_max_threads == 0) ?
                        vmSwapOneObjectBlocking() :
                        vmSwapOneObjectThreaded();
            if (retval == REDIS_ERR && !(loops % 300) &&
                zmalloc_used_memory() >
                (server.vm_max_memory+server.vm_max_memory/10))
            {
              	// 当前内存使用量超过最大内存限制的1/10, 就记录警告日志
                redisLog(REDIS_WARNING,"WARNING: vm-max-memory limit exceeded by more than 10%% but unable to swap more objects out!");
            }
            /* Note that when using threade I/O we free just one object,
             * because anyway when the I/O thread in charge to swap this
             * object out will finish, the handler of completed jobs
             * will try to swap more objects if we are still out of memory. */
            if (retval == REDIS_ERR || server.vm_max_threads > 0) break;
        }
    }
  
  	// ...

    return 100; // 100ms
}

骚操作

  1. 通过/dev/null来快速计算数据所需的页面大小。

参考文章

Redis 单线程不行了,快来割 VM/ BIO/ IO 多线程的韭菜!(附源码)

Virtual memory (deprecated)

RDB 持久化策略