Linux - IO多路复用之select

发布时间 2023-05-08 11:38:58作者: [BORUTO]

1. IO 多路转接 (复用)

 

IO 多路转接也称为 IO 多路复用,它是一种网络通信的手段(机制),通过这种方式可以同时监测多个文件描述符并且这个过程是阻塞的,一旦检测到有文件描述符就绪( 可以读数据或者可以写数据)程序的阻塞就会被解除,

之后就可以基于这些(一个或多个)就绪的文件描述符进行通信了。通过这种方式在单线程 / 进程的场景下也可以在服务器端实现并发。常见的 IO 多路转接方式有:select、poll、epoll。

 

下面先对多线程 / 多进程并发和 IO 多路转接的并发处理流程进行对比(服务器端):

多线程 / 多进程并发

  • 主线程 / 父进程:调用 accept() 监测客户端连接请求
  • 如果没有新的客户端的连接请求,当前线程 / 进程会阻塞
  • 如果有新的客户端连接请求解除阻塞,建立连接
  • 子线程 / 子进程:和建立连接的客户端通信
  • 调用 read() / recv() 接收客户端发送的通信数据,如果没有通信数据,当前线程 / 进程会阻塞,数据到达之后阻塞自动解除
  • 调用 write() / send() 给客户端发送数据,如果写缓冲区已满,当前线程 / 进程会阻塞,否则将待发送数据写入写缓冲区中

IO 多路转接并发

  • 使用 IO 多路转接函数委托内核检测服务器端所有的文件描述符(通信和监听两类),这个检测过程会导致进程 / 线程的阻塞,如果检测到已就绪的文件描述符阻塞解除,并将这些已就绪的文件描述符传出
  • 根据类型对传出的所有已就绪文件描述符进行判断,并做出不同的处理
  • 监听的文件描述符:和客户端建立连接
  • 此时调用 accept() 是不会导致程序阻塞的,因为监听的文件描述符是已就绪的(有新请求)
  • 通信的文件描述符:调用通信函数和已建立连接的客户端通信
  • 调用 read() / recv() 不会阻塞程序,因为通信的文件描述符是就绪的,读缓冲区内已有数据
  • 调用 write() / send() 不会阻塞程序,因为通信的文件描述符是就绪的,写缓冲区不满,可以往里面写数据
  • 对这些文件描述符继续进行下一轮的检测(循环往复。。。)

与多进程和多线程技术相比,I/O 多路复用技术的最大优势是系统开销小,系统不必创建进程 / 线程,也不必维护这些进程 / 线程,从而大大减小了系统的开销。

 

2. select

2.1 函数原型

使用 select 这种 IO 多路转接方式需要调用一个同名函数 select,这个函数是跨平台的,Linux、Mac、Windows 都是支持的。程序猿通过调用这个函数可以委托内核帮助我们检测若干个文件描述符的状态,其实就是检测这些文件描述符对应的读写缓冲区的状态:

  • 读缓冲区:检测里边有没有数据,如果有数据该缓冲区对应的文件描述符就绪
  • 写缓冲区:检测写缓冲区是否可以写 (有没有容量),如果有容量可以写,缓冲区对应的文件描述符就绪
  • 读写异常:检测读写缓冲区是否有异常,如果有该缓冲区对应的文件描述符就绪

委托检测的文件描述符被遍历检测完毕之后,已就绪的这些满足条件的文件描述符会通过 select() 的参数分 3 个集合传出,程序猿得到这几个集合之后就可以分情况依次处理了。

下面来看一下这个函数的函数原型:

#include <sys/select.h>
struct timeval {
    time_t      tv_sec;         /* seconds */
    suseconds_t tv_usec;        /* microseconds */
};

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval * timeout);


函数参数:

nfds:委托内核检测的这三个集合中最大的文件描述符 + 1

  • 内核需要线性遍历这些集合中的文件描述符,这个值是循环结束的条件
  • 在 Window 中这个参数是无效的,指定为 - 1 即可

readfds:文件描述符的集合,内核只检测这个集合中文件描述符对应的读缓冲区

  • 传入传出参数,读集合一般情况下都是需要检测的,这样才知道通过哪个文件描述符接收数据

writefds:文件描述符的集合,内核只检测这个集合中文件描述符对应的写缓冲区

  • 传入传出参数,如果不需要使用这个参数可以指定为 NULL

