关于线程池原理实现(C语言、C++)

发布时间 2023-08-30 14:44:10作者: 风中凌乱的猪头

1、进程和线程

  1、进程:进程是操作系统中独立执行单位,每个进程都有自己独立的的内存空间,

     所以优点:隔离性强:不同的进程之间相互独立,一个进程的崩溃不会影响到其他进程的运行

          稳定性高:进程之间分配独立的内存空间,一个进程的错误不会直接影响其他进程

          可靠性高:操作系统可以为每个进城分配独立的资源,确保他们不会互相干扰

          支持多核:不同的进程可以在不同的处理器核心上并行执行,充分利用多核处理器的性能

       缺点:创建销毁代价大:创建和销毁进程的代价相对较大,涉及到资源分配、内存管理等复杂操作。

          上下文切换开销大:由于进程切换需要保存和恢复大量的上下文信息,操作系统需要耗费较多的开销。

          进程间通信复杂:进程之间的通信需要通过操作系统提供的进程间通信机制,如管道、共享内存、消息队列等,实现起来较为复杂。

  2、线程:是进程内的一个执行流,它共享相同的地址空间和其他资源,并使用进程的上下文来进行执行。线程之间的通信更加简单和高效,因为它们共享同一进程的资源

       所以优点:创建销毁代价小:线程的创建和销毁比进程轻量,只需分配一些栈空间和少量的管理结构。

          上下文切换开销小:线程之间的上下文切换开销远小于进程的上下文切换,因为它们共享相同的地址空间。

          共享资源:线程可以直接访问进程的共享资源,简化了进程间通信的复杂性。

          响应速度快:线程的创建和切换速度快,能够更快地响应事件和处理任务。

         缺点 :缺乏隔离性:线程之间共享相同的地址空间,一个线程的错误可能会影响到其他线程,造成整个进程崩溃。

            安全性问题:由于多个线程共享数据和资源,必须保证线程之间的同步和互斥,否则可能引发数据竞争和不一致的结果。

            可靠性较低:一个线程的bug或异常可能会导致整个进程的崩溃,因为它们共享相同的进程资源。

2、为什么要线程池

  线程池是一种管理和复用线程的机制,它内部维护了一组线程,可以根据需要自动创建、复用和回收这些线程

  解决的问题

  1. 减少线程创建和销毁的开销:线程的创建和销毁都需要消耗系统资源,包括内存、CPU时间等。如果在需要执行任务时频繁地创建和销毁线程,会带来较大的开销。而线程池可以预先创建一定数量的线程,这些线程可以被重复利用,减少了线程创建和销毁的开销。

  2. 控制线程数量:在某些情况下,同时创建大量的线程可能会导致系统负载过重,降低系统的性能甚至导致崩溃。通过使用线程池,可以控制线程的数量,并通过设置线程池的参数来限制并发执行的任务数量,从而更好地平衡系统资源的利用。

  3. 提高响应速度:线程池可以在任务到达时立即执行,而不需要等待线程的创建。这样可以提高任务的响应速度,减少等待时间。

  4. 提高程序的稳定性:线程池可以对线程进行统一的管理和监控,避免因为线程未正确销毁或异常终止而导致整个程序的崩溃。线程池可以监控线程的状态,及时检测到线程出现异常或错误,并进行相应的处理。

  5. 提供任务队列:线程池通常会使用任务队列来存储待执行的任务,这样可以实现任务的排队和调度。任务队列可以有效地管理任务,避免任务过多导致资源竞争和系统负载过重的问题。

3、C语言第一版(不可伸缩线程池的实现)

  利用任务队列以及线程池 

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<unistd.h>

//

//任务队列
struct job
{
    void *(*func)(void *arg);
    void *arg;
    struct job *next;
};


struct threadpool
{
    int thread_num;//线程池中工作的线程
    pthread_t *pthread_ids;//保存线程池中线程的id

    struct job *head;//任务队列的头
    struct job *tail;//任务队列的尾
    int queue_max_num;//任务队列的最大数
    int queue_cur_num;//任务队列已有多少任务

