Linux的I/O复用技术:poll

发布时间 2023-06-03 16:29:38作者: 韓さん

poll:
poll系统调用和select类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪的文件描述符。
优点:
1.相比select来讲,它没有fd数量的限制,理论上打开fd的数目跟系统内在有关;
2.也不用每次都把fd集合从用户区拷贝数据到内核,它使用一个  struct pollfd结构体来维护每个fd;
缺点:
它本质上是和select一样的,只是描述fd集合的方式不同,poll使用pollfd结构而不是select的fd_set结构,其他的都差不多。
适用场景:
适用于所监视文件描述符少的场景;

poll的原型如下:

#include<poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);

poll结构体定义如下:

 struct pollfd
{
    int fd;             /*文件描述符*/
    short events;  /*注册的事件*/
    short revents; /*实际发生的事件,由内核填充*/
};

像ssh这样的登录服务通常要同时处理网络连接和用户输入,这可以用I/O复用实现。我们尝试用poll实现简单的聊天室程序,可以让所有用户实现在线群聊。客户端功能主要是从标准输入端读入数据,并将数据发送至服务器,并打印出服务器发送给他的数据。服务器端功能主要是接受客户数据,并把数据发送至登录到该服务器的每个客户端(发送数据者除外)

pollclient.cpp

客户端使用poll同时监听用户输入和网络连接,并利用splice函数将用户输入内容直接定向到网络连接上发送,从而实现数据零拷贝。

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <poll.h>
#include <stdlib.h>

int main(int argc, char* argv[])
{
    if (argc <= 2)
    {
      printf("argc error!\n");
      return -1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);
    printf( "ip is %s and port is %d\n", ip, port );
    int ret = 0;
    struct sockaddr_in addr;
    bzero(&addr, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    inet_pton(AF_INET, ip, &addr.sin_addr);
    
    int sockfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(sockfd >= 0);
    if (connect(sockfd, (struct sockaddr*)&addr, sizeof(addr)) < 0)
    {
        printf("connection failed\n");
        close(sockfd);
        return 1;
    }
    pollfd fds[2];
/*注册文件描述符0(标准输入)和文件描述符sockfd上的可读事件*/ fds[
0].fd = 0; fds[0].events = POLLIN; fds[0].revents = 0; fds[1].fd = sockfd; fds[1].events = POLLIN | POLLRDHUP; fds[1].revents = 0; char buf[64]; int pipefd[2]; ret = pipe(pipefd); assert(ret != -1); while(1) { ret = poll(fds, 2, -1); if (ret < 0) { printf("poll failure\n"); break; } if(fds[1].revents & POLLRDHUP)//挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事件 { printf( "server close the connection\n" ); break; } else if(fds[1].revents & POLLIN) //数据(包括普通数据和优先数据)可读 { memset(buf, '\0', 64); recv(fds[1].fd, buf, 63, 0); printf("%s\n", buf); } if(fds[0].revents & POLLIN) {
/*使用splice将用户输入的数据直接写到sockfd上(零拷贝)*/ ret
= splice(0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE); ret = splice(pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE); } } close(sockfd); return 0; }

pollserver.cpp

服务器端用poll来同时处理监听socket和连接socket。并用时间换空间的策略来提高性能;

