1、进程和线程
1、进程:进程是操作系统中独立执行单位,每个进程都有自己独立的的内存空间,
所以优点:隔离性强:不同的进程之间相互独立,一个进程的崩溃不会影响到其他进程的运行
稳定性高:进程之间分配独立的内存空间,一个进程的错误不会直接影响其他进程
可靠性高:操作系统可以为每个进城分配独立的资源,确保他们不会互相干扰
支持多核:不同的进程可以在不同的处理器核心上并行执行,充分利用多核处理器的性能
缺点:创建销毁代价大:创建和销毁进程的代价相对较大,涉及到资源分配、内存管理等复杂操作。
上下文切换开销大:由于进程切换需要保存和恢复大量的上下文信息,操作系统需要耗费较多的开销。
进程间通信复杂:进程之间的通信需要通过操作系统提供的进程间通信机制,如管道、共享内存、消息队列等,实现起来较为复杂。
2、线程:是进程内的一个执行流,它共享相同的地址空间和其他资源,并使用进程的上下文来进行执行。线程之间的通信更加简单和高效,因为它们共享同一进程的资源
所以优点:创建销毁代价小:线程的创建和销毁比进程轻量,只需分配一些栈空间和少量的管理结构。
上下文切换开销小:线程之间的上下文切换开销远小于进程的上下文切换,因为它们共享相同的地址空间。
共享资源:线程可以直接访问进程的共享资源,简化了进程间通信的复杂性。
响应速度快:线程的创建和切换速度快,能够更快地响应事件和处理任务。
缺点 :缺乏隔离性:线程之间共享相同的地址空间,一个线程的错误可能会影响到其他线程,造成整个进程崩溃。
安全性问题:由于多个线程共享数据和资源,必须保证线程之间的同步和互斥,否则可能引发数据竞争和不一致的结果。
可靠性较低:一个线程的bug或异常可能会导致整个进程的崩溃,因为它们共享相同的进程资源。
2、为什么要线程池
线程池是一种管理和复用线程的机制,它内部维护了一组线程,可以根据需要自动创建、复用和回收这些线程
解决的问题
-
减少线程创建和销毁的开销:线程的创建和销毁都需要消耗系统资源,包括内存、CPU时间等。如果在需要执行任务时频繁地创建和销毁线程,会带来较大的开销。而线程池可以预先创建一定数量的线程,这些线程可以被重复利用,减少了线程创建和销毁的开销。
-
控制线程数量:在某些情况下,同时创建大量的线程可能会导致系统负载过重,降低系统的性能甚至导致崩溃。通过使用线程池,可以控制线程的数量,并通过设置线程池的参数来限制并发执行的任务数量,从而更好地平衡系统资源的利用。
-
提高响应速度:线程池可以在任务到达时立即执行,而不需要等待线程的创建。这样可以提高任务的响应速度,减少等待时间。
-
提高程序的稳定性:线程池可以对线程进行统一的管理和监控,避免因为线程未正确销毁或异常终止而导致整个程序的崩溃。线程池可以监控线程的状态,及时检测到线程出现异常或错误,并进行相应的处理。
-
提供任务队列:线程池通常会使用任务队列来存储待执行的任务,这样可以实现任务的排队和调度。任务队列可以有效地管理任务,避免任务过多导致资源竞争和系统负载过重的问题。
3、C语言第一版(不可伸缩线程池的实现)
利用任务队列以及线程池
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<pthread.h> #include<unistd.h> // //任务队列 struct job { void *(*func)(void *arg); void *arg; struct job *next; }; struct threadpool { int thread_num;//线程池中工作的线程 pthread_t *pthread_ids;//保存线程池中线程的id struct job *head;//任务队列的头 struct job *tail;//任务队列的尾 int queue_max_num;//任务队列的最大数 int queue_cur_num;//任务队列已有多少任务 pthread_mutex_t mutex; pthread_cond_t queue_empty;//任务为空的条件 pthread_cond_t queue_not_empty;//任务队列不为空的条件 pthread_cond_t queue_not_full;//任务队列不为满的条件 }; void *threadpool_function(void *arg) { struct threadpool *pool = (struct threadpool *)arg; struct job *pjob = NULL; while(1) { pthread_mutex_lock(&(pool->mutex)); while(pool->queue_cur_num == 0) { pthread_cond_wait(&(pool->queue_not_empty),&(pool->mutex)); } pjob = pool->head; pool->queue_cur_num--; if(pool->queue_cur_num != pool->queue_max_num) { pthread_cond_broadcast(&(pool->queue_not_full)); } if(pool->queue_cur_num == 0) { pool->tail = pool->head = NULL; pthread_cond_broadcast(&(pool->queue_empty)); } else { pool->head = pool->head->next; } pthread_mutex_unlock(&(pool->mutex)); pjob->func(pjob->arg); free(pjob); pjob = NULL; } } struct threadpool *threadpool_init(int thread_num,int queue_max_num) { struct threadpool *pool = (struct threadpool *)malloc(sizeof(struct threadpool)); pool->queue_max_num = queue_max_num; pool->queue_cur_num = 0; pool->head = NULL; pool->tail = NULL; pthread_mutex_init(&(pool->mutex),NULL); pthread_cond_init(&(pool->queue_empty),NULL); pthread_cond_init(&(pool->queue_not_empty),NULL); pthread_cond_init(&(pool->queue_not_full),NULL); pool->thread_num = thread_num; pool->pthread_ids = (pthread_t *)malloc(sizeof(pthread_t) * thread_num); //可以检测一下是否分配成功 for(int i = 0;i < pool->thread_num;i++) { pthread_create(&(pool->pthread_ids[i]),NULL,threadpool_function,(void *)pool); } return pool; } void threadpool_add_job(struct threadpool *pool,void *(*func)(void *),void * arg) { //队列是否为满 //复习一下条件变量 pthread_mutex_lock(&(pool->mutex)); while(pool->queue_cur_num == pool->queue_max_num) { pthread_cond_wait(&(pool->queue_not_full),&(pool->mutex));//等待这个 } struct job *pjob = (struct job*)malloc(sizeof(struct job)); pjob->func = func; pjob->arg = arg; pjob->next = NULL; //pjob->func(pjob->arg);//调用 if(pool->head == NULL) { pool->head = pool->tail = pjob; pthread_cond_broadcast(&(pool->queue_not_empty)); } else { pool->tail->next = pjob; pool->tail = pjob; } pool->queue_cur_num++; pthread_mutex_unlock(&(pool->mutex)); } void thread_destory_pool(struct threadpool *pool) { //灭有执行完的线程怎么办 pthread_mutex_lock(&(pool->mutex)); while (pool->queue_cur_num != 0) { pthread_cond_wait(&(pool->queue_empty),&(pool->mutex)); } pthread_mutex_unlock(&(pool->mutex)); //唤醒结束所有的线程 pthread_cond_broadcast(&(pool->queue_not_empty)); pthread_cond_broadcast(&(pool->queue_not_full)); for(int i = 0; i < pool->queue_cur_num;i++) { //cancel有bug 线程发生系统调用的时候,才会调用cancel删除 //pthread_cancel(pool->pthread_ids[i]); pthread_join(pool->pthread_ids[i],NULL); } pthread_mutex_destroy(&(pool->mutex)); pthread_cond_destroy(&(pool->queue_empty)); pthread_cond_destroy(&(pool->queue_not_full)); pthread_cond_destroy(&(pool->queue_not_empty)); //申请的内存还回去 free(pool->pthread_ids); struct job *temp; while(pool->head != NULL) { temp = pool->head; pool->head = temp->next; free(temp); } free(pool); } void * work(void *arg) { char *p = (char *)arg; printf("%s\n",p); printf("hello\n"); printf("world\n"); sleep(1); } int main() { struct threadpool * pool = threadpool_init(10,100); threadpool_add_job(pool,work,"1"); threadpool_add_job(pool,work,"2"); threadpool_add_job(pool,work,"3"); sleep(10);//主线程太快,子线程没有来得及运行 thread_destory_pool(pool); return 0; }
4、C语言第二版(可伸缩的线程池实现)
利用任务队列、线程池以及管理线程池的线程
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<pthread.h> #include<unistd.h> //可伸缩式线程池 //管理线程 /** * 任务多,线程少;添加线程 * 任务少,线程多;关闭线程 * 当前任务量 > 线程数 * 2 =》添加线程 * 当前任务量 * 2 < 线程数 =》关闭线程 */ typedef struct Task { void(*function)(void *arg); void *arg; }Task; typedef struct Threadpool { Task *TaskQ;//任务队列 int queueCapacity; //容量 int queueSize; //当前任务数量 int queueFront; //队头 取数据 int queueRear; //队尾 放数据 pthread_t managerID; //管理者线程 pthread_t *threadIDs; //工作线程 int minNum; //最小线程数 int Maxnum; //最大线程数 int busyNum; //忙的线程数 int liveNum; //存活线程数 int exitNum; //要销毁线程数 pthread_mutex_t mutexpool; //锁整个线程池 pthread_mutex_t mutexBusy; //锁busyNum pthread_cond_t notFull; //任务队列是否满了 pthread_cond_t notEmpty; //任务队列是否为空 int shutdown; //是否销毁线程池,销毁为1,不销毁为0 }Threadpool; Threadpool *threadpoolCreat(int min ,int max,int queueSize); void *worker(void* arg); void *manage(void *arg); void pthreadExit(Threadpool *pool); void threadpoolAdd(Threadpool *pool, void(*fun)(void *), void* arg); int threadPoolAliveNum(Threadpool* pool); int threadPoolBusyNum(Threadpool* pool); int threadPoolDestroy(Threadpool *pool); const int NUMBER = 2; Threadpool * threadpoolCreat(int min, int max, int queueSize) { Threadpool *pool = (Threadpool *)malloc(sizeof(Threadpool)); do { if (pool == NULL) { printf("malloc threadpool fail!\n"); break; } pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t)*max); if (pool->threadIDs == NULL) { printf("malloc threadIDs fail\n"); break; } memset(pool->threadIDs, 0, sizeof(pthread_t) * max); pool->minNum = min; pool->Maxnum = max; pool->busyNum = 0; pool->liveNum = min; pool->exitNum = 0; if (pthread_mutex_init(&pool->mutexBusy, NULL) != 0 || pthread_mutex_init(&pool->mutexpool, NULL) != 0 || pthread_cond_init(&pool->notEmpty, NULL) != 0 || pthread_cond_init(&pool->notFull, NULL) != 0) { printf("mutex or cond init fail\n"); break; } //������� pool->TaskQ = (Task *)malloc(sizeof(Task) * queueSize); pool->queueCapacity = queueSize; pool->queueSize = 0; pool->queueFront = 0; pool->queueRear = 0; pool->shutdown = 0; //�����߳� pthread_create(&pool->managerID, NULL, manage, (void *)pool); for (int i = 0; i < min; i++) { pthread_create(&pool->threadIDs[i], NULL, worker, (void *)pool); } return pool; } while (0); if (pool->threadIDs) free(pool->threadIDs); if (pool->TaskQ) free(pool->TaskQ); if(pool) free(pool); return NULL; } void * worker(void * arg) { Threadpool *pool = (Threadpool *)arg; while (1) { pthread_mutex_lock(&pool->mutexpool); while (pool->queueSize == 0 && !pool->shutdown) { //���������߳� pthread_cond_wait(&pool->notEmpty, &pool->mutexpool); //�ж��Dz���Ҫ�����߳� if (pool->exitNum > 0) { pool->exitNum--; if (pool->liveNum > pool->minNum) { pool->liveNum--; pthread_mutex_unlock(&pool->mutexpool); pthreadExit(pool); } } } if (pool->shutdown) { pthread_mutex_unlock(&pool->mutexpool); pthreadExit(pool); } //ȡ��һ������ Task task; task.function = pool->TaskQ[pool->queueFront].function; task.arg = pool->TaskQ[pool->queueFront].arg; pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity; pool->queueSize--; pthread_cond_signal(&pool->notFull); pthread_mutex_unlock(&pool->mutexpool); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum++; pthread_mutex_unlock(&pool->mutexBusy); task.function(task.arg); pthread_mutex_lock(&pool->mutexBusy); pool->busyNum--; pthread_mutex_unlock(&pool->mutexBusy); } return NULL; } void * manage(void * arg) { Threadpool *pool = (Threadpool *)arg; int queueSize, liveNum, busyNum, counter; while (pool->shutdown == 0) { //��ʱѲ�� sleep(2); //ȡ���̳߳������������͵�ǰ�̵߳����� pthread_mutex_lock(&pool->mutexpool); queueSize = pool->queueSize; liveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexpool); //ȡ��æ���߳����� pthread_mutex_lock(&pool->mutexBusy); busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); //�����߳� if (queueSize > liveNum && liveNum < pool->Maxnum) { counter = 0; pthread_mutex_lock(&pool->mutexpool); for (int i = 0; i < pool->Maxnum&& counter < NUMBER && pool->liveNum < pool->Maxnum; i++) { if (pool->threadIDs[i] == 0) { pthread_create(&pool->threadIDs[i], NULL, worker, (void *)pool); counter++; pool->liveNum++; } } pthread_mutex_unlock(&pool->mutexpool); } //�����߳� if (busyNum * 2 < liveNum&& liveNum > pool->minNum) { pthread_mutex_lock(&pool->mutexpool); pool->exitNum = NUMBER; pthread_mutex_unlock(&pool->mutexpool); for (int i = 0; i < NUMBER; i++) { pthread_cond_signal(&pool->notEmpty); } } return NULL; } } void pthreadExit(Threadpool *pool) { pthread_t tid = pthread_self(); for (int i = 0; i < pool->Maxnum; i++) { if (pool->threadIDs[i] == tid) { pool->threadIDs[i] = 0; break; } } pthread_exit(NULL); } void threadpoolAdd(Threadpool * pool, void(*func)(void *), void * arg) { pthread_mutex_lock(&pool->mutexpool); while (pool->queueCapacity == pool->queueSize && pool->shutdown == 0 )//�̳߳����߳������ȴ����߳̿��� { pthread_cond_wait(&pool->notFull, &pool->mutexpool); } if (pool->shutdown == 1) { pthread_mutex_unlock(&pool->mutexpool); return; } //�������� pool->TaskQ[pool->queueRear].function = func; pool->TaskQ[pool->queueRear].arg = arg; pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity; pool->queueSize++; pthread_cond_signal(&pool->notEmpty); pthread_mutex_unlock(&pool->mutexpool); } int threadPoolBusyNum(Threadpool* pool) { pthread_mutex_lock(&pool->mutexBusy); int busyNum = pool->busyNum; pthread_mutex_unlock(&pool->mutexBusy); return busyNum; } int threadPoolAliveNum(Threadpool* pool) { pthread_mutex_lock(&pool->mutexpool); int aliveNum = pool->liveNum; pthread_mutex_unlock(&pool->mutexpool); return aliveNum; } int threadPoolDestroy(Threadpool * pool) { if (pool == NULL) { return -1; } pool->shutdown = 1; pthread_join(pool->managerID, NULL); for (int i = 0; i < pool->liveNum; i++) { pthread_cond_signal(&pool->notEmpty); } if (pool->threadIDs) free(pool->threadIDs); if (pool->TaskQ) free(pool->TaskQ); pthread_mutex_destroy(&pool->mutexpool); pthread_mutex_destroy(&pool->mutexBusy); pthread_cond_destroy(&pool->notEmpty); pthread_cond_destroy(&pool->notFull); free(pool); pool = NULL; return 0; }
5、C++线程池实现