select_poll_epoll导论

发布时间 2024-01-05 22:54:57作者: Stitches

https://www.cnblogs.com/Anker/p/3258674.html

https://zhuanlan.zhihu.com/p/150972878

重点解读epoll:https://zhuanlan.zhihu.com/p/546228628

https://subingwen.cn/linux/select

epoll

epoll LT/ET 深度剖析:https://zhuanlan.zhihu.com/p/21374980
epoll LT/ET模式详解:https://cloud.tencent.com/developer/article/1636224
如何理解Epoll中的LT和ET模式,底层实现又是怎么样的?:https://www.zhihu.com/question/403893498

Select

1. IO多路转接(复用)

IO 多路转接也称为 IO 多路复用,它是一种网络通信的手段(机制),通过这种方式可以同时监测多个文件描述符并且这个过程是阻塞的,一旦检测到有文件描述符就绪( 可以读数据或者可以写数据)程序的阻塞就会被解除,之后就可以基于这些(一个或多个)就绪的文件描述符进行通信了。通过这种方式在单线程 / 进程的场景下也可以在服务器端实现并发。常见的 IO 多路转接方式有:select、poll、epoll。

下面先对多线程 / 多进程并发和 IO 多路转接的并发处理流程进行对比(服务器端):

  • 多线程 / 多进程并发
    • 主线程 / 父进程:调用 accept() 监测客户端连接请求
      • 如果没有新的客户端的连接请求,当前线程 / 进程会阻塞
      • 如果有新的客户端连接请求解除阻塞,建立连接
    • 子线程 / 子进程:和建立连接的客户端通信
      • 调用 read() / recv() 接收客户端发送的通信数据,如果没有通信数据,当前线程 / 进程会阻塞,数据到达之后阻塞自动解除
      • 调用 write() / send() 给客户端发送数据,如果写缓冲区已满,当前线程 / 进程会阻塞,否则将待发送数据写入写缓冲区中
  • IO 多路转接并发
    • 使用 IO 多路转接函数委托内核检测服务器端所有的文件描述符(通信和监听两类),这个检测过程会导致进程 / 线程的阻塞,如果检测到已就绪的文件描述符阻塞解除,并将这些已就绪的文件描述符传出
    • 根据类型对传出的所有已就绪文件描述符进行判断,并做出不同的处理
      • 监听的文件描述符:和客户端建立连接
        • 此时调用 accept() 是不会导致程序阻塞的,因为监听的文件描述符是已就绪的(有新请求)
      • 通信的文件描述符:调用通信函数和已建立连接的客户端通信
        • 调用 read() / recv() 不会阻塞程序,因为通信的文件描述符是就绪的,读缓冲区内已有数据
        • 调用 write() / send() 不会阻塞程序,因为通信的文件描述符是就绪的,写缓冲区不满,可以往里面写数据
    • 对这些文件描述符继续进行下一轮的检测(循环往复。。。)

与多进程和多线程技术相比,I/O 多路复用技术的最大优势是系统开销小,系统不必创建进程 / 线程,也不必维护这些进程 / 线程,从而大大减小了系统的开销。

1.1 函数原型

使用 select 这种 IO 多路转接方式需要调用一个同名函数 select,这个函数是跨平台的,Linux、Mac、Windows 都是支持的。程序猿通过调用这个函数可以委托内核帮助我们检测若干个文件描述符的状态,其实就是检测这些文件描述符对应的读写缓冲区的状态:

  • 读缓冲区:检测里边有没有数据,如果有数据该缓冲区对应的文件描述符就绪
  • 写缓冲区:检测写缓冲区是否可以写 (有没有容量),如果有容量可以写,缓冲区对应的文件描述符就绪
  • 读写异常:检测读写缓冲区是否有异常,如果有该缓冲区对应的文件描述符就绪

委托检测的文件描述符被遍历检测完毕之后,已就绪的这些满足条件的文件描述符会通过 select() 的参数分 3 个集合传出,程序猿得到这几个集合之后就可以分情况依次处理了。

1.2 细节描述

在 select() 函数中第 2、3、4 个参数都是 fd_set 类型,它表示一个文件描述符的集合,类似于信号集 sigset_t,这个类型的数据有 128 个字节,也就是 1024 个标志位,和内核中文件描述符表中的文件描述符个数是一样的。

这并不是巧合,而是故意为之。这块内存中的每一个 bit 和 文件描述符表中的每一个文件描述符是一一对应的关系,这样就可以使用最小的存储空间将要表达的意思描述出来了。

