IO多路复用

发布时间 2023-12-19 14:47:22作者: zxinlog

IO多路复用

socket

网络编程绕不开 socket 编程,socket ,插座,可以容许多端进行通信。

服务端流程

服务端绑定socket,首先创建socket, 说明所使用的网络协议TCP还是UDP,然后bind绑定到一个IP地址。开启监听 listen,监听端口,当检测到端口有其他主机发送的请求时,则accept 建立连接。建立之后,通过read、write 进行读写。

socket 就是一种网络上的通信方式,socket,send、recv、close。非常类似文件的 open、write、read、close。

创建 socket

类似于打开文件,想要通信,也需要首先创建一个 socket,创建socket 所需要的信息:网络协议,通信协议。这里的socket也和open文件一样,都是一个文件描述符。

网络协议指代IPv4 或者IPv6,通信协议就是传输层协议是 TCP 还是 UDP。

tcp_fd = socket(AF_INET, SOCK_STREAM, 0);
udp_fd = socket(AF_INET, SOCK_DGRAM, 0);

前面的AF_INET 表示IPv4 协议,最后一个0 表示默认,会根据前面是UDP还是TCP自行选择。
tcp就是SOCK_STREAM
udp就是SOCK_DGRAM。

也可以使用一些其他的协议,如IPv6、Local等网络协议,具体的可以使用到的时候进行查询。

绑定地址

创建完socket之后的下一步, 一般涉及到地址绑定的问题。地址涉及多个数据,如IP地址,端口号,Linux 下有预备好的数据结构。

使用的数据结构是:

struct sockaddr_in 

在绑定的时候,使用的数据结构是 sockaddr, 而在这里使用 sockaddr_in ,是因为二者表示的都是同样的内部数据结构,不过使用sockaddr_in 可以更加清晰的进行赋值。

struct sockaddr_in addr;
addr.sin_family = AF_INET; 
addr.sin_port = htons(8888)
addr.sin_addr.s_addr = INADDR_ANY;

sin_family 仍然是表示所使用的网络协议是 IPv4.

sin_port 中的htons 表示将主机字节序转为 网络字节序。在使用socket进行绑定的时候,涉及到网络字节序的问题。因此使用时需要转换。(大端小端)

sin_addr.s_addr 表示需要绑定的ip地址。可以使用 INADDR_ANY 自动绑定,直接通过其中的 s_addr进行绑定

addr.sin_addr.s_addr = inet_addr("192.168.1.1");

这是使用inet_addr的方式,将一个点分十进制转换成一个网络序二进制。

也可以使用 inet_pton的方式进行绑定。

if(inet_pton(AF_INET, "192.168.1.1", addr.sin_addr) <= 0){
    perror("");
}

pton 从人类的表示方法,到network的表示方法,第一个参数是网络协议、第二个是src、第三个是dst。

struct sockaddr_in addr;
addr.sin_family = AF_INET; 
addr.sin_port = htons(8888);
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
// inet_pton(AF_INET, "127.0.0.1", addr.sin_addr);

bind 也是有返回值的,当返回值 < 0,则绑定失败。

if(bind(tcp_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0){
	perror("");
}

绑定的时候,需要将 addr 转成 sockaddr* 的类型,也就是地址类型。

就是告诉 sock,你所需要的数据类型地址信息和长度信息。去绑定吧。

监听

listen 表示开启监听,一般在服务端进行使用。服务端开启监听之后,参数填上监听的socket 套接字,也就是之前已经绑定过ip地址端口的套接字。然后填上要监听的数量。

#include <sys/socket.h>

int listen(int sockfd, int backlog);

listen 没有太多可将的内容,值得注意的是,listen 不要放在循环内,只需要声明一次,表示开启监听即可。操作系统内核会维护一个监听队列。队列大小和backlog一致。

接收请求

accept 表示接收连接,建立连接。如果服务器的socket文件描述符开启监听之后,有客户端连接请求,希望连接到服务器,就需要使用accept 进行建立连接。

accept 会返回一个socket文件描述符,一般表示独立的对于客户端的,也就是说,可以通过返回的这个文件描述符和客户端进行通信。

问题出现了,如何能够建立完连接,再和所建立的客户端进行通信。

#include <arpa/inet.h>
#include <memory.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>

int main(int argc, char *argv[]) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket fail\n");
        exit(-1);
    }

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    if (inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) == -1) {
        perror("inet_pton fail\n");
    }

    if (bind(sockfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
        perror("bind fail\n");
    }

    listen(sockfd, 10);

    char buf[1024];
    while (1) {
        int client_fd = accept(sockfd, NULL, NULL);
        if (client_fd == -1) {
            perror("accept fail\n");
        }

        memset(buf, 0, sizeof(buf));
        recv(client_fd, buf, sizeof(buf), 0);
        printf("%s\n", buf);
    }
}

while 循环之内的内容,表示建立连接accept,然后接收从客户端发来的信息并打印输出。

