生产者-消费者代码实现

发布时间 2023-11-21 11:39:49作者: 野原丶广志
#include <pthread.h>
#include <errno.h>
#include <list>
#include <iostream>
#include <semaphore.h>
#include <unistd.h>




class Task
{
public:
    Task(int taskID){
        this->taskID = taskID;
    }

    void doTask()
    {
        std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
    }

private:
    int taskID;

};

pthread_mutex_t mymutex;
std::list<Task*> tasks;
sem_t mysemaphore;

void* consumer_thread(void* param)
{
    Task* ptask = NULL;

    while(true){
        if(sem_wait(&mysemaphore) != 0) continue;

        if(tasks.empty()) continue;

        pthread_mutex_lock(&mymutex);
        ptask = tasks.front();
        tasks.pop_front();
        pthread_mutex_unlock(&mymutex);

        ptask->doTask();
        delete ptask;
    }
    return NULL;
}

void* producer_thread(void* param)
{
    int taskID = 0;
    Task* pTask = NULL;
    while(true){
        pTask = new Task(taskID);

        pthread_mutex_lock(&mymutex);
        tasks.push_back(pTask);
        std::cout << "produce a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
        pthread_mutex_unlock(&mymutex);

        sem_post(&mysemaphore);
        taskID++;

        sleep(1);
    }
    return NULL;

}




int main()
{
    pthread_mutex_init(&mymutex,NULL);
    //初始化信号量资源计数为0
    sem_init(&mysemaphore,0,0);

    //创建5个消费者线程
    pthread_t consumerThreadID[5];
    for(int i = 0; i < 5; i++){
        pthread_create(&consumerThreadID[i],NULL,consumer_thread,NULL);
    }

    //创建一个生产者线程
    pthread_t producerThreadID;
    pthread_create(&producerThreadID,NULL,producer_thread,NULL);

    pthread_join(producerThreadID,NULL);

    for(int i = 0; i < 5; ++i){
        pthread_join(consumerThreadID[i],NULL);
    }

    sem_destroy(&mysemaphore);
    pthread_mutex_destroy(&mymutex);

    return 0;
}