#include<sys/types.h>
#include<sys/msg.h>
#include<sys/ipc.h>
#include<sys/stat.h>
#include<stdio.h>
#include<string.h>
#include<pthread.h>
#include<stdlib.h>
#include<unistd.h>
#include<iostream>
#include<sys/wait.h>
#include<sys/socket.h>
#include<sys/epoll.h>
#include<sys/ipc.h>
#include<errno.h>
#include<sys/shm.h>
#include<fcntl.h>
#include<semaphore.h>
#include<arpa/inet.h>
#include<iostream>
#include<assert.h>
#include<ctype.h>
#include<time.h>
#include <poll.h>
using namespace std;
//客户数据
struct client_data { sockaddr_in addr; //客户端socket地址 char* write_buf; //待写到客户端的数据的位置 char buf[64]; //从客户端读入的数据 }; int setnonblocking(int fd) { int oldt = fcntl(fd, F_GETFL); int newt = oldt | O_NONBLOCK; fcntl(fd, F_SETFL, newt); return oldt; } int main(int argc, char* argv[]) { if (argc <= 2) { printf("argc error!\n"); return -1; } const char* ip = argv[1]; int port = atoi(argv[2]); int ret = 0; struct sockaddr_in addr; bzero(&addr, sizeof(addr)); addr.sin_family = AF_INET; inet_pton(AF_INET, ip, &addr.sin_addr); addr.sin_port = htons(port); int listenfd = socket(PF_INET, SOCK_STREAM, 0); assert(listenfd >= 0); ret = bind(listenfd, (struct sockaddr* )&addr, sizeof(addr)); assert(ret != -1); ret = listen(listenfd, 5); assert(ret != -1); /*
创建users数组,分配65535个client_data对象。
可以预期:每个可能的socket连接都可以获得一个这样的对象,并且socket的值可以直接用来索引(作为数组的下标)socket连接对应的client_data对象,
这是将socket和客户数据相关联的简单而高效的方式
*/ client_data
* users = new client_data[65535];
//尽管我们分配了足够多的client_data对象,但为了提高poll的性能,仍然有必要限制用户的数量 pollfd fds[
6]; int num = 0; int i; for(i=1; i<=5; i++) { fds[i].fd = -1; fds[i].events = 0; } fds[0].fd = listenfd; fds[0].events = POLLIN | POLLERR; fds[0].revents = 0; while(1) { ret = poll(fds, num+1, -1); if(ret<0) { printf("poll failure\n"); break; } int i; for(i=0; i<num+1; i++) { if ((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) { struct sockaddr_in clientAddr; socklen_t clientAddrLen = sizeof(clientAddr); int connfd = accept(listenfd, (struct sockaddr *)&clientAddr, &clientAddrLen); if (connfd < 0) { printf("errno is: %d\n", errno); continue; }
//如果请求过多,则关闭新到的连接
if(num >= 5) { const char* info = "too many users\n"; printf("%s", info); send(connfd, info, strlen(info), 0); close(connfd); continue; }
//对于新的连接,同时修改fds和users数组。前文已经提到,users[connfd]对应于新连接文件描述符connfd的客户数据 num
++; users[connfd].addr = clientAddr; setnonblocking(connfd); fds[num].fd = connfd; fds[num].events = POLLIN | POLLRDHUP | POLLERR; fds[num].revents = 0; printf("come a new user, now have %d users\n", num); } else if(fds[i].revents & POLLERR) //错误 { printf("get an error from %d\n", fds[i].fd); char errors[100]; memset(errors, '\0', 100); socklen_t len = sizeof(errors); if (getsockopt(fds[i].fd, SOL_SOCKET, SO_ERROR, &errors, &len) < 0) { printf("get socket option failed\n"); } continue; } else if(fds[i].revents & POLLRDHUP)//挂起。比如管道的写端被关闭后,读端描述符上将收到POLLHUP事
{
                //如果客户端关闭连接,则服务器也关闭对应的连接,并将用户总数减1
                users[fds[i].fd] = users[fds[num].fd];
                close(fds[i].fd);
                fds[i] = fds[num];
                i--;
                num--;
                printf("a client left\n");
            }
            else if(fds[i].revents & POLLIN)
            {
                int connfd = fds[i].fd;
                memset(users[connfd].buf, '\0', 64);
                ret = recv(connfd, users[connfd].buf, 63, 0);
                printf("get %d bytes of client data %s from %d\n", ret, users[connfd].buf, connfd);
                if(ret < 0)
                {
//如果读操作出错,则关闭连接
if (errno != EAGAIN) { close(connfd); users[fds[i].fd] = users[fds[num].fd]; fds[i] = fds[num]; i--; num--; } } else if(ret == 0) { printf("code should not come to here\n"); } else {
//如果接收到客户数据,则通知其他socket连接准备写数据
int j; for(j=1; j<=num; j++) { if (fds[j].fd == connfd) continue; fds[j].events |= ~POLLIN; fds[j].events |= POLLOUT; users[fds[j].fd].write_buf = users[connfd].buf; } } } else if(fds[i].revents & POLLOUT) { int connfd = fds[i].fd; if (!users[connfd].write_buf) continue; ret = send(connfd, users[connfd].write_buf, strlen(users[connfd].write_buf), 0); users[connfd].write_buf = NULL;
//写完数据后需要重新注册fds[i]上的可读事件 fds[i].events
|= ~POLLOUT; fds[i].events |= POLLIN; } } } delete [] users; close(listenfd); return 0; }