    pthread_mutex_t mutex;
    pthread_cond_t queue_empty;//任务为空的条件
    pthread_cond_t queue_not_empty;//任务队列不为空的条件
    pthread_cond_t queue_not_full;//任务队列不为满的条件

};

void *threadpool_function(void *arg)
{
    struct threadpool *pool = (struct threadpool *)arg;
    struct job *pjob = NULL;
    
    while(1)
    {
        pthread_mutex_lock(&(pool->mutex));

        while(pool->queue_cur_num == 0)
        {
            pthread_cond_wait(&(pool->queue_not_empty),&(pool->mutex));
        }

        pjob = pool->head;
        pool->queue_cur_num--;
        if(pool->queue_cur_num != pool->queue_max_num)
        {
            pthread_cond_broadcast(&(pool->queue_not_full));
        }

        if(pool->queue_cur_num == 0)
        {
            pool->tail = pool->head = NULL;
            pthread_cond_broadcast(&(pool->queue_empty));
        }
        else
        {
            pool->head = pool->head->next;
        }

        pthread_mutex_unlock(&(pool->mutex));

        pjob->func(pjob->arg);
        free(pjob);
        pjob = NULL;
    }
}

struct threadpool *threadpool_init(int thread_num,int queue_max_num)
{
    struct threadpool *pool = (struct threadpool *)malloc(sizeof(struct threadpool));

    pool->queue_max_num = queue_max_num;
    pool->queue_cur_num = 0;
    pool->head = NULL;
    pool->tail = NULL;

    pthread_mutex_init(&(pool->mutex),NULL);
    pthread_cond_init(&(pool->queue_empty),NULL);
    pthread_cond_init(&(pool->queue_not_empty),NULL);
    pthread_cond_init(&(pool->queue_not_full),NULL);

    pool->thread_num = thread_num;
    pool->pthread_ids = (pthread_t *)malloc(sizeof(pthread_t) * thread_num);
    //可以检测一下是否分配成功

    for(int i = 0;i < pool->thread_num;i++)
    {
        pthread_create(&(pool->pthread_ids[i]),NULL,threadpool_function,(void *)pool);
    }

    return pool;
}

void threadpool_add_job(struct threadpool *pool,void *(*func)(void *),void * arg)
{
    //队列是否为满
    //复习一下条件变量
    pthread_mutex_lock(&(pool->mutex));
    while(pool->queue_cur_num == pool->queue_max_num)
    {
        pthread_cond_wait(&(pool->queue_not_full),&(pool->mutex));//等待这个
    }

    struct job *pjob = (struct job*)malloc(sizeof(struct job));
    pjob->func = func;
    pjob->arg = arg;
    pjob->next = NULL;

    //pjob->func(pjob->arg);//调用
    if(pool->head == NULL)
    {
        pool->head = pool->tail = pjob;
        pthread_cond_broadcast(&(pool->queue_not_empty));
    }
    else
    {
        pool->tail->next = pjob;
        pool->tail = pjob;
    }

    pool->queue_cur_num++;
    pthread_mutex_unlock(&(pool->mutex));
}
void thread_destory_pool(struct threadpool *pool)
{
    //灭有执行完的线程怎么办
    pthread_mutex_lock(&(pool->mutex));
    while (pool->queue_cur_num != 0)
    {
        pthread_cond_wait(&(pool->queue_empty),&(pool->mutex));
    }
    pthread_mutex_unlock(&(pool->mutex));

    //唤醒结束所有的线程
    pthread_cond_broadcast(&(pool->queue_not_empty));
    pthread_cond_broadcast(&(pool->queue_not_full));

    for(int i = 0; i < pool->queue_cur_num;i++)
    {
        //cancel有bug 线程发生系统调用的时候,才会调用cancel删除
        //pthread_cancel(pool->pthread_ids[i]);
        pthread_join(pool->pthread_ids[i],NULL);
    }
    pthread_mutex_destroy(&(pool->mutex));
    pthread_cond_destroy(&(pool->queue_empty));
    pthread_cond_destroy(&(pool->queue_not_full));
    pthread_cond_destroy(&(pool->queue_not_empty));

    //申请的内存还回去
    free(pool->pthread_ids);
    struct job *temp;
    while(pool->head != NULL)
    {
        temp = pool->head;
        pool->head = temp->next;
        free(temp);
    }
    free(pool);
}