内核在遍历这个读集合的过程中,如果被检测的文件描述符对应的读缓冲区中没有数据,内核将修改这个文件描述符在读集合 fd_set 中对应的标志位,改为 0,如果有数据那么这个标志位的值不变,还是 1。

当 select() 函数解除阻塞之后,被内核修改过的读集合通过参数传出,此时集合中只要标志位的值为 1,那么它对应的文件描述符肯定是就绪的,我们就可以基于这个文件描述符和客户端建立新连接或者通信了。

1.3 底层原理

select() 通过设置或检查存放的 fd 的数组来判断是否有事件需要处理,因此带来以下缺点。

  • 单个进程可以监视的 fd 数量被限制,32位最多 32*32 ,64位最多 32*64。具体可以通过 cat /proc/sys/fs/file-max 来设置;
  • 线性扫描整个 fd 数组,效率很低。如果有 IO 事件发生需要重新扫描,浪费很多 CPU时间,如果能够给每个 socket绑定回调函数,发生时自动完成相关操作,可以避免轮询。
int poll(struct pollfd *fds, int nfds, int timeout)
{
    int ret = sys_poll(fds, nfds, timeout);
      
    if (ret < 0) {
        SET_ERRNO(-ret);
        ret = -1;
    }
    return ret;
}
    
asmlinkage long sys_poll(struct pollfd * ufds, unsigned int nfds, long timeout)
{
    int i, j, fdcount, err;
    struct pollfd **fds;
    struct poll_wqueues table, *wait;
    int nchunks, nleft;
        
    /* Do a sanity check on nfds ... */
    if (nfds  NR_OPEN)
        return -EINVAL;
        
    // 2. 注册回调函数__pollwait
    poll_initwait(&table);
    wait = &table;
    if (!timeout)
        wait = NULL;
        
    err = -ENOMEM;
    fds = NULL;
    if (nfds != 0) {
        // 初始化 fd_set 数组
        fds = (struct pollfd **)kmalloc(
            (1 + (nfds - 1) / POLLFD_PER_PAGE) * sizeof(struct pollfd *),
            GFP_KERNEL);
        if (fds == NULL)
            goto out;
    }
        
    nchunks = 0;
    nleft = nfds;
    while (nleft  POLLFD_PER_PAGE) { /* allocate complete PAGE_SIZE chunks */
        fds[nchunks] = (struct pollfd *)__get_free_page(GFP_KERNEL);
        if (fds[nchunks] == NULL)
            goto out_fds;
        nchunks++;
        nleft -= POLLFD_PER_PAGE;
    }
    if (nleft) { /* allocate last PAGE_SIZE chunk, only nleft elements used */
        fds[nchunks] = (struct pollfd *)__get_free_page(GFP_KERNEL);
        if (fds[nchunks] == NULL)
            goto out_fds;
    }
        
    err = -EFAULT;
    for (i=0; i < nchunks; i++)
        // 1. 从用户空间拷贝 fd_set 结构到内核空间
        if (copy_from_user(fds[i], ufds + i*POLLFD_PER_PAGE, PAGE_SIZE))
            goto out_fds1;
    if (nleft) {
        if (copy_from_user(fds[nchunks], ufds + nchunks*POLLFD_PER_PAGE, 
                nleft * sizeof(struct pollfd)))
            goto out_fds1;
    }
        
    fdcount = do_poll(nfds, nchunks, nleft, fds, wait, timeout);
        
    /* OK, now copy the revents fields back to user space. */
    for(i=0; i < nchunks; i++)
        for (j=0; j < POLLFD_PER_PAGE; j++, ufds++)
            __put_user((fds[i] + j)-revents, &ufds-revents);
    if (nleft)
        for (j=0; j < nleft; j++, ufds++)
            __put_user((fds[nchunks] + j)-revents, &ufds-revents);
        
    err = fdcount;
    if (!fdcount && signal_pending(current))
        err = -EINTR;
        
    out_fds1:
    if (nleft)
        free_page((unsigned long)(fds[nchunks]));
    out_fds:
    for (i=0; i < nchunks; i++)
        free_page((unsigned long)(fds[i]));
    if (nfds != 0)
        kfree(fds);
    out:
    poll_freewait(&table);
    return err;
}

// 注册回调函数
void poll_initwait(struct poll_wqueues *pwq) {
    init_poll_funcptr(&pwq->pt, __pollwait);
    pwq->error = 0;
    pwq->table = NULL;
}