然后呢?下一次循环仍然会去建立连接,也就是说,客户端就算在此时发了100条信息,它也打印不出来了。

那么怎样才能既监听客户端的请求,又监听客户端的连接?

一般我们会想到使用多进程、多线程,一个线程负责一个事务不就好了,一个线程负责一个客户端,每来一个客户端,就开启一个新线程。

这么想也没错,但是在单线程的情况下,如何实现这个功能,这就要提到一项技术:IO多路复用。

IO多路复用是一项在单进程、线程下,同时监听多个IO操作的机制。

目前的IO多路复用有 select、poll、epoll。

先从select开始介绍使用。

#include <sys/select.h>
select

select 需要创建一个 fd_set 队列,也就是文件描述符队列。用来存放多个文件描述符的。可以这样想,既然要监听多个文件描述符,那么肯定在所需要监听的位置,存在一个监听队列。对不同的文件描述符进行一个区分。所以 fd_set 的用法就出来了。

fd_set set;
// 这里我就直接命名为set了

在C语言中创建结构体或者什么的,一般都需要先清空结构体,就算申请内存空间,也会进行初始化。

通过 FD_ZERO 进行清零

fd_set set;
FD_ZERO(&set); //这里初始化,使用的是set的地址。

select 原型:

int select(int nfds,
           fd_set *readfds,
           fd_set *writefds, 
           fd_set *exceptfds,
           struct timeval *timeout);

nfds 表示最大的一个文件描述符+1,比如我要监听的是【1,2,3】,那这里就用4.表示我就监听到4,如果监听的是 【0...100】,那这里就填101,总之是申请过的最大的文件描述符+1的值。

readfds 表示读队列,既然监听,就使用这个队列,用于监听一个文件描述符的读事件。

writefds 表示写队列,可以监听一个文件描述符是否可写。

exceptfds 表示异常队列。一般不怎么用。

timeout 表示超时,通过设置一个时间,如果超时则select返回。

对了,这里介绍一下select的返回,、

  • 当select有事件发生,返回,返回值是发生的事件的数量。
  • 当timeout超时时,返回,返回值为0.
  • select 出现错误,返回 -1

那么就可以在while循环开始的时候,先定义好要监听的文件描述符,将要监听的文件描述符添加到监听队列中。select每次返回都会清空监听队列,所以要将这个添加行为,写在while循环的开始部分。

select 通过FD_SET 进行添加,当select返回 > 0的值时,就可以通过 FD_ISSET进行筛选,看到底是哪几个文件描述符就绪,然后根据这个,进行相应的操作。

FD_SET,第一个参数是要监听的文件描述符,第二个参数是监听队列的地址。

FD_SET(sockfd, &set);

此时已经可以实现多用户在服务器上的对话了(没什么用,谁看服务器)

#include <arpa/inet.h>
#include <memory.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <sys/socket.h>

int main(int argc, char *argv[]) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket fail\n");
        exit(-1);
    }

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    if (inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) == -1) {
        perror("inet_pton fail\n");
    }

    if (bind(sockfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
        perror("bind fail\n");
    }

    listen(sockfd, 10);

    char buf[1024];
    fd_set set;
    FD_ZERO(&set);
    int client_fds[10] = {-1};
    int index = 0;
    while (1) {
        FD_SET(sockfd, &set);
        for (int i = 0; i < index; i++) {
            FD_SET(client_fds[i], &set);
        }

        int ret = select(sockfd + index + 1, &set, NULL, NULL, NULL);
        if (ret == -1) {
            perror("select fail");
        }

        if (FD_ISSET(sockfd, &set)) {
            client_fds[index] = accept(sockfd, NULL, NULL);
            if (client_fds[index] == -1) {
                perror("accept fail\n");
            }
            index++;
        }

        for (int i = 0; i < index; i++) {
            if (FD_ISSET(client_fds[i], &set)) {
                int fd = client_fds[i];
                memset(buf, 0, sizeof(buf));
                recv(fd, buf, sizeof(buf), 0);
                printf("%d: %s\n", i, buf);
            }
        }
    }
}

首先将sockfd添加到监听队列,然后每监听到一个客户端的连接,就将该客户端添加到一个客户端队列client_fds中,设置一个index,表示目前有多少个客户端。(并没有维护客户端的断开)。

FD_SET会将需要监听的文件描述符都添加到监听队列。

这就是select的一个初步使用。这里只介绍到这里。重点介绍epoll。poll是select到epoll的过度阶段,一般而言,就没有必要再去了解了。

select内部就是通过维护一个监听队列,这个监听队列是用位图实现的。select 优点就是使用简单。缺点就是内部使用位图结构,监听的数量过大,ISSET会对监听的文件描述符轮询,没有区分开监听队列和就绪队列。且每次就绪之后,该队列都会清空返回,在循环的开始部分还需要再次添加需要监听的文件描述符。

监听就绪队列有上限,操作系统最初限制为1024,如果想要修改这个限制,则需要重新编译操作系统。