exceptfds:文件描述符的集合,内核检测集合中文件描述符是否有异常状态

  • 传入传出参数,如果不需要使用这个参数可以指定为 NULL

timeout:超时时长,用来强制解除 select () 函数的阻塞的

  • NULL:函数检测不到就绪的文件描述符会一直阻塞。
  • 等待固定时长(秒):函数检测不到就绪的文件描述符,在指定时长之后强制解除阻塞,函数返回 0
  • 不等待:函数不会阻塞,直接将该参数对应的结构体初始化为 0 即可。

函数返回值:

  • 大于 0:成功,返回集合中已就绪的文件描述符的总个数
  • 等于 - 1:函数调用失败
  • 等于 0:超时,没有检测到就绪的文件描述符

另外初始化 fd_set 类型的参数还需要使用相关的一些列操作函数,具体如下:

// 将文件描述符fd从set集合中删除 == 将fd对应的标志位设置为0        
void FD_CLR(int fd, fd_set *set);

// 判断文件描述符fd是否在set集合中 == 读一下fd对应的标志位到底是0还是1
int  FD_ISSET(int fd, fd_set *set);

// 将文件描述符fd添加到set集合中 == 将fd对应的标志位设置为1
void FD_SET(int fd, fd_set *set);

// 将set集合中, 所有文件文件描述符对应的标志位设置为0, 集合中没有添加任何文件描述符
void FD_ZERO(fd_set *set);

 

2.2 细节描述

在 select() 函数中第 2、3、4 个参数都是 fd_set 类型,它表示一个文件描述符的集合,类似于信号集 sigset_t,这个类型的数据有 128 个字节,也就是 1024 个标志位,和内核中文件描述符表中的文件描述符个数是一样的。

sizeof(fd_set) = 128 字节 * 8 = 1024 bit      // int [32]


这并不是巧合,而是故意为之。这块内存中的每一个 bit 和 文件描述符表中的每一个文件描述符是一一对应的关系,这样就可以使用最小的存储空间将要表达的意思描述出来了。

下图中的 fd_set 中存储了要委托内核检测读缓冲区的文件描述符集合。

  • 如果集合中的标志位为 0 代表不检测这个文件描述符状态
  • 如果集合中的标志位为 1 代表检测这个文件描述符状态

内核在遍历这个读集合的过程中,如果被检测的文件描述符对应的读缓冲区中没有数据,内核将修改这个文件描述符在读集合 fd_set 中对应的标志位,改为 0,如果有数据那么这个标志位的值不变,还是 1。

当 select() 函数解除阻塞之后,被内核修改过的读集合通过参数传出,此时集合中只要标志位的值为 1,那么它对应的文件描述符肯定是就绪的,我们就可以基于这个文件描述符和客户端建立新连接或者通信了。

 

3. 并发处理

3.1 处理流程

如果在服务器基于 select 实现并发,其处理流程如下:

(1) 创建监听的套接字 lfd = socket ();

(2) 将监听的套接字和本地的 IP 和端口绑定 bind ()

(3) 给监听的套接字设置监听 listen ()

(4) 创建一个文件描述符集合 fd_set,用于存储需要检测读事件的所有的文件描述符

  • 通过 FD_ZERO () 初始化
  • 通过 FD_SET () 将监听的文件描述符放入检测的读集合中

(5) 循环调用 select (),周期性的对所有的文件描述符进行检测

(6) select () 解除阻塞返回,得到内核传出的满足条件的就绪的文件描述符集合

  • 通过 FD_ISSET () 判断集合中的标志位是否为 1
  • 如果这个文件描述符是监听的文件描述符,调用 accept () 和客户端建立连接
  • 将得到的新的通信的文件描述符,通过 FD_SET () 放入到检测集合中
  • 如果这个文件描述符是通信的文件描述符,调用通信函数和客户端通信
  • 如果客户端和服务器断开了连接,使用 FD_CLR () 将这个文件描述符从检测集合中删除
  • 如果没有断开连接,正常通信即可

(7) 重复第 6 步

 

3.2 通信代码

服务器端代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