// 回调函数具体操作
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
    struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
    struct poll_table_page *table = p-table;
        
    if (!table || POLL_TABLE_FULL(table)) {
        struct poll_table_page *new_table;
        
        new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
        if (!new_table) {
            p-error = -ENOMEM;
            __set_current_state(TASK_RUNNING);
            return;
        }
        new_table-entry = new_table-entries;
        new_table-next = table;
        p-table = new_table;
        table = new_table;
    }
    
    /* 添加新节点 */
    {
        struct poll_table_entry * entry = table-entry;
        table-entry = entry+1;
        get_file(filp);
        entry-filp = filp;
        entry-wait_address = wait_address;
        init_waitqueue_entry(&entry-wait, current);
        add_wait_queue(wait_address,&entry-wait);
    }
}
// 死循环遍历
static int do_poll(unsigned int nfds, unsigned int nchunks, unsigned int nleft, 
struct pollfd *fds[], struct poll_wqueues *wait, long timeout)
{
    int count;
    poll_table* pt = &wait-pt;
        
    for (;;) {
        unsigned int i;
        
        set_current_state(TASK_INTERRUPTIBLE);
        count = 0;
        for (i=0; i < nchunks; i++)
            do_pollfd(POLLFD_PER_PAGE, fds[i], &pt, &count);
        if (nleft)
            do_pollfd(nleft, fds[nchunks], &pt, &count);
        pt = NULL;
        if (count || !timeout || signal_pending(current))
            break;
        count = wait-error;
        if (count)
            break;
        // 睡眠 timeout 时间
        timeout = schedule_timeout(timeout);
    }
    current-state = TASK_RUNNING;
    return count;
}

如上述 select() 底层代码,整体逻辑如下:

  • 使用 copy_from_user 从用户空间拷贝所有 fd_set 数组到内核空间;
  • 注册回调函数 __pollwait,该回调函数会遍历所有 fd 并且调用其 poll() 方法。例如对于 tcp_poll 就是调用 __pollwait() 函数。它将当前进程挂到设备的等待队列中,当设备收到一条消息时,唤醒等待队列中睡眠的进程;
  • 内核会执行 do_poll() 函数,该函数轮询所有 fd_set 数组,判断设备是否有资源可读写。如果一次遍历完成后没有发现可读写资源,会调用 schedule_timeout() 函数进入睡眠。睡眠过程中如果有可读写资源触发,会唤醒等待队列上的进程,否则睡眠超时后被唤醒进行下一轮轮询。
  • 最终 poll() 方法返回描述读写操作就绪状态的掩码,并把 fd_set 从内核态拷贝回用户态。

1.4 优缺点

  • 用户空间和内核空间需要维护一个存放大量 fd_set 结构数组,每次调用 select 都需要重新来回拷贝,开销较大;
  • select 支持最大监视文件描述符数量太小,默认为 1024个;
  • 需要内核主动轮询来判断是否有资源就绪。

Poll

1、结构及优缺点

struct pollfd {
  int fd;
  short events;
  short revents;
};

poll()pollfd 结构体维护,内部包括了 fd 文件描述符events 注册的事件revents 激活的事件,由内核修改

poll() 相比于 select() 优化了监视文件描述符个数的限制,采用链表进行存储。但是它没有解决在内核和用户空间之间拷贝大量文件描述符的数组,在 poll() 调用频繁时浪费资源。

Epoll

1、结构描述

epoll 将主动轮询修改为被动通知,所以 epoll 注册套接字后主程序可以去做其它事情,当事件发生时才去处理。

// 事件参数描述链接到文件描述符fd的对象
struct epoll_event {
  __u32 events;
  __u64 data;
} EPOLL_PACKED;

2、LT 和 ET

Epoll 包括两种触发模式,水平触发和边缘触发。

  • LT:是标准模式,意味着每次epoll_wait()返回后,事件处理后,如果之后还有数据,会不断触发,也就是说,一个套接字上一次完整的数据,epoll_wait()可能会返回多次,直到没有数据为止。
  • ET:也称高效模式,有数据过来后,epoll_wait()会返回一次,一段时间内,该套接字就算有数据源源不断地过来,epoll_wait()也不会返回了。这里注意,是一段时间,不代表这个套接字上有数据就只触发一次。时间过长,还是会返回多次的。比如我写FTP用了epoll+多线程,但是每次套接字上有信息就开线程处理,同一时间内希望一个套接字只被一个线程持有,但是因为文件传输时间过长,就算使用ET模式,套接字还是会返回多次。这里要特别强调一个参数EPOLLONESHOT,如果要保证套接字同一时段只被一个线程处理,必须加上。

