io—多路复用

发布时间 2024-01-03 22:06:22作者: Stitches

https://juejin.cn/post/6892687008552976398?searchId=20230808000034F391CE9EAF068C415D55

https://www.jianshu.com/p/95b50b026895

讲解网络IO的原理 (Linux 2.3 NAPI)

「IO番外」IO多路复用从头讲起_哔哩哔哩_bilibili

Linux下常见网络 IO模型

阻塞式 IO

应用进程发起 recvfrom 系统调用,然后内核等待数据到来,当数据到来时再将数据复制到用户态进程内存中。整个 R1、R2 阶段均被阻塞。

非阻塞 IO

非阻塞IO 主要改变了 R1阶段,在进程发起 recvfrom 系统调用后,立刻返回结果。此时进程可以去干其他事情,但需要轮询检查内核数据是否准备完毕。一直到数据准备完毕,R2阶段会阻塞进行数据拷贝,从内核态拷贝到用户态进程内存空间中。

多路复用 IO

当服务端同时接收多个连接时,怎么高效处理这些连接?当然是有哪个连接就绪就开始处理,其余的则继续监听。Linux 提供了 select/poll/epoll 几种多路复用 API,多路复用与阻塞式IO的区别在于它不会等到所有 socket都准备就绪时才开始处理,而是有一个部分准备好后就开始处理。

在 R1、R2 两个阶段均阻塞,但是多线程环境下 R1 阶段可以只分配 1个线程处理;在监听到有对应事件发生时再分配单独线程去处理。

异步 IO

异步IO 下当用户进程发起异步读 aio_read 系统调用后,不管内核缓冲区的数据是否准备完毕都不会阻塞当前进程,会立刻返回当前进程可以去处理其它事情;同时内核数据在准备完毕时会直接将数据从内核空间拷贝到进程的内存中,然后再以 aio_read 信号通知用户进程,R1、R2 两个阶段都是非阻塞的。


多路复用 IO深究

select

直接上源码,边看代码边分析,下述源码参考 Linux (v3.9-rc8) 版本:

int select(int nfds, 
        fd_set *restrict readfds, 
        fd_set *restrict writefds, 
        fd_set *restrict errorfds, 
        struct timeval *restrict timeout);

函数相关参数说明如下:

  • 监视的文件描述符个数;
  • 每个文件描述符,可以监视的一种或多种状态,包括可读、可写、异常三种;
  • 最长监视的时间;

select 函数入口

SYSCALL_DEFINE5(select, int, n, 
        fd_set __user *, inp, 
        fd_set __user *, outp, 
        fd_set __user *, exp, 
        struct timeval __user *, tvp)
{
    // …
    ret = core_sys_select(n, inp, outp, exp, to);
    ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);
    return ret;
}

// 系统核心 select
int core_sys_select(int n, 
        fd_set __user *inp, 
        fd_set __user *outp, 
        fd_set __user *exp, 
        struct timespec *end_time)
{
    fd_set_bits fds;
    // 重置读/写/异常 事件的 fd_set集合
    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    // 循环监听核心实现
    ret = do_select(n, &fds, end_time);
    // …
}

可以发现 do_select() 函数输入了监听文件描述符个数、各种事件对应的文件描述符集合、监听超时时间。函数内部最外层是一个死循环,内层循环遍历文件描述符集合,对每一个文件描述符,调用 f_op->poll 来返回当前文件描述符的状态掩码。当循环一遍所有文件描述符集合时,检查是否有活跃的文件描述符,如果有就退出循环体返回上层程序;如果没有活跃的文件描述符但是超过了监听时间,也返回给上层应用;如果都不满足,就调用 poll_schedule_timeout(xxx) 函数进入休眠状态,等待事件到来唤醒或者定时器唤醒,接着继续循环监听。

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
    // …
    for (;;) {
        // …
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
            // …
            struct fd f;
            f = fdget(i);
            if (f.file) {
                const struct file_operations *f_op;
                f_op = f.file->f_op;
                mask = DEFAULT_POLLMASK;
                if (f_op->poll) {
                    wait_key_set(wait, in, out,
                             bit, busy_flag);
                    // 对每个fd进行I/O事件检测
                    mask = (*f_op->poll)(f.file, wait);
                }
                fdput(f);
                // …
            }
        }
        // 退出循环体
        if (retval || timed_out || signal_pending(current))
            break;
        // 进入休眠
        if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
                to, slack))
            timed_out = 1;
    }
}