int main()
{
    // 1. 创建监听的fd
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    // 2. 绑定
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9999);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(lfd, (struct sockaddr*)&addr, sizeof(addr));

    // 3. 设置监听
    listen(lfd, 128);

    // 将监听的fd的状态检测委托给内核检测
    int maxfd = lfd;
    // 初始化检测的读集合
    fd_set rdset;
    fd_set rdtemp;
    // 清零
    FD_ZERO(&rdset);
    // 将监听的lfd设置到检测的读集合中
    FD_SET(lfd, &rdset);
    // 通过select委托内核检测读集合中的文件描述符状态, 检测read缓冲区有没有数据
    // 如果有数据, select解除阻塞返回
    // 应该让内核持续检测
    while(1)
    {
        // 默认阻塞
        // rdset 中是委托内核检测的所有的文件描述符
        rdtemp = rdset;
        int num = select(maxfd+1, &rdtemp, NULL, NULL, NULL);
        // rdset中的数据被内核改写了, 只保留了发生变化的文件描述的标志位上的1, 没变化的改为0
        // 只要rdset中的fd对应的标志位为1 -> 缓冲区有数据了
        // 判断
        // 有没有新连接
        if(FD_ISSET(lfd, &rdtemp))
        {
            // 接受连接请求, 这个调用不阻塞
            struct sockaddr_in cliaddr;
            int cliLen = sizeof(cliaddr);
            int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &cliLen);

            // 得到了有效的文件描述符
            // 通信的文件描述符添加到读集合
            // 在下一轮select检测的时候, 就能得到缓冲区的状态
            FD_SET(cfd, &rdset);
            // 重置最大的文件描述符
            maxfd = cfd > maxfd ? cfd : maxfd;
        }

        // 没有新连接, 通信
        for(int i=0; i<maxfd+1; ++i)
        {
			// 判断从监听的文件描述符之后到maxfd这个范围内的文件描述符是否读缓冲区有数据
            if(i != lfd && FD_ISSET(i, &rdtemp))
            {
                // 接收数据
                char buf[10] = {0};
                // 一次只能接收10个字节, 客户端一次发送100个字节
                // 一次是接收不完的, 文件描述符对应的读缓冲区中还有数据
                // 下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次
                // 	循环会一直持续, 知道缓冲区数据被读完位置
                int len = read(i, buf, sizeof(buf));
                if(len == 0)
                {
                    printf("客户端关闭了连接...\n");
                    // 将检测的文件描述符从读集合中删除
                    FD_CLR(i, &rdset);
                    close(i);
                }
                else if(len > 0)
                {
                    // 收到了数据
                    // 发送数据
                    write(i, buf, strlen(buf)+1);
                }
                else
                {
                    // 异常
                    perror("read");
                }
            }
        }
    }

    return 0;
}


在上面的代码中,创建了两个 fd_set 变量,用于保存要检测的读集合:

// 初始化检测的读集合
fd_set rdset;
fd_set rdtemp;


rdset 用于保存要检测的原始数据,这个变量不能作为参数传递给 select 函数,因为在函数内部这个变量中的值会被内核修改,函数调用完毕返回之后,里边就不是原始数据了,大部分情况下是值为 1 的标志位变少了,不可能每一轮检测,所有的文件描述符都是就行的状态。因此需要通过 rdtemp 变量将原始数据传递给内核,select () 调用完毕之后再将内核数据传出,这两个变量的功能是不一样的。

 

客户端代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

int main()
{
    // 1. 创建用于通信的套接字
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if(fd == -1)
    {
        perror("socket");
        exit(0);
    }

    // 2. 连接服务器
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;     // ipv4
    addr.sin_port = htons(9999);   // 服务器监听的端口, 字节序应该是网络字节序
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr.s_addr);
    int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr));
    if(ret == -1)
    {
        perror("connect");
        exit(0);
    }

    // 通信
    while(1)
    {
        // 读数据
        char recvBuf[1024];
        // 写数据
        // sprintf(recvBuf, "data: %d\n", i++);
        fgets(recvBuf, sizeof(recvBuf), stdin);
        write(fd, recvBuf, strlen(recvBuf)+1);
        // 如果客户端没有发送数据, 默认阻塞
        read(fd, recvBuf, sizeof(recvBuf));
        printf("recv buf: %s\n", recvBuf);
        sleep(1);
    }

    // 释放资源
    close(fd); 

    return 0;
}


客户端不需要使用 IO 多路转接进行处理,因为客户端和服务器的对应关系是 1:N,也就是说客户端是比较专一的,只能和一个连接成功的服务器通信。