ET(边缘触发)存在的意义:

  • 如果用 LT(水平触发),系统中一旦存在大量无需读写操作的就绪文件描述符,那么每次调用 epoll_wait 就会直接返回,它降低了处理有效文件描述符的效率。
  • 采用 ET 包含增了读写就绪文件描述符时,若一次性没有将全部数据读写完成,下次再调用 epoll_wait 时不会触发通知,直到该文件描述符第二次出现可读写事件才通知。它避免了无效文件描述符的重复处理。

解决方案:给accept()后的套接字加上参数EPOLLONESHOT,线程结束后处理完之后,再重置EPOLLONESHOT属性,但是,千万不可以给listen()后的监听套接字设置此属性,这会造成同一时刻只能处理一个连接的情况。

二者的差异在于level-trigger模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。如下两个示意图:

从socket读数据:

从socket写数据:

所以,在epoll的ET模式下,正确的读写方式为:
读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN 写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN。

//ET模式下正确的读操作
n = 0;
while ((nread = read(fd, buf+n, BUFSIZ-1)) > 0) {
  if (nread == -1 && errno != EAGAIN)
  {
    perror("read error");
  }
  n += nread;
}


//ET模式下正确的写操作
int nwrite, data_size = strlen(buf);
n = data_size;
nwrite = 0;
while(n > 0) {
  nwrite = write(fd, buf+data_size-n, n);
  if (nwrite < n)
  {
    if (nwrite == -1 && errno != EAGAIN)
    {
      perror("write error");
    }
    break;
  }
  n -= nwrite;
}

3、Epoll 的优点

  • 没有最大 fd 文件描述符数限制;
  • 效率不会随着 fd 数量增加而下降。因为只有活跃可用的 fd 才会触发回调,不会进行无意义的轮询;
  • 免去了用户空间和内核空间之间对文件描述符数组的拷贝,epoll 采用 mmap 内存映射技术,操作用户态虚拟地址空间即操作内核态文件页缓存地址;

4、原理分析

4.1 epoll_create

asmlinkage int sys_epoll_create(int maxfds)
{
  int error = -EINVAL, fd;
  unsigned long addr;
  struct inode *inode;
  struct file *file;
  struct eventpoll *ep;
​
  /*
   * eventpoll接口中不可能存储超过MAX_FDS_IN_EVENTPOLL的fd
   */
  if (maxfds > MAX_FDS_IN_EVENTPOLL)
    goto eexit_1;
​
  /*
   * Creates all the items needed to setup an eventpoll file. That is,
   * a file structure, and inode and a free file descriptor.
   */
  error = ep_getfd(&fd, &inode, &file);
  if (error)
    goto eexit_1;
​
  /*
   * 调用去初始化eventpoll file. 这和"open" file operation callback一样,因为 inside
   * ep_getfd() we did what the kernel usually does before invoking
   * corresponding file "open" callback.
   */
  error = open_eventpoll(inode, file);
  if (error)
    goto eexit_2;
​
  /* "private_data" 由open_eventpoll()设置 */
  ep = file->private_data;
​
  /* 分配页给event double buffer */
  error = ep_do_alloc_pages(ep, EP_FDS_PAGES(maxfds + 1));
  if (error)
    goto eexit_2;
​
  /*
   * 创建event double buffer的一个用户空间的映射,以避免当返回events给调用者时,内核到用户空间的内存复制
   */
  down_write(&current->mm->mmap_sem);
  addr = do_mmap_pgoff(file, 0, EP_MAP_SIZE(maxfds + 1), PROT_READ,
           MAP_PRIVATE, 0);
  up_write(&current->mm->mmap_sem);
  error = PTR_ERR((void *) addr);
  if (IS_ERR((void *) addr))
    goto eexit_2;
​
  return fd;
​
eexit_2:
  sys_close(fd);
eexit_1:
  return error;
}

epoll() 通过 epoll_create() 创建文件描述符。

4.2 epoll_ctl

