C++ condition_variable 条件变量

发布时间 2023-04-02 22:45:09作者: 牛犁heart

本节来了解下C++11 中关于条件变量(condition_variable) 的相关知识,这一部分的内容相信网上已经有了很多的分享,这里仅是对该部分内容学习的记录、总结。

条件变量(condition_variable)

条件变量是一种多线程的同步机制,它能够阻塞线程,直到某一条件满足。条件变量要与互斥量联合使用,以避免出现竞争的情况,当调用condition_variable的一个等待函数时,它使用一个unique_lock对象来锁定线程。

PS:为什么有了lock_guard,还需要unique_lock?
虽然lock_guard挺好用的,但是有个很大的缺陷,在定义lock_guard的地方会调用构造函数加锁,在离开定义域的话lock_guard就会被销毁,调用析构函数解锁。这就产生了一个问题,如果这个定义域范围很大的话,那么锁的粒度就很大,很大程序上会影响效率。

条件变量作用

条件变量的作用是用于多线程之间关于共享数据状态变化的同学。当一个动作需要另外一个动作完成时才能进行,即:当一个线程的行为依赖于另外一个线程对共享数据状态的改变时,这时候就可以使用条件变量。

条件变量是与互斥相关联的一种用于多线程之间关于共享数据状态改变的通信机制。 它将解锁与挂起封装成原子操作。等待一个条件变量时,会解开与该条件变量相关的锁,因此,使用条件变量等待的前提之一就是保证互斥量加锁。该互斥量会被自动加锁,所以,在完成操作之后需要解锁。

底层实现

class condition_variable {
public:
	typedef _Cnd_t native_handle_type;
 
	condition_variable() {				// 构造函数,初始化条件变量,所有的条件变量必须初始化后才能使用。
		_Cnd_initX(&_Cnd);
	}
 
	~condition_variable() _NOEXCEPT {	// 析构函数
		_Cnd_destroy(&_Cnd);
	}
 
	condition_variable(const condition_variable&) = delete;
	condition_variable& operator=(const condition_variable&) = delete;
 
	void notify_one() _NOEXCEPT {		// 唤醒一个在等待线程
		_Cnd_signalX(&_Cnd);
	}
 
	void notify_all() _NOEXCEPT {		// 唤醒所有在等待的线程
		_Cnd_broadcastX(&_Cnd);
	}
 
	void wait(unique_lock<mutex>& _Lck) {					// 等待
		_Cnd_waitX(&_Cnd, &_Lck.mutex()->_Mtx);
	}
 
	template<class _Predicate>
	void wait(unique_lock<mutex>& _Lck, _Predicate _Pred) {	// 等待,带有描述式
		while (!_Pred())
			wait(_Lck);
	}
 
	template<class _Rep, class _Period>
	_Cv_status wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) {
		stdext::threads::xtime _Tgt = _To_xtime(_Rel_time);
		return (wait_until(_Lck, &_Tgt));
	}
 
	template<class _Rep, class _Period, class _Predicate>
	bool wait_for(unique_lock<mutex>& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) {
		stdext::threads::xtime _Tgt = _To_xtime(_Rel_time);
		return (wait_until(_Lck, &_Tgt, _Pred));
	}
 
	template<class _Clock, class _Duration>
	_Cv_status wait_until( unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time) {
		typename chrono::time_point<_Clock, _Duration>::duration
			_Rel_time = _Abs_time - _Clock::now();
		return (wait_for(_Lck, _Rel_time));
	}
 
	template<class _Clock, class _Duration, class _Predicate>
	bool wait_until(unique_lock<mutex>& _Lck, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) {
		typename chrono::time_point<_Clock, _Duration>::duration
			_Rel_time = _Abs_time - _Clock::now();
		return (wait_for(_Lck, _Rel_time, _Pred));
	}
 
	_Cv_status wait_until( unique_lock<mutex>& _Lck, const xtime *_Abs_time) {
		if (!_Mtx_current_owns(&_Lck.mutex()->_Mtx))
			_Throw_Cpp_error(_OPERATION_NOT_PERMITTED);
		int _Res = _Cnd_timedwaitX(&_Cnd, &_Lck.mutex()->_Mtx, _Abs_time);
		return (_Res == _Thrd_timedout ? cv_status::timeout : cv_status::no_timeout);
	}
 
	template<class _Predicate>
	bool wait_until(unique_lock<mutex>& _Lck, const xtime *_Abs_time, _Predicate _Pred) {
		bool _Res = true;
		while (_Res && !_Pred())
			_Res = wait_until(_Lck, _Abs_time)
				!= cv_status::timeout;
		return (_Pred());
	}
 
	native_handle_type native_handle() {					// 返回条件变量的句柄
		return (_Cnd);
	}
 
	void _Register(unique_lock<mutex>& _Lck, int *_Ready) {
		_Cnd_register_at_thread_exit(&_Cnd, &_Lck.release()->_Mtx, _Ready);
	}
 
	void _Unregister(mutex& _Mtx) {
		_Cnd_unregister_at_thread_exit(&_Mtx._Mtx);
	}
 