另外,这个程序有bug,比如通信的时候,对方断开通信,则我方会循环接收(死循环)。

因为使用recv接收数据的时候,返回值大于0,返回的是接收数据的长度。返回值等于0,说明客户端断开连接,返回值小于0,说明接收失败。只需要添加一个返回值的判断即可。

epoll

epoll 是linux平台下一种io多路复用机制,用于高效处理大量文件描述符的并发IO。基于事件驱动。

epoll 高性能、分离监听队列和就绪队列,监听队列使用红黑树,就绪队列使用线性表。(就绪队列中的文件描述符数量少,可以使用线性表),且没有上限。

epoll 在各个方面全面碾压select,之后使用epoll作为IO多路复用的主要工具。

首先需要创建epoll_fd,epoll_fd 是一个数据结构,通过epoll_create或epoll_create1进行创建,这个数据结构中通过两个数据结构进行保存文件描述符。红黑树+双链表。红黑树用来监听文件描述符,可快速找到描述符,寻找的速度是 O(logN) 级别的,而双链表是用来存放就绪队列的。当监听队列中的文件描述符就绪后,就会将其转移到双链表。即使监听的文件描述符众多也不会影响性能。这就是epoll比select更加优势的地方。可高校处理大量文件描述符。

其次,要提到一点,select 每次都需要重新将文件描述符添加到监听队列(select 根本没有分离监听队列和就绪队列,就绪什么的都是在同一个队列),epoll 分离了监听队列和就绪队列。就绪队列每次wait等待到相应的操作后,就会从就绪队列中移除,也就是说 使用epoll,不需要再每次添加文件描述符到队列中。

epfd = epoll_create1(0);

之前使用epoll_create,参数是一个大于0 的值,用于确定队列的长度,从Linux 某个版本之后,就可以自动确定队列长度,使用 epoll_create1(0), 参数是一个标志,一般填 0 用于使用默认配置。

此时就可以拥有一个epfd。

创建一个事件,事件中包含需要监听的操作和文件描述符

   41     struct epoll_event event;            
   42     event.events = EPOLLIN;              
   43     event.data.fd = sockfd;

这里创建一个event,event 可以重复使用。只要通过 epoll_ctl 注册过后,可以修改event 用于其他的事件。

   44     epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);

第一个参数是epoll_create1创建出的epollfd,第二个参数是epoll的添加、修改、删除操作。

第三个参数是需要监听的文件描述符,用于和event相关联。第四个是事件event,这里使用event之后,就已经让epfd和sockfd创建了event上的连接。event的内容在此之后可以修改,不影响操作内容。

 int epoll_wait(
 				int epfd, 
 				struct epoll_event *events,
                int maxevents, 
                int timeout);

等待事件就绪,因此需要一个用于存放事件的队列。

 struct epoll_event events[10];

maxevents 用于表明events的最大容量。

timeout用于设置超时时间。-1 表示一直等待,timeout 的值是一个毫秒值。

#include <arpa/inet.h>
#include <memory.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <sys/select.h>
#include <sys/socket.h>

int main(int argc, char *argv[]) {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket fail\n");
        exit(-1);
    }

    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(8888);
    if (inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) == -1) {
        perror("inet_pton fail\n");
    }

    if (bind(sockfd, (struct sockaddr *) &addr, sizeof(addr)) == -1) {
        perror("bind fail\n");
    }

    listen(sockfd, 10);

    char buf[1024];
    int client_fds[10] = {-1};
    int index = 0;

    int epfd = epoll_create1(0);

    struct epoll_event event;
    event.events = EPOLLIN;
    event.data.fd = sockfd;
    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &event);
    struct epoll_event events[10];

    while (1) {
        int event_num = epoll_wait(epfd, events, 10, -1);
        if (event_num == 0 || event_num == -1) {
            continue;
        }
        for (int i = 0; i < event_num; i++) {
            if (events[i].data.fd == sockfd) {
                client_fds[index] = accept(sockfd, NULL, NULL);
                event.events = EPOLLIN;
                event.data.fd = client_fds[index];
                epoll_ctl(epfd, EPOLL_CTL_ADD, client_fds[index], &event);
                printf("有新连接...\n");
                index++;
            }

            // if(events[i].data.fd == )
            for (int k = 0; k < index; k++) {
                if (events[i].data.fd == client_fds[k]) {
                    int fd = events[i].data.fd;
                    memset(buf, 0, sizeof(buf));
                    int ret = recv(fd, buf, sizeof(buf), 0);
                    if (ret == 0) {
                        event.events = EPOLLIN;
                        event.data.fd = fd;
                        epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &event);
                        client_fds[k] = -1;
                    }
                    if (ret > 0) {
                        printf("%d:%s\n", k, buf);
                    }
                }
            }
        }
    }
}

select 和 epoll 实现了相同功能的代码,而epoll则支持高性能的大量文件描述符的使用。