void * work(void *arg)
{
    char *p = (char *)arg;
    printf("%s\n",p);
    printf("hello\n");
    printf("world\n");
    sleep(1);
}

int main()
{
    struct threadpool * pool = threadpool_init(10,100);
    threadpool_add_job(pool,work,"1");
    threadpool_add_job(pool,work,"2");
    threadpool_add_job(pool,work,"3");
    sleep(10);//主线程太快,子线程没有来得及运行

    thread_destory_pool(pool);
    
    return 0;
}
View Code

4、C语言第二版(可伸缩的线程池实现)

  利用任务队列、线程池以及管理线程池的线程

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<pthread.h>
#include<unistd.h>


//可伸缩式线程池
//管理线程
/**
 * 任务多,线程少;添加线程
 * 任务少,线程多;关闭线程
 * 当前任务量 > 线程数 * 2 =》添加线程
 * 当前任务量 * 2 < 线程数 =》关闭线程 

*/
typedef struct Task
{
    void(*function)(void *arg);
    void *arg;
}Task;

typedef struct Threadpool
{
    Task *TaskQ;//任务队列
    int queueCapacity;    //容量
    int queueSize;    //当前任务数量
    int queueFront;        //队头 取数据
    int queueRear;        //队尾    放数据
    pthread_t managerID;    //管理者线程
    pthread_t *threadIDs;    //工作线程
    int minNum;        //最小线程数
    int Maxnum;        //最大线程数
    int busyNum;    //忙的线程数
    int liveNum;    //存活线程数
    int exitNum;    //要销毁线程数
    pthread_mutex_t mutexpool;        //锁整个线程池
    pthread_mutex_t mutexBusy;        //锁busyNum
    pthread_cond_t notFull;                //任务队列是否满了            
    pthread_cond_t notEmpty;            //任务队列是否为空

    int shutdown;    //是否销毁线程池,销毁为1,不销毁为0
}Threadpool;

Threadpool *threadpoolCreat(int min ,int max,int queueSize);
void *worker(void* arg);
void *manage(void *arg);
void pthreadExit(Threadpool *pool);
void threadpoolAdd(Threadpool *pool, void(*fun)(void *), void* arg);
int threadPoolAliveNum(Threadpool* pool);
int threadPoolBusyNum(Threadpool* pool);
int threadPoolDestroy(Threadpool *pool);


const int NUMBER = 2;

Threadpool * threadpoolCreat(int min, int max, int queueSize)
{
    Threadpool *pool = (Threadpool *)malloc(sizeof(Threadpool));
    do
    {
        if (pool == NULL)
        {
            printf("malloc threadpool fail!\n");
            break;
        }
        pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t)*max);
        if (pool->threadIDs == NULL)
        {
            printf("malloc threadIDs fail\n");
            break;
        }
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        pool->minNum = min;
        pool->Maxnum = max;
        pool->busyNum = 0;
        pool->liveNum = min;
        pool->exitNum = 0;

        if (pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexpool, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
            pthread_cond_init(&pool->notFull, NULL) != 0)
        {
            printf("mutex or cond init fail\n");
            break;
        }

        //�������
        pool->TaskQ = (Task *)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;
        pool->shutdown = 0;

        //�����߳�
        pthread_create(&pool->managerID, NULL, manage, (void *)pool);
        for (int i = 0; i < min; i++)
        {
            pthread_create(&pool->threadIDs[i], NULL, worker, (void *)pool);
        }
        return pool;
    } while (0);
    if (pool->threadIDs)    free(pool->threadIDs);
    if (pool->TaskQ)            free(pool->TaskQ);
    if(pool)        free(pool);
    return NULL;
}

