Linux条件变量

发布时间 2023-11-22 09:13:00作者: 野原丶广志

1.为什么要有条件变量?

在实际应用中,常常会有如下的需求:

  • 用于反复判断一个多线程的共享条件是否满足。
//伪代码
int WaitForTrue()
{
	do{
	 pthread_mutex_lock(&m);
	
	 //验证 condition 是否为 true
	
	 //解锁,让其它线程有机会改变condition
	 pthread_mutex_unlock(&m);
	
	 //睡眠 n 秒
	  sleep(n);
	}while(condition is false);
}
return 1;

2.条件变量为什么要与互斥体对象结合使用?


pthread_mutex_lock(&m)
while(condition_is_false)
{
	pthread_mutex_unlock(&m);
	//解锁之后,等待之前,可能条件变量已经满足,信号已经发出,但是该信号可能被错过
	cond_wait(&cv);
	pthread_mutex_lock(&m);
}

如上伪代码所示,假设线程 A 在执行完第 5 行代码后 CPU 时间片被剥夺,此时另一个线程 B 获得该互斥体对象 m,然后发送条件信号,等线程 A 重新获得时间片后,由于该信号已经被错过,可能会导致线程 A 在代码第 7 行无限阻塞下去。

造成这个问题的根源是释放互斥体对象与条件变量等待唤醒不是原子操作,即解锁和等待这两个步骤必须在同一个原子操作中,才能确保 cond_wait 在唤醒之前不会有其它线程获得这个互斥体对象。

3.条件变量的使用

利用条件变量实现生产者-消费者模型

#include <pthread.h>
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <list>
#include <iostream>

class Task
{
public:
    Task(int taskID){
        this->taskID = taskID;
    }

    void doTask(){
        std::cout << "handle a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
    }

private:
    int taskID;
};

pthread_mutex_t mymutex;
std::list<Task*> tasks;
pthread_cond_t mycv;

void* consumer_thread(void* param)
{
    Task* pTask = NULL;
    while(true)
    {
        pthread_mutex_lock(&mymutex);
        while(tasks.empty()){
            //如果获得了互斥锁,但是条件不合适,则pthread_cond_wait会释放锁,不往下执行
            //发生变化后,如果条件合适,则pthread_cond_wait将直接获得锁
            pthread_cond_wait(&mycv,&mymutex);
        }
        pTask = tasks.front();
        tasks.pop_front();
        pthread_mutex_unlock(&mymutex);

        if(pTask == NULL){
            continue;
        }
        pTask->doTask();
        delete pTask;
        pTask = NULL;
    }
    return NULL;

}

void* producer_thread(void* param)
{
    int taskID = 0;
    Task* pTask = NULL;

    while(true){
        pTask = new Task(taskID);
        
        pthread_mutex_lock(&mymutex);
        tasks.push_back(pTask);
        std::cout << "prodece a task, taskID: " << taskID << ", threadID: " << pthread_self() << std::endl;
        pthread_mutex_unlock(&mymutex);

        //释放信号量,通知消费者线程
        pthread_cond_signal(&mycv);

        taskID++;

        //休眠1秒
        sleep(1);
    }
    return NULL;
}



int main()
{
    pthread_mutex_init(&mymutex,NULL);
    pthread_cond_init(&mycv, NULL);

    //创建5个消费者线程
    pthread_t consumerThread[5];
    for(int i = 0; i < 5; i++){
        pthread_create(&consumerThread[i],NULL,consumer_thread,NULL);
    }
    
    //创建1个生产者线程
    pthread_t producerThread;
    pthread_create(&producerThread,NULL,producer_thread,NULL);

    pthread_join(producerThread,NULL);

    for(int i = 0; i < 5; i++){
        pthread_join(consumerThread[i],NULL);
    }

    pthread_mutex_destroy(&mymutex);
    pthread_cond_destroy(&mycv);

    return 0;
}

4.条件变量的虚假唤醒

  • 即某次 pthread_cond_wait 被唤醒时, task.empty() 可能仍然为 True;
  • 因此条件变量醒来之后再次测试条件是否满足就可以解决虚假唤醒问题,因此使用 while 循环而不是 if 语句来判断条件。

c++11实现消费者-生产者模型

#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
#include <iostream>


class Task
{
public:
    Task(int taskID){
        this->taskID = taskID;
    }

    void doTask(){
        std::cout << "handle a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl;
    }
private:
    int taskID;
};

std::mutex mymutex;
std::condition_variable mycv;
std::list<Task*> tasks;

void* consumer_thread(){
    Task* pTask = NULL;
    while(true){

        std::unique_lock<std::mutex> guard(mymutex);
        while(tasks.empty()){
            //如果获得了互斥锁,但是条件不合适,则pthread_cond_wait会释放锁,不向下执行
            //发生变化后,如果条件合适,则pthread_cond_wait将直接获得锁
            mycv.wait(guard);
        }
        pTask = tasks.front();
        tasks.pop_front();

        if(pTask == NULL){
            continue;
        }
        pTask->doTask();
        delete pTask;
        pTask = NULL;
    }
    return NULL;
}


void* producer_thread(){
    Task* pTask = NULL;
    int taskID = 0;

    while(true){
        pTask = new Task(taskID);
        //使用小括号减小guard锁的作用范围
        {
            std::lock_guard<std::mutex> guard(mymutex);
            tasks.push_back(pTask);
            std::cout << "produce a task, taskID: " << taskID << ", threadID: " << std::this_thread::get_id() << std::endl;
        }

        //释放信号量,通知消费者进程
        mycv.notify_one();

        taskID++;

        //休眠1秒
        std::this_thread::sleep_for(std::chrono::seconds(1));

    }
    return NULL;

}
int main(){
    //创建5个线程
    std::thread consumer1(consumer_thread);
    std::thread consumer2(consumer_thread);
    std::thread consumer3(consumer_thread);
    std::thread consumer4(consumer_thread);
    std::thread consumer5(consumer_thread);


    //创建1个生产者线程
    std::thread producer(producer_thread);

    producer.join();
    consumer1.join();
    consumer2.join();
    consumer3.join();
    consumer4.join();
    consumer5.join();

    return 0;
}