当设备触发了新的 I/O 事件时会主动唤醒主程序进程,那是什么时候将当前进程添加到等待队列中的?接着深入到 (*f_op->poll)(f.file, wait) 函数内部查看,这个函数主要作用包括两点:一是调用 poll_wait() 函数注册相应的回调;二是检测文件设备的当前状态。

这里先介绍一下 Linux系统内核和文件设备之间的耦合设计:Linux不知道每个具体的文件设备是怎么操作的,但是让每个设备拥有一个 struct file_operations 结构体,这个结构体里定义了各种用于操作设备的函数指针,每个设备单独实现具体的操作函数。

struct file {
    struct path     f_path;
    struct inode        *f_inode;   /* cached value */
    const struct file_operations    *f_op;

    // …

} __attribute__((aligned(4)));  /* lest something weird decides that 2 is OK */

struct file_operations {
    struct module *owner;
    loff_t (*llseek) (struct file *, loff_t, int);
    ssize_t (*read) (struct file *, char __user *, size_t, loff_t *);
    ssize_t (*write) (struct file *, const char __user *, size_t, loff_t *);
    ssize_t (*aio_read) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
    ssize_t (*aio_write) (struct kiocb *, const struct iovec *, unsigned long, loff_t);
    ssize_t (*read_iter) (struct kiocb *, struct iov_iter *);
    ssize_t (*write_iter) (struct kiocb *, struct iov_iter *);
    int (*iterate) (struct file *, struct dir_context *);
    // select()轮询设备fd的操作函数
    unsigned int (*poll) (struct file *, struct poll_table_struct *);
    // …    
};

scull 驱动实例

这里列举了 scull 驱动来查看 f_op->poll 里面具体干了什么,
scull 设备不同于硬件设备,它是模拟出来的一块内存,因此对它的读写更快速更自由,内存支持你顺着读倒着读点着读怎么读都可以。 我们以书中“管道”(pipe)式,即FIFO的读写驱动程序为例。

首先是 scull_pipe结构体,里面的 wait_queue_head_t 这个队列类型就是用来记录等待设备 I/O 事件的进程的。

struct scull_pipe {
        wait_queue_head_t inq, outq;       /* read and write queues */
        char *buffer, *end;                /* begin of buf, end of buf */
        int buffersize;                    /* used in pointer arithmetic */
        char *rp, *wp;                     /* where to read, where to write */
        int nreaders, nwriters;            /* number of openings for r/w */
        struct fasync_struct *async_queue; /* asynchronous readers */
        struct mutex mutex;              /* mutual exclusion semaphore */
        struct cdev cdev;                  /* Char device structure */
};

scull 设备的轮询操作函数 scull_p_poll,驱动模块加载后,这个函数就被挂到(*poll) 函数指针上去了。我们可以看到它的确是返回了当前设备的I/O状态,并且调用了内核的 poll_wait() 函数,这里注意,它把自己的 wait_queue_head_t 队列也当作参数传进去了。

static unsigned int scull_p_poll(struct file *filp, poll_table *wait)
{
    struct scull_pipe *dev = filp->private_data;
    unsigned int mask = 0;

    /*
     * The buffer is circular; it is considered full
     * if "wp" is right behind "rp" and empty if the
     * two are equal.
     */
    mutex_lock(&dev->mutex);
    poll_wait(filp, &dev->inq,  wait);   //内核 poll_wait() 函数
    poll_wait(filp, &dev->outq, wait);
    if (dev->rp != dev->wp)
        mask |= POLLIN | POLLRDNORM;    /* readable */
    if (spacefree(dev))
        mask |= POLLOUT | POLLWRNORM;   /* writable */
    mutex_unlock(&dev->mutex);
    return mask;
}

scull 有数据写入时,就会唤醒 wait_queue_head_t 队列中的等待进程,那么等待队列中的进程是什么时候存入的呢,继续看:

static ssize_t scull_p_write(struct file *filp, const char __user *buf, size_t count,
                loff_t *f_pos)
{
    // …
    /* Make sure there's space to write */
    // …
    /* ok, space is there, accept something */
    // …
    /* finally, awake any reader */
    wake_up_interruptible(&dev->inq);  /* blocked in read() and select() */
    // …
}