void * worker(void * arg)
{
    Threadpool *pool = (Threadpool *)arg;
    while (1)
    {
        pthread_mutex_lock(&pool->mutexpool);
        while (pool->queueSize == 0 && !pool->shutdown)
        {
            //���������߳�
            pthread_cond_wait(&pool->notEmpty, &pool->mutexpool);
            //�ж��Dz���Ҫ�����߳�
            if (pool->exitNum > 0)
            {
                pool->exitNum--;
                if (pool->liveNum > pool->minNum)
                {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexpool);
                    pthreadExit(pool);
                }
            }
        }
        if (pool->shutdown)
        {
            pthread_mutex_unlock(&pool->mutexpool);
            pthreadExit(pool);
        }
        //ȡ��һ������
        Task task;
        task.function = pool->TaskQ[pool->queueFront].function;
        task.arg = pool->TaskQ[pool->queueFront].arg;

        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexpool);

        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);

        task.function(task.arg);

        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

void * manage(void * arg)
{
    Threadpool *pool = (Threadpool *)arg;
    int queueSize, liveNum, busyNum, counter;
    while (pool->shutdown == 0)
    {
        //��ʱѲ��
        sleep(2);

        //ȡ���̳߳������������͵�ǰ�̵߳�����
        pthread_mutex_lock(&pool->mutexpool);
        queueSize = pool->queueSize;
        liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexpool);

        //ȡ��æ���߳�����
        pthread_mutex_lock(&pool->mutexBusy);
        busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        //�����߳�
        if (queueSize > liveNum && liveNum < pool->Maxnum)
        {
            counter = 0;
            pthread_mutex_lock(&pool->mutexpool);
            for (int i = 0; i < pool->Maxnum&& counter < NUMBER && pool->liveNum < pool->Maxnum; i++)
            {
                if (pool->threadIDs[i] == 0)
                {
                    pthread_create(&pool->threadIDs[i], NULL, worker, (void *)pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexpool);
        }
        //�����߳�
        if (busyNum * 2 < liveNum&& liveNum > pool->minNum)
        {
            pthread_mutex_lock(&pool->mutexpool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexpool);
            for (int i = 0; i < NUMBER; i++)
            {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
        return NULL;
    }
}
void pthreadExit(Threadpool *pool)
{
    pthread_t tid = pthread_self();
    for (int i = 0; i < pool->Maxnum; i++)
    {
        if (pool->threadIDs[i] == tid)
        {
            pool->threadIDs[i] = 0;
            break;
        }
    }
    pthread_exit(NULL);
}

void threadpoolAdd(Threadpool * pool, void(*func)(void *), void * arg)
{
    pthread_mutex_lock(&pool->mutexpool);
    while (pool->queueCapacity == pool->queueSize && pool->shutdown == 0 )//�̳߳����߳������ȴ����߳̿���
    {
        pthread_cond_wait(&pool->notFull, &pool->mutexpool);
    }
    if (pool->shutdown == 1)
    {
        pthread_mutex_unlock(&pool->mutexpool);
        return;
    }
    //��������
    pool->TaskQ[pool->queueRear].function = func;
    pool->TaskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;
    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexpool);
}


int threadPoolBusyNum(Threadpool* pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    return busyNum;
}



int threadPoolAliveNum(Threadpool* pool)
{
    pthread_mutex_lock(&pool->mutexpool);
    int aliveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexpool);
    return aliveNum;
}

int threadPoolDestroy(Threadpool * pool)
{
    if (pool == NULL)
    {
        return -1;
    }
    pool->shutdown = 1;
    pthread_join(pool->managerID, NULL);
    for (int i = 0; i < pool->liveNum; i++)
    {
        pthread_cond_signal(&pool->notEmpty);
    }
    if (pool->threadIDs)    free(pool->threadIDs);
    if (pool->TaskQ)            free(pool->TaskQ);
    
    pthread_mutex_destroy(&pool->mutexpool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);
    free(pool);
    pool = NULL;
    return 0;
}
View Code

5、C++线程池实现