多线程select并发

发布时间 2023-11-07 23:43:20作者: LiviaYu

单纯select的问题

之前的代码中,建立连接和接收数据是线性执行的关系,也就是说,建立连接时不能接收,接收时不能建立连接,所以效率仍然不够高

解决方法

主线程中一直执行select函数,检测文件描述符的状态,让子线程去进行通信

建立子线程的位置

需要在接收到客户端连接后,用子线程去处理这个文件描述符

检测select函数的返回值之后,进行子线程的创建
同理,通信中也需要用到子线程

多线程的共享资源

在此情况下,共享资源为maxfd和rdset

所以在修改时需要加锁

代码

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <iostream>
#include <pthread.h>
#include <sys/select.h>
#include <mutex>

pthread_mutex_t mtx;

struct fdinfo
{
    int fd;
    int* maxfd;
    fd_set* rdset;
};
void* acceptConn(void* arg)
{
    fdinfo* info = (fdinfo*)arg;
    // 接受连接请求, 这个调用不阻塞
    struct sockaddr_in cliaddr;
    int cliLen = sizeof(cliaddr);
    int cfd = accept(info->fd, (struct sockaddr*)&cliaddr, (socklen_t*)&cliLen);

    // 得到了有效的文件描述符
    // 通信的文件描述符添加到读集合
    // 在下一轮select检测的时候, 就能得到缓冲区的状态
    pthread_mutex_lock(&mtx);
    FD_SET(cfd, info->rdset);
    // 重置最大的文件描述符
    *info->maxfd = cfd > *info->maxfd ? cfd : *info->maxfd;
    pthread_mutex_unlock(&mtx);
    delete info;
    return NULL;

}
void* communication(void* arg)
{
    fdinfo* info = (fdinfo*)arg;
    // 接收数据
    char buf[100];
    // 一次只能接收10个字节, 客户端一次发送100个字节
    // 一次是接收不完的, 文件描述符对应的读缓冲区中还有数据
    // 下一轮select检测的时候, 内核还会标记这个文件描述符缓冲区有数据 -> 再读一次
    // 	循环会一直持续, 知道缓冲区数据被读完位置
    int len = read(info->fd, buf, sizeof(buf));
    if (len == 0)
    {
        printf("客户端关闭了连接...\n");
        // 将检测的文件描述符从读集合中删除
        pthread_mutex_lock(&mtx);
        FD_CLR(info->fd, info->rdset);
        pthread_mutex_unlock(&mtx);
        close(info->fd);
        delete info;
        return NULL;
    }
    else if (len > 0)
    {
        // 收到了数据
        std::cout << "client says: " << buf << std::endl;
        // 发送数据
        write(info->fd, buf, strlen(buf) + 1);
    }
    else
    {
        // 异常
        perror("read");
        delete info;
        return NULL;
    }
    delete info;
    return NULL;
}

int main()
{
    pthread_mutex_init(&mtx,NULL);

    std::cout<<"START SUCCESS"<<std::endl;
    // 1. 创建监听的fd
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    // 2. 绑定
    struct sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(9999);
    addr.sin_addr.s_addr = INADDR_ANY;
    bind(lfd, (struct sockaddr*)&addr, sizeof(addr));

    // 3. 设置监听
    listen(lfd, 128);

    // 将监听的fd的状态检测委托给内核检测
    int maxfd = lfd;
    // 初始化检测的读集合
    fd_set rdset;
    fd_set rdtemp;
    // 清零
    FD_ZERO(&rdset);
    // 将监听的lfd设置到检测的读集合中
    FD_SET(lfd, &rdset);
    // 通过select委托内核检测读集合中的文件描述符状态, 检测read缓冲区有没有数据
    // 如果有数据, select解除阻塞返回
    // 应该让内核持续检测
    while (1)
    {
        // 默认阻塞
        // rdset 中是委托内核检测的所有的文件描述符
        pthread_mutex_lock(&mtx);
        rdtemp = rdset;
        pthread_mutex_unlock(&mtx);
        int num = select(maxfd + 1, &rdtemp, NULL, NULL, NULL);
        // rdset中的数据被内核改写了, 只保留了发生变化的文件描述的标志位上的1, 没变化的改为0
        // 只要rdset中的fd对应的标志位为1 -> 缓冲区有数据了
        // 判断
        // 有没有新连接
        if (FD_ISSET(lfd, &rdtemp))
        {
            
            //子线程
            pthread_t tid;
            fdinfo *info=new fdinfo;
            info->maxfd = &maxfd;
            info->fd = lfd;
            info->rdset = &rdset;
            pthread_create(&tid, NULL, acceptConn, info);
            pthread_detach(tid);

        }

        // 没有新连接, 通信
        for (int i = 0; i < maxfd + 1; ++i)
        {
            // 判断从监听的文件描述符之后到maxfd这个范围内的文件描述符是否读缓冲区有数据
            if (i != lfd && FD_ISSET(i, &rdtemp))
            {
                //子线程
                pthread_t tid;
                fdinfo* info = new fdinfo;
                info->fd = lfd;
                info->rdset = &rdset;
                pthread_create(&tid, NULL, communication, info);
                pthread_detach(tid);
                
            }
        }
    }
    pthread_mutex_destroy(&mtx);
    return 0;
}