虽然使用 select 这种 IO 多路转接技术可以降低系统开销,提高程序效率,但是它也有局限性:

(1) 待检测集合(第 2、3、4 个参数)需要频繁的在用户区和内核区之间进行数据的拷贝,效率低

(2) 内核对于 select 传递进来的待检测集合的检测方式是线性的

  • 如果集合内待检测的文件描述符很多,检测效率会比较低
  • 如果集合内待检测的文件描述符相对较少,检测效率会比较高

(3) 使用select能够检测的最大文件描述符个数有上限,默认是1024,这是在内核中被写死了的。

 

4. select实现简单聊天室 

4.1 服务器

//服务器端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>        
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <signal.h>
#include <sys/select.h>
#include <fcntl.h>
 
//最多允许的客户端数量
#define NUM 100
 
int serverSocket,clientSocket[NUM];
int currentNum = 0;      //当前客户端数量
 
void hand(int val){
	//7. 关闭连接
	for(int i = 0;i < NUM; i++){
		if(-1 != clientSocket[i])
			close(clientSocket[i]);
	}
	close(serverSocket);
	printf("bye bye!\n");
	exit(0);
}
 
int main(int argc,char* argv[]){
	if(argc != 3) printf("请输入ip地址和端口号!\n"),exit(0);
	printf("ip: %s     port:%d\n",argv[1],atoi(argv[2]));
 
	signal(SIGINT,hand);
 
	//1. 创建socket 参数一: 协议类型(版本) 参数二: 通信媒介 参数三: 保护方式
	serverSocket = socket(AF_INET,SOCK_STREAM,0);
	if(-1 == serverSocket) printf("创建socket失败:%m\n"),exit(-1);
	printf("创建socket成功!\n");
 
	//2. 创建服务器协议地址簇
	struct sockaddr_in sAddr = { 0 };
	sAddr.sin_family = AF_INET;        //协议类型 和socket函数第一个参数一致
	sAddr.sin_addr.s_addr = inet_addr(argv[1]);  //将字符串转整数
	sAddr.sin_port = htons(atoi(argv[2]));    //将字符串转整数,再将小端转换成大端
 
	//3. 绑定服务器协议地址簇
	int r = bind(serverSocket,(struct sockaddr*)&sAddr,sizeof sAddr);
	if(-1 == r) printf("绑定失败:%m\n"),close(serverSocket),exit(-2);
	printf("绑定成功!\n");
 
	//4. 监听  
	r = listen(serverSocket,10);   //数量
	if(-1 == r) printf("监听失败:%m\n"),close(serverSocket),exit(-3);
	printf("监听成功!\n");
 
 
	//初始化客户端描述符号数组
	for (int i = 0; i < NUM; ++i){
		clientSocket[i] = -1;
	}
	//开始监视
	//不仅需要监视serverSocket还要监视每一个返回回来的clientSocket
	fd_set fds;
 
	int maxFd;       //最大描述符号
	struct sockaddr_in cAddr = {0};
	int len = sizeof(cAddr);
	int cfd;
 
	char buff[1024] = {0};
 
	maxFd = 0;
	maxFd = ((maxFd > serverSocket) ? maxFd : serverSocket);
 
	while(1){
		FD_ZERO(&fds);   //清空监视集合
 
		FD_SET(serverSocket,&fds);    //将服务器socketFd放到监视集合之中
 
		//将客户端socketFd放到监视集合之中
		for (int i = 0; i < NUM; ++i){
			if(-1 != clientSocket[i]){
				FD_SET(clientSocket[i],&fds);
			}
		}
 
		//开始监视
		r = select(maxFd+1,&fds,NULL,NULL,NULL);
		if(-1 == r)
			printf("服务器崩溃:%m\n"),close(serverSocket),exit(-1);
		else if(0 == r){
			printf("服务器处于等待状态!\n");
			continue;
		}else{
			//检查是不是serverSocket的动静
			if(FD_ISSET(serverSocket,&fds)){
				cfd = accept(serverSocket,NULL,NULL);
				if(-1 == cfd){
					printf("客户端连接失败!\n");
				}else{
					printf("有客户端连接上服务器了:%d\n",cfd);
 
					//保存客户端描述符号
					for (int i = 0; i < NUM; ++i){
						if(-1 == clientSocket[i]){
							clientSocket[i] = cfd;
							maxFd = ((maxFd > cfd) ? maxFd : cfd);
							break;
						}
					}
				}
			}
		}
		//检查客户端是否有动静
		for (int i = 0; i < NUM; ++i){
			if(-1 != clientSocket[i] && FD_ISSET(clientSocket[i],&fds)){
				r = recv(clientSocket[i],buff,1023,0);
				if(r > 0){
					buff[r] = 0;
					printf("%d >> %s\n",clientSocket[i], buff);
 
					//服务器将数据转发给每一个在线的客户端(除了发消息给服务器的客户端)
					char tBuff[2048];
					sprintf(tBuff,"来自%d客户端发给服务器的消息:%s",clientSocket[i],buff);
					for(int j = 0; j < NUM; j++){
						if(-1 != clientSocket[j] && clientSocket[i] != clientSocket[j]){
							send(clientSocket[j],tBuff,strlen(tBuff),0);
						}
					}
				}else{
					printf("客户端: %d 已经断开连接了\n",clientSocket[i]);
					clientSocket[i] = -1;
				}
			}
		}
	}
 
	return 0;
}

 