asmlinkage int sys_epoll_ctl(int epfd, int op, int fd, unsigned int events)
{
  int error = -EBADF;
  struct file *file;
  struct eventpoll *ep;
  struct epitem *dpi;
  struct pollfd pfd;
​
  // 获取epfd对应的file实例
  file = fget(epfd);
  if (!file)
    goto eexit_1;
​
  /*
   * We have to check that the file structure underneath the file descriptor
   * the user passed to us _is_ an eventpoll file.
   * 检查fd对应文件是否是一个eventpoll文件
   */
  error = -EINVAL;
  if (!IS_FILE_EPOLL(file))
    goto eexit_2;
​
  /*
   * At this point it is safe to assume that the "private_data" contains
   * our own data structure.
   * 获取eventpoll文件中的私有数据,该数据是在epoll_create中创建的
   */
  ep = file->private_data;
​
  down_write(&ep->acsem);
​
  pfd.fd = fd;
  pfd.events = events | POLLERR | POLLHUP;
  pfd.revents = 0;
​
  // 在eventpoll中存储文件描述符信息的红黑树中查找指定的fd对应的epitem实例
  dpi = ep_find(ep, fd);
​
  error = -EINVAL;
  switch (op) {
  case EP_CTL_ADD:
    // 若要添加的fd不存在,则调用ep_insert()插入红黑树
    if (!dpi)
      error = ep_insert(ep, &pfd);
    else
      // 若已存在,则返回EEXIST错误
      error = -EEXIST;
    break;
  case EP_CTL_DEL:
    if (dpi)
      error = ep_remove(ep, dpi);
    else
      error = -ENOENT;
    break;
  case EP_CTL_MOD:
    if (dpi) {
      dpi->pfd.events = events;
      error = 0;
    } else
      error = -ENOENT;
    break;
  }
​
  up_write(&ep->acsem);
​
eexit_2:
  fput(file);
eexit_1:
  return error;
}

epoll_ctl() 负责注册要监听的事件类型。在每次将新事件注册到 epoll 句柄中时(epoll_ctl 中指定 EPOLL_CTL_ADD),会把所有 fd 拷贝进内核,而非在 epoll_wait 时重复拷贝。epoll 保证整个过程中,fd 只会在 epoll_ctl 阶段拷贝一次。

类似于 EPOLL_CTL_ADD 操作,还有 EPOLL_CTL_MODEPOLL_CTL_DEL 操作。

4.3 epoll_wait

asmlinkage int sys_epoll_wait(int epfd, struct pollfd const **events, int timeout)
{
  int error = -EBADF;
  void *eaddr;
  struct file *file;
  struct eventpoll *ep;
  struct evpoll dvp;
​
  file = fget(epfd);
  if (!file)
    goto eexit_1;
​
  /*
   * We have to check that the file structure underneath the file descriptor
   * the user passed to us _is_ an eventpoll file.
   */
  error = -EINVAL;
  if (!IS_FILE_EPOLL(file))
    goto eexit_2;
​
  ep = file->private_data;
​
  /*
   * It is possible that the user created an eventpoll file by open()ing
   * the corresponding /dev/ file and he did not perform the correct
   * initialization required by the old /dev/epoll interface. This test
   * protect us from this scenario.
   */
  error = -EINVAL;
  if (!atomic_read(&ep->mmapped))
    goto eexit_2;
​
  dvp.ep_timeout = timeout;
  error = ep_poll(ep, &dvp);
  if (error > 0) {
    eaddr = (void *) (ep->vmabase + dvp.ep_resoff);
    if (copy_to_user(events, &eaddr, sizeof(struct pollfd *)))
      error = -EFAULT;
  }
​
eexit_2:
  fput(file);
eexit_1:
  return error;
}

epoll 并不像 select/poll 每次都将待扫描的文件描述符信息加入到设备等待队列中,而只是在 epoll_ctl 时将其加入到红黑树中,并为每个文件描述符指定一个回调函数。

当设备就绪,唤醒了红黑树中等待者时会调用该回调函数,回调函数将 fd 加入到一个就绪链表中。

epoll_wait 操作实际上就是在该就绪链表中查看有无就绪的文件描述符 fd。

5、Epoll 存在的坑

5.1 Epoll 的坑 ———— EPOLLONESHOT 事件

即使使用ET模式,一个socket上的某个事件还是可能被触发多次,这是跟数据报的大小有关系,常见的情景就是一个线程,而在数据的处理过程中该socket上又有新数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒处理这些新的数据,于是出现了两个线程同时操作一个socket为了避免这种情况,就可以采用epoll的EPOLLONESPOT事件。同时要注意,注册了EPOLLONESHOT事件的socket一旦被某个线程处理完毕,该线程就应该立即重置这个socket的EPOLLONESHOT的事件,以确保这个socket下次可读时,其EPOLLIN事件被触发,进而让其他的工作线程有机会继续处理这个socket。