private:
	_Cnd_t _Cnd;
};

在类condition_variable中,包含了私有成员变量、构造函数、析构函数、等待和唤醒等方法。
等待操作: wait()、wait_for()、wait_until()
唤醒操作: notify_one()、notify_all()

条件变量的使用

#include <iostream>                 // std::cout
#include <thread>                   // std::thread
#include <mutex>                    // std::mutex, std::unique_lock
#include <condition_variable>       // std::condition_variable
 
using namespace std;
 
mutex mtx;                          // 互斥量
condition_variable cv;              // 条件变量
bool ready = false;                 // 标志量
 
void print_id(int id) {
    unique_lock<mutex> lck(mtx);    // 上锁
    while (!ready) {
        cv.wait(lck);               // 线程等待直到被唤醒(释放锁 + 等待,唤醒,在函数返回之前重新上锁)
    }
    cout << "thread " << id << '\n';
}
 
void go() {
    unique_lock<mutex> lck(mtx);    // 上锁
    ready = true;
    cv.notify_all();                // 唤醒所有正在等待(挂起)的线程(在这里面要释放锁,为了在wait函数返回之前能成功的重新上锁)
}
 
int main() {
    thread threads[10];
    for (int i = 0; i<10; ++i) {
        threads[i] = thread(print_id, i);
    }
 
    cout << "10 threads ready to race...\n";
    go();
 
    for (auto& th : threads) {
        th.join();
    }
 
    return 0;
}

来分析一下多线程代码时如何运行的:
1、线程A一来,就将互斥量上锁(持有了锁),ready为false,那么线程A将调用条件变量的wait()方法;
2、在wait()方法中,做的第一件事就是将互斥量解锁(释放持有权),并进入等待状态(在wait()中阻塞,线程A挂起);
3、现在线程B来了,互斥量是没有上锁的,所以线程B能持有锁,同理,接下来线程B也会挂起;
4、当所有线程都挂起了(就绪),此时互斥量也没有被上锁,在主线程中将ready置为true,并调用notify_all()将所有挂起的线程都唤醒;
5、此时所有线程将从wait()方法中返回,比如线程C先返回,在return之前,wait()方法做的最后一件事就是自动将互斥量上锁(线程C重新持有锁,以配合unique_lock的析构函数);
6、由于while循环,此时再判断到ready为true,那么线程C将执行打印id的语句,由于此时只有线程C持有锁,不存在线程竞争问题,执行完打印之后,线程C就结束了,此时由unique_lock的析构函数解锁,释放所有权。
7、由于在wait()方法return之前,会自动重新去持有锁,若此时锁由线程C持有,则其他线程将继续阻塞,直到线程C释放锁;若线程C执行完毕后释放了锁,那么其他线程将会争取锁的持有权,争取到锁的就会像之前的线程C一样;没有争取到的就继续阻塞;
8、以此类推,由于每个线程都join,那么当所有线程执行完毕后,主线程才会继续执行;
实际上,条件变量的wait()、wait_for()、wait_until()方法中所作的事是:解锁 + 等待、唤醒、加锁,这三个是有序发生的。

GCC源码位置:http://ftp.gnu.org/gnu/gcc/

参考:《探索C++多线程》:condition_variable源码(一)