可以发现 poll_wait() 函数中直接调用了 struct poll_table_struct 结构体中绑定的函数指针,先找到该结构体初始化的位置。

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);   //调用 _qproc() 函数
}

/*
 * Do not touch the structure directly, use the access functions
 * poll_does_not_wait() and poll_requested_events() instead.
 */
typedef struct poll_table_struct {
    poll_queue_proc _qproc;             // poll_wait() 调用
    unsigned long _key;
} poll_table;

/* 
 * structures and helpers for f_op->poll implementations
 */
typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);

可以发现在 do_select() 函数内部的 poll_initwait() 方法会对 struct poll_table_struct 结构体进行初始化,它绑定的就是 __pollwait 函数。

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
    struct poll_wqueues table;
    poll_table *wait;
    poll_initwait(&table);
    wait = &table.pt;
    // …
}

void poll_initwait(struct poll_wqueues *pwq)
{
    // 初始化poll_table里的函数指针
    init_poll_funcptr(&pwq->pt, __pollwait);
    pwq->polling_task = current;
    pwq->triggered = 0;
    pwq->error = 0;
    pwq->table = NULL;
    pwq->inline_index = 0;
}
EXPORT_SYMBOL(poll_initwait);

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
    pt->_qproc = qproc;
    pt->_key   = ~0UL; /* all events enabled */
}

接着查看 __pollwait() 函数的实现,在函数内部它将当前进程放到了设备的等待队列wait_queue_head_t中。

/* Add a new entry */
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
                poll_table *p)
{
    struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
    struct poll_table_entry *entry = poll_get_entry(pwq);
    if (!entry)
        return;
    entry->filp = get_file(filp);
    entry->wait_address = wait_address;
    entry->key = p->_key;
    init_waitqueue_func_entry(&entry->wait, pollwake);
    entry->wait.private = pwq;
    // 把当前进程装到设备的等待队列
    add_wait_queue(wait_address, &entry->wait);
}

void add_wait_queue(wait_queue_head_t *q, wait_queue_t *wait)
{
    unsigned long flags;

    wait->flags &= ~WQ_FLAG_EXCLUSIVE;
    spin_lock_irqsave(&q->lock, flags);
    __add_wait_queue(q, wait);
    spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(add_wait_queue);

static inline void __add_wait_queue(wait_queue_head_t *head, wait_queue_t *new)
{
    list_add(&new->task_list, &head->task_list);
}

/**
 * Insert a new element after the given list head. The new element does not
 * need to be initialised as empty list.
 * The list changes from:
 *      head → some element → ...
 * to
 *      head → new element → older element → ...
 *
 * Example:
 * struct foo *newfoo = malloc(...);
 * list_add(&newfoo->entry, &bar->list_of_foos);
 *
 * @param entry The new element to prepend to the list.
 * @param head The existing list.
 */
static inline void
list_add(struct list_head *entry, struct list_head *head)
{
    __list_add(entry, head, head->next);
}

select 流程总结

如上我们分析了 select 的底层实现,它的流程大致为:

  1. 将文件描述符集合 fd_set 从用户空间拷贝到内核空间;
  2. 注册回调函数 __poll_wait,该函数内部将当前进程挂到设备的等待队列中;
  3. 最外层为死循环,内层循环遍历文件描述符集合,对每一个文件描述符调用 poll() 方法(Linux 只定义了 poll函数指针,具体的实现由不同的设备去实现。例如对于 socket,就是 socket_poll,socket_poll 会根据情况调用 tcp_poll/udp_poll);
  4. poll 方法返回一个描述读写操作是否就绪的 mask掩码,并且根据这个掩码给 fd_set 赋值;
  5. 如果遍历完所有的文件描述符都没有产生掩码,也就是没有活跃的事件,就会检查是否超过监听时间,如果未超过时间,就调用 schedule_timeout 让当前 select进程进入睡眠。直到设备驱动发现自身资源可读时,再唤醒等待队列上睡眠的进程。超过定时时间,会继续唤醒 select进程再次循环监听文件描述符集合。

select 方法存在以下缺点:

  1. 每次调用 select 都需要将用户态文件描述符集合拷贝到内核态,在文件描述符很多时开销大;
  2. 每次调用 select 都需要在内核遍历完所有传递进来的文件描述符集合,开销大;
  3. select 仅支持最多 1024个文件描述符。