5.2 Epoll 的坑 ———— EAGAIN 事件

在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK)从字面上看, 意思是:EAGAIN: 再试一次,EWOULDBLOCK: 如果这是一个阻塞socket, 操作将被block,perror输出: Resource temporarily unavailable.

这个错误表示资源暂时不够,能read时,读缓冲区没有数据,或者write时,写缓冲区满了。遇到这种情况,如果是阻塞socket,read/write就要阻塞掉。而如果是非阻塞socket,read/write立即返回-1, 同时errno设置为EAGAIN。所以,对于阻塞socket,read/write返回-1代表网络出错了但对于非阻塞socket,read/write返回-1不一定网络真的出错了。可能是Resource temporarily unavailable。这时你应该再试,直到Resource available。

EAGAIN: 再试一次,EWOULDBLOCK: 如果这是一个阻塞socket, 操作将被block,perror输出: Resource temporarily unavailable。这个错误表示资源暂时不够,能read时,读缓冲区没有数据,或者write时,写缓冲区满了。遇到这种情况,如果是阻塞socket,read/write就要阻塞掉。而如果是非阻塞socket,read/write立即返回-1, 同时errno设置为EAGAIN。所以,对于阻塞socket,read/write返回-1代表网络出错了。但对于非阻塞socket,read/write返回-1不一定网络真的出错了。可能是Resource temporarily unavailable。这时你应该再试,直到Resource available。

对于非阻塞的 socket,在接收到 errno=EAGAIN 时应该忽略掉错误,继续重试。

5.3 Epoll 的坑 ———— accept 问题

  • 阻塞模式 accept 存在的问题

考虑这种情况:TCP连接被客户端夭折,即在服务器调用accept之前,客户端主动发送RST终止连接,导致刚刚建立的连接从就绪队列中移出,如果套接口被设置成阻塞模式,服务器就会一直阻塞在accept调用上,直到其他某个客户建立一个新的连接为止。但是在此期间,服务器单纯地阻塞在accept调用上,就绪队列中的其他描述符都得不到处理。

解决方案:把监听套接口设置为非阻塞,当客户在服务器调用accept之前中止某个连接时,accept调用可以立即返回-1,这时源自Berkeley的实现会在内核中处理该事件,并不会将该事件通知给epoll,而其他实现把errno设置为ECONNABORTED或者EPROTO错误,我们应该忽略这两个错误。

  • ET模式下 accept 存在的问题

考虑这种情况:多个连接同时到达,服务器的TCP就绪队列瞬间积累多个就绪连接,由于是边缘触发模式,epoll只会通知一次,accept只处理一个连接,导致TCP就绪队列中剩下的连接都得不到处理。
解决办法是用while循环抱住accept调用,处理完TCP就绪队列中的所有连接后再退出循环。如何知道是否处理完就绪队列中的所有连接呢?accept返回-1并且errno设置为EAGAIN就表示所有连接都处理完。

while ((conn_sock = accept(listenfd, (struct sockaddr*)&remote, (size_t*)&addrlen)) > 0)
{
  handle_client(conn_sock);
}
if (conn_sock == -1) {
  if (errno != EAGAIN && errno != ECONNABORTED && errno != EPROTO && errno != EINTR)
  perror("accept");
}

5.4 Epoll 的坑 ———— 水平触发当socket可写会不停触发socket可写事件

使用Linux epoll模型,水平触发模式;当socket可写时,会不停的触发socket可写的事件,如何处理?

  • 第一种最普遍的方式:
    需要向socket写数据的时候才把socket加入epoll,等待可写事件。接受到可写事件后,调用write或者send发送数据。当所有数据都写完后,把socket移出epoll。
    这种方式的缺点是,即使发送很少的数据,也要把socket加入epoll,写完后在移出epoll,有一定操作代价。

  • 第二种的方式:
    开始不把socket加入epoll,需要向socket写数据的时候,直接调用write或者send发送数据。如果返回EAGAIN,把socket加入epoll,在epoll的驱动下写数据,全部数据发送完毕后,再移出epoll。
    这种方式的优点是:数据不多的时候可以避免epoll的事件处理,提高效率。

参考

https://blog.csdn.net/linuxheik/article/details/73294658