4.2 客户端

//客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>        
#include <sys/socket.h>
#include <arpa/inet.h>
#include <string.h>
#include <signal.h>
 
int clientSocket;
void hand(int val){
	//5. 关闭连接
	close(clientSocket);
	printf("bye bye!\n");
	exit(0);
}
int main(int argc,char* argv[]){
	if(argc != 3) printf("请输入ip地址和端口号!\n"),exit(0);
	printf("ip: %s     port:%d\n",argv[1],atoi(argv[2]));
 
	signal(SIGINT,hand);
 
	//1. 创建socket 参数一: 协议类型(版本) 参数二: 通信媒介 参数三: 保护方式
	clientSocket = socket(AF_INET,SOCK_STREAM,0);
	if(-1 == clientSocket) printf("创建socket失败:%m\n"),exit(-1);
	printf("创建socket成功!\n");
 
	//2. 创建服务器协议地址簇
	struct sockaddr_in cAddr = { 0 };
	cAddr.sin_family = AF_INET;
	cAddr.sin_addr.s_addr = inet_addr(argv[1]);  //将字符串转整数
	cAddr.sin_port = htons(atoi(argv[2]));    //将字符串转整数,再将小端转换成大端
 
	//3.连接服务器
	int r = connect(clientSocket,(struct sockaddr*)&cAddr,sizeof cAddr);
	if(-1 == r) printf("连接服务器失败:%m\n"),close(clientSocket),exit(-2);
	printf("连接服务器成功!\n");
 
	
	//开始监视
	//不仅要监视标准输入设备, 还要监视clientSocket服务器是否发送数据
	fd_set fds;
 
	int maxFd = clientSocket > 0 ? clientSocket : 0;
	char buff[2048] = {0};
	while(1){
		//清空集合
		FD_ZERO(&fds);
		//将标准输入输出放入到集合中
		FD_SET(0,&fds);
		//将clientSocket放入到监视集合中
		FD_SET(clientSocket,&fds);
 
		//开始监视
		r = select(maxFd + 1, &fds, NULL,NULL,NULL);
		if(-1 == r)
			printf("客户端崩溃:%m\n"),close(clientSocket),exit(-1);
		else if(0 == r){
			printf("客户端处于等待状态!\n");
			continue;
		}else{
			memset(buff,0,2048);
			//如果 0 有动静就向服务器发消息
			if(FD_ISSET(0,&fds)){
				scanf("%s",buff);
				send(clientSocket,buff,strlen(buff),0);
				continue;
			}
			//如果 clientSocket有动静就接收服务器发来的消息
			if(FD_ISSET(clientSocket,&fds) && -1 != clientSocket){
				memset(buff,0,2048);
				printf("服务器发来了客户端的消息!\n");
				r = recv(clientSocket,buff,2047,0);
				if(r > 0){
					buff[r] = 0;
					printf("服务器发来消息 >> %s\n",buff);
				}
			}
		}
	}
 
	return 0;
}

运行结果:

 

 

作者: 苏丙榅

链接: https://subingwen.cn/linux/select

来源: 爱编程的大丙

著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。