C++11 多线程

发布时间 2023-12-19 22:13:26作者: Beasts777

文章参考:

C++ 教程 | 爱编程的大丙 (subingwen.cn)

C++11的原子量与内存序浅析 - 兔晓侠 - 博客园 (cnblogs.com)

从C++11起,C++为用户提供一套新的多线程类。线程相关操作头文件:thread

#include <thread>

一. std::thread

1. 初始化和析构

默认构造函数:

  • 原型:

    thread() noexcept;
    
  • 作用:创建一个新的线程,该线程为空。

含参构造函数:

  • 原型:

    template <class Fn, class... Args>
    explicit thread(Fn&& fn, Args&&... args);
    
    • fn:任务函数。有多种可选类型:普通函数类成员函数匿名函数仿函数(这些都是可执行对象)。
    • args:任务函数的参数列表(以右值形式传递)。
  • 作用:创建一个新的线程,且指定该线程的任务函数和参数。

复制构造函数:

  • 原型:

    thread(const thread&) = delete;
    
  • 说明:使用了delete关键字,证明C++11中thread类禁止拷贝构造。

移动构造函数:

  • 原型:

    thread(thread&&) noexcept;
    
  • 作用:构建一个新线程,将老线程的内容交给新线程。

析构函数:

  • 原型:

    ~thread();
    

2. 常用成员函数

2.1 获取线程id

原型:

std::thread::id get_id();

获取当前线程id:通过命名空间this_thread得到当前线程的id。

this_thread::get_id();

2.2 线程回收

join:等待线程结束并回收资源。(会阻塞)。

void join();

joinable:查看主线程与子线程是否存在联系,从而判断是否可以使用join函数进行回收。

bool joinable() const noexcept;

2.3 线程分离

detach

  • 作用:将线程和调用它的线程分离,彼此独立执行。

  • 注意:

    • 此方法必须在线程创建时调用,而且使用detach方法分离的线程无法通过join回收。
    • 使用该方法分离的线程将会被系统的其它进程接管并回收,但当主线程死亡,该线程依旧会被回收。
  • 原型:

    void detach();
    

2.4 operator=

存在两种情况:

  • 移动赋值符号:此时如果传递来一个右值,那么会转移资源的所有权。

    thread& operator= (thread&& other) noexcept
    
  • 拷贝赋值符号:线程中的资源不能被复制,因此C++禁止了线程的拷贝赋值操作。

    thread& operator= (const thread& other) = delete;
    

3. 静态函数

C++提供了用于获取当前计算机CPU核心数量的静态函数,用于帮助程序员更好的设计线程数量:

static unsigned hardware_concurrency() noexcept;

需要注意的是:该函数在thread命名空间下。

EG:

cout << thread::hardware_concurrency() << endl;

二. 命名空间this_thread

C++11中提供了一个关于线程的命名空间std::this_thread,该命名空间中提供了四个公共的成员函数,通过这些成员函数就可以对当前线程进行相关操作。

1. get_id()

作用:获取当前线程的id。

原型:

thread::id get_id() noexcept;

EG:

  • 代码:

    #include <iostream>
    #include <thread>
    using namespace std;
    
    void work_func(){
        cout << "son thread's id = " << this_thread::get_id() << endl;
    }
    
    int main(void){
        cout << "main thread's id = " << this_thread::get_id() << endl;
        thread t1(work_func);
        t1.join();
        return 0;
    }
    
  • 输出:

    main thread's id = 139730534856512
    son thread's id = 139730534852352
    

2. sleep_for()

和进程一样,线程被创建后也有五种状态:

  • 创建态
  • 就绪态
  • 运行态
  • 阻塞态(挂起态)
  • 退出态(终止态)

作用:将当前线程从运行态变成阻塞态,并在该状态下休眠一定的时长。随后,线程会从阻塞态变成就绪态,再次争抢时间片,抢到了才会变成运行态,从而继续运行。

原型:

template <class Rep, class Period>
void sleep_for(const chrono::duration<Rep, Peroid>& rel_time);
  • rel_time:休眠的时长,其类型duration是C++时间库chrono中的时间段类。

EG:

  • 代码:

    #include <iostream>
    #include <thread>
    using namespace std;
    
    void work_func(){
        this_thread::sleep_for(chrono::second(2));		
        cout << "son thread's id = " << this_thread::get_id() << endl;
    }
    
    int main(void){
        cout << "main thread's id = " << this_thread::get_id() << endl;
        thread t1(work_func);
        t1.join();
        return 0;
    }
    
  • 分析:第六行会让当前线程休眠2秒。

3. sleep_until()

作用:让当前线程从运行态转换到阻塞态,并阻塞到某一指定的时间点(time_point类型),随后从阻塞态变为就绪态,并开始争抢CPU时间片。

原型:

template <class Clock, class Duration>
void sleep_until(const chrono::time_point<Clock, Duration>& abs_time);
  • abs_time:线程阻塞到的时间点。time_point是C++时间库chrono中的时间点类。

EG:

#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

void func()
{
    for (int i = 0; i < 10; ++i)
    {
        // 获取当前系统时间点
        auto now = chrono::system_clock::now();
        // 时间间隔为2s
        chrono::seconds sec(2);
        // 当前时间点之后休眠两秒
        this_thread::sleep_until(now + sec);
        cout << "子线程: " << this_thread::get_id() << ", i = " << i << endl;
    }
}

int main()
{
    thread t(func);
    t.join();
}

4. yield()

作用:让当前线程从主动让出已经抢占到的CPU时间片,从运行态转换为就绪态。从而避免一个线程长时间占用CPU资源。

注意:虽然这个函数可以让当前线程主动放弃争抢CPU资源,但该线程转变的是就绪态,这意味着该线程会立刻继续争抢时间片。

三. C++线程同步之互斥锁

C++11提供四种锁,分别是:

  • std::mutex:独占的互斥锁,不能递归使用。
  • std::timed_mutex:带超时的独占互斥锁,不能递归使用。
  • std::recursive_mutex:递归互斥锁,不带超时功能。
  • std::recursive_timed_mutex:带超时功能的递归互斥锁。

1. std::mutex

原理与C语言中的互斥锁基本一致。

头文件:

#include <mutex>

1.1 成员函数

1.1.1 lock()

作用:给临界区加锁。只有一个线程可以获取锁的所有权,如果获取失败,当前线程会阻塞。

原型:

void lock();

1.1.2 try_lock()

作用:尝试获取锁并给临界区加锁。它和lock的区别在于lock如果获取锁失败,会阻塞当前线程,而lock不会阻塞当前线程。

原型:

bool try_lock();

1.1.3 unlock()

作用:将互斥锁解锁,释放。前提是当前线程确实拥有这把互斥锁。

原型:

void unlock();

1.2 线程同步案例

使用两个线程共同操作一个全局变量,二者交替数数,将数值存储到全局变量里并打印出来。

  • 代码:

    #include <iostream>
    #include <thread>
    #include <chrono>
    #include <mutex>
    using namespace std;
    
    int num = 0;
    mutex mt;
    
    void work_func(int id){
        for( int i = 0; i < 10; i++ ){
            mt.lock();			// 为临界区加锁
            num++;
            cout << id << ": " << num << endl;
            mt.unlock();		// 解锁
        }
    }
    
    int main(void){
        thread t1(work_func, 1);
        thread t2(work_func, 2);
        t1.join();
        t2.join();
        return 0;
    }
    

2. std::lock_guard

这时C++11新增的一个模板类。

作用:简化互斥锁lock()unlock()的写法。它使用RAII方式,即通过在构造函数中加锁,在析构函数中解锁,从而避免了程序员显式调用unlock()函数,也让代码更加安全。但同时,这会导致我们无法自主控制合适解锁,有时会降低程序效率

原型:

template <class Mutex>
class lock_guard;

常用构造:

explicit lock_guard(mutex_type& m);

EG:

void work_func(int id){
    for( int i = 0; i < 10; i++ ){
        // 使用哨兵互斥锁,避免了显式解锁操作
        lock_guard<mutex> lg(mt);		
        num++;
        cout << id << ": " << num << endl;
    }
}

2. std::recursive_mutex

作用:递归互斥锁std::recursive_mutex允许通过一个线程多次获取通过一个互斥锁,从而解决一个线程需要多次获取同一个互斥锁时的死锁问题。

EG:

  • 下面是一个需要多次获取锁的场景:错误示范

    #include <iostream>
    #include <thread>
    #include <mutex>
    using namespace std;
    
    struct Calculate
    {
        Calculate() : m_i(6) {}
    
        void mul(int x)
        {
            lock_guard<mutex> locker(m_mutex);
            m_i *= x;
        }
    
        void div(int x)
        {
            lock_guard<mutex> locker(m_mutex);
            m_i /= x;
        }
    
        void both(int x, int y)
        {
            lock_guard<mutex> locker(m_mutex);
            mul(x);
            div(y);
        }
    
        int m_i;
        mutex m_mutex;
    };
    
    int main()
    {
        Calculate cal;
        cal.both(6, 3);
        return 0;
    }
    

    由于已经在第24行加过锁,而调用的mul中又尝试获取锁加锁,这就导致同一线程多次请求同一个锁,导致了死锁发生。使用递归互斥锁std::recursive_mutex可以避免这一问题。

  • 改进后:

    #include <iostream>
    #include <thread>
    #include <mutex>
    using namespace std;
    
    struct Calculate
    {
        Calculate() : m_i(6) {}
    
        void mul(int x)
        {
            lock_guard<recursive_mutex> locker(m_mutex);
            m_i *= x;
        }
    
        void div(int x)
        {
            lock_guard<recursive_mutex> locker(m_mutex);
            m_i /= x;
        }
    
        void both(int x, int y)
        {
            lock_guard<recursive_mutex> locker(m_mutex);
            mul(x);
            div(y);
        }
    
        int m_i;
        recursive_mutex m_mutex;
    };
    
    int main()
    {
        Calculate cal;
        cal.both(6, 3);
        cout << "cal.m_i = " << cal.m_i << endl;
        return 0;
    }
    

    通过使用递归互斥锁,避免了死锁的出现。

注意:虽然使用递归互斥锁可以解决同一个线程频繁获取互斥锁的问题,但建议少用,原因如下:

  • 使用递归互斥锁的场景往往都是可以简化的,使用递归互斥锁很容易导致复杂逻辑的产生。
  • 递归互斥锁比非递归互斥锁效率低了一点。
  • 虽然递归互斥锁允许一个线程多次获取同一个互斥锁的所有权,但并不是无限次。而这个最大次数没有说明,所以一旦超过一定的次数,就会出现std::system错误。

4. std::timed_mutex

超时独占互斥锁std::timed_mutex

作用:在获取互斥锁资源是增加了超时等待的功能,如果指定时间内没有获取锁,那么就解除阻塞,干其他事。如果在此期间获得了锁,那么继续进行操作。

额外的成员函数:相较于std::mutexstd::timed_mutex多了两个成员函数:

  • try_lock_for():阻塞一定的时间长度来等待获取锁。

    template <class Rep, class Period>
    bool try_lock_for (const chrono::duration<Rep, Period>& reltime);
    
  • try_lock_until:阻塞到指定时间点,在这期间获取锁。

    template <class Clock, class Duration>
    bool try_lock_until (const chrono::time_point<Clock, Duration>& abs_time);
    

EG:

  • 代码:

    #include <iostream>
    #include <thread>
    #include <chrono>
    #include <mutex>
    using namespace std;
    
    timed_mutex t_mt;
    
    void work_func(){
        chrono::seconds timeout(1);
        while(true){
            if(t_mt.try_lock_for(timeout)){
                cout << "current thread's id == " << this_thread::get_id() << ", getting mutex" << endl;
                // 模拟任务耗时
                this_thread::sleep_for(chrono::seconds(5));
                t_mt.unlock();
                break;
            }
            else{
                cout << "current thread's id == " << this_thread::get_id() << ", doesn't get mutex" << endl;
                this_thread::sleep_for(chrono::seconds(2));
            }
        }
    }
    
    int main(void){
        thread t1(work_func);
        thread t2(work_func);
        t1.join();
        t2.join();
        return 0;
    }
    
  • 输出:

    current thread's id == 140237342021376, getting mutex
    current thread's id == 140237333628672, doesn't get mutex
    current thread's id == 140237333628672, doesn't get mutex
    current thread's id == 140237333628672, getting mutex
    
  • 分析:

    在工作函数中,使用一个while循环去反复尝试获取锁。如果线程A获取了锁,那么线程A要拿着锁工作5秒,5秒后释放锁。这期间线程B由于使用std::timed_lock,所以每1秒获取不到锁,就会输出一次无法获取锁的信息。

    • 为什么无法获取锁的信息只输出了两条,而不是5条?

      显然,线程无法获取锁,将会进入阻塞态,要想再次尝试获取锁,要从阻塞态变成就绪态,再从就绪态变成运行态,代码才开始运行,并不是无缝衔接式与逆行,因此没有输出5条。

5. std::recursive_timed_mutex

递归超时互斥锁。

作用:与超时互斥锁std::timed_mutex基本一致,指定时间段或时间点尝试获取互斥锁,不同点在于它和递归互斥锁std::recursive_mutex一样,都允许同一个线程多次申请同一个锁资源而不会死锁。

使用方式:和超时互斥锁std::timed_mutex基本一致。

弊端:和递归互斥锁一致

四. C++线程同步之条件变量

1. 概述

条件变量式C++11提供的另一种同步机制,可以阻塞一个或多个线程,并往往和互斥锁配合使用。

条件变量头文件为:<condition_variable>

C++11提供两种条件变量:

  • condition_variable:需要配合模板类std::unique_lock<std::mutex>才能进行wait操作,且该模板类只能使用一种锁:mutex

  • condition_variable_any:可以和任意带有lock()unlock()语义的锁搭配使用,也就是说有四种可供搭配使用的锁:

    • std::mutex:独占的非递归互斥锁。
    • std::timed_mutex:带超时的独占非递归互斥锁。
    • std::recursive_mutex:不带超时功能的递归互斥锁。
    • std::recursive_timed_mutex:带超时功能的递归互斥锁。

    此外,它还可以和lock_guardunique_lock模板类配合使用。

条件变量往往用于生产者——消费者模型,大致使用过程如下:

  1. 拥有条件变量的线程获取互斥锁。
  2. 循环检查某个条件,如果条件不满足则阻塞当前线程,否则继续向下执行。
    • 生产者:产品数量达到上线,生产者阻塞,否则继续生产。
    • 消费者:产品数量为0,消费者阻塞,否则继续消费。
  3. 条件满足后,可以调用notify_one()notify_all(),来唤醒一个或所有被条件变量阻塞的线程。
    • 生产者:唤醒被阻塞的消费者,让消费者继续消费。
    • 消费者:唤醒被阻塞的生产者,让生产者继续生产。

2. condition_variable

2.1 成员函数

2.1.1 wait()

调用wait,将线程阻塞。

原型:

  •   void wait(unique_lock<mutex>& lock);
    
    • 调用该函数的线程直接被阻塞。
    • 参数:必须通过模板类unique_lock对互斥锁进行包装。
  •   template <class Predicate>
      void wait(unique_lock<mutex>& lck, Predicate pred);
    
    • 参数:
      • lck:相关的锁,必须使用模板类unique_lock进行包装。
      • pred:判断条件,是一个返回值为bool类型的函数。
        • 返回值为false:线程被阻塞。
        • 返回值true:线程继续向下运行。

注意:

如果使用wait导致当前线程被阻塞,那么wait函数也会将当前线程占有的锁资源释放。当阻塞结束后,wait函数会去抢占原有的锁资源并上锁。这是为了避免死锁。

unique_lock

模板类。独占的互斥锁对象不能直接传递给wait函数,需要通过模板类unique_lock进行二次处理。模板类依旧可以对互斥锁对象进行如下处理:

  • lock():锁定关联的互斥锁。
  • try_lock():尝试锁定关联的互斥锁,如果无法锁定,函数直接返回,线程不会被阻塞。
  • try_lock_for():在指定时间段内尝试锁定互斥锁,如果成功,线程继续向下;如果失败,函数直接返回,不会阻塞。
  • try_lock_until():在指定时间之前尝试锁定互斥锁,如果成功,线程继续向下;如果失败,函数直接返回,不会阻塞。
  • unlock():将互斥锁解锁。

此外,unqie_locklock_gurad一样,都采用了RAII设计方式,在构造函数内抢占锁,在析构函数内释放锁。因此一般使用它们时无需对锁进行显式抢占与释放。

2.1.2 wait for()

作用:wait()的功能基本一致,只不过多了一个阻塞时长。如果在阻塞时长内线程没有被唤醒,那么到指定时间,线程会自动解除阻塞。

原型:wait()一样,有两种

  • 不带判断函数:

    template <class Rep, class Period>
    cv_statuc wait_for(unique_lock<mutex>& lck, const chrono::duration<Red, Period>& rel_time);
    
  • 带判断函数的:

    template <class Rep, class Period, class Predicate>
    bool wait_for(unique_lock<mutex>& lck,
                   const chrono::duration<Rep,Period>& rel_time, Predicate pred);
    

2.1.3 wait until()

作用:wait for()功能基本一致,只不过wait for是在指定时长内阻塞线程,而

wait until()是阻塞线程到指定时间点,到点线程会自动脱离阻塞态,继续去抢CPU时间片和锁。阻塞期间也可以被其它线程唤醒。

原型:

  • 不带判断函数的:

    template <class Rep, class Duration>
    cv_statuc wair_until (unique_lock<mutex>& lck, const chrono::time_point<Clock, Duration>& abs_time);
    
  • 带判断函数:

    template <class Rep, class Duration, class Predicate>
    cv_statuc wair_until (unique_lock<mutex>& lck, 
                          const chrono::time_point<Clock, Duration>& abs_time, Predicate pred);
    

2.1.4 通知函数

有两个通知函数:

  •   void notify_one() noexcept;
    

    唤醒一个被当前条件变量阻塞的线程。

  •   void notify_all() noexcept;
    

    唤醒所有被当前条件变量阻塞的线程。

2.2 实例

问题:使用生产者——消费者模型,维护一个商品队列,两个生产者向里面生产商品,三个消费者从里面取出商品。

代码:

#include <iostream>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>
#include <list>
#include <functional>
using namespace std;

template < class T >
class Test{
private:
    int max_num;        // 商品的最大数量
    list<T> stack;      // 商品队列
    condition_variable empty;       // 商品是否为空条件变量
    condition_variable full;        // 商品是否已经满了条件变量
    mutex mt;                       // 互斥锁
public:
    Test(int num): max_num(num){}
    void put(const T& val){
        unique_lock<mutex> locker(mt);
        while(true){
            if(stack.size() == max_num){
                full.wait(locker);
            }
            else {
                break;
            }    
        }
        stack.push_back(val);
        cout << "生产者生产:" << val << endl;
        // 提醒消费者消费
        empty.notify_one();
    }
    void take(){
        unique_lock<mutex> locker(mt);
        empty.wait(locker,[this](){
                return !stack.empty();
                });
        cout << "消费者消费:" <<  stack.front() << endl;; 
        stack.pop_front();
        // 通知生产者去生产
        full.notify_one();
    }
};

int main(void){
    Test<int> t(50);
    auto produce = bind(&Test<int>::put, &t, placeholders::_1);
    auto consumer = bind(&Test<int>::take, &t);
    thread t1[3];
    thread t2[3];
    for(int i = 0; i < 3; i++){
        t1[i] = thread(produce, i+100);
        t2[i] = thread(consumer);
    }
    for(int i = 0; i < 3; i++){
        t1[i].join();
        t2[i].join();
    }
    return 0;
}

输出:输出并不固定

生产者生产:100
生产者生产:101
消费者消费:100
消费者消费:101
生产者生产:102
消费者消费:102

分析:

  • 通过对比22行和37行中关于生产者——消费者临界条件的判断方式,我们发现:使用wait()的重载方法比使用while循环要简练的多,因此更加推荐使用wait()的重载方法配合lambda表达式。
  • 注意:
    • wait()的判断式中,如果为真表示继续执行,为假才阻塞线程。不要记混了。
    • unique_lock在构造函数和析构函数内完成了锁的抢占与释放。注意不要重复加锁,但可以重复释放

3. condition_variable_any

3.1 成员函数

condition_variable_any的成员函数功能和逻辑与condition_variable的成员函数基本一致,最大的不同在于锁的类型不同。condition_variable_any可以接收四种锁和lock_guardunique_lock模板类;而``condition_variable只能接受unique_lock模板类,而这种模板类只能接收最基础的一种锁:mutex`。

也就是说,实际使用中,condition_variable_any更加全面。

3.1.1 wait()

调用wait,将线程阻塞。

原型:

  •   template <class Lock>
      void wait(Lock& lck);
    
    • 调用该函数的线程直接被阻塞。
    • 参数:四种锁或者lock_guardunique_lock模板类封装过的锁。
  •   template <class Lock, class Predicate>
      void wait(unique_lock<mutex>& lck, Predicate pred);
    
    • 参数:
      • lck:四种锁或者lock_guardunique_lock模板类封装过的锁。
      • pred:判断条件,是一个返回值为bool类型的函数。
        • 返回值为false:线程被阻塞。
        • 返回值true:线程继续向下运行。

注意:

如果使用wait导致当前线程被阻塞,那么wait函数也会将当前线程占有的锁资源释放。当阻塞结束后,wait函数会去抢占原有的锁资源并上锁。这是为了避免死锁。

3.1.2 wait for()

作用:wait()的功能基本一致,只不过多了一个阻塞时长。如果在阻塞时长内线程没有被唤醒,那么到指定时间,线程会自动解除阻塞。

原型:wait()一样,有两种

  • 不带判断函数:

    template <class Lock, class Rep, class Period>
    cv_statuc wait_for(Lock& lck, const chrono::duration<Red, Period>& rel_time);
    
  • 带判断函数的:

    template <class Lock, class Rep, class Period, class Predicate>
    bool wait_for(Lock& lck,
                   const chrono::duration<Rep,Period>& rel_time, Predicate pred);
    

3.1.3 wait until()

作用:wait for()功能基本一致,只不过wait for是在指定时长内阻塞线程,而

wait until()是阻塞线程到指定时间点,到点线程会自动脱离阻塞态,继续去抢CPU时间片和锁。阻塞期间也可以被其它线程唤醒。

原型:

  • 不带判断函数的:

    template <class Lock, class Rep, class Duration>
    cv_statuc wair_until (Lock& lck, const chrono::time_point<Clock, Duration>& abs_time);
    
  • 带判断函数:

    template <class Lock, class Rep, class Duration, class Predicate>
    cv_statuc wair_until (Lock& lck, 
                          const chrono::time_point<Clock, Duration>& abs_time, Predicate pred);
    

3.1.4 通知函数

有两个通知函数:

  •   void notify_one() noexcept;
    

    唤醒一个被当前条件变量阻塞的线程。

  •   void notify_all() noexcept;
    

    唤醒所有被当前条件变量阻塞的线程。

五. 原子变量

1. 多线程下共享变量的问题

在进行多线程编程时,有时需要多个线程对同一个变量进行写操作,这往往会导致一些匪夷所思的问题。

1.1 i++问题

即:多个线程对同一个共享变量i进行++操作,出现的结果与预期结果并不固定。

这样的问题本质在于i++并不是一个原子操作,而是分为三步;

  1. i->reg:从内存中读取i的值到寄存器。
  2. inc-reg:在寄存器中将i的值加一。
  3. reg->i:将寄存器中的值写回到内存中。

多线程执行时,会出现以下情况:

  1. 线程A从内存读取变量i的值到寄存器,i==0
  2. 线程A对寄存器中的i的值++,寄存器中i==1
  3. 线程B抢到了时间片,线程A暂停。
  4. 线程B从内存中读取变量i的值到寄存器,i==0
  5. 线程B对寄存器中i的值++,寄存器中i==1
  6. 线程B将寄存器中i的值写回到内存,此时内存中i==1
  7. 线程A抢回时间片,线程B结束。线程A继续之前的步骤。
  8. 线程A将寄存器中的i写回到内存中,此时内存中i==1。

可以看到,在两个线程都对变量i进行++操作后,i最后只增加了1,而不是2。

1.2 指令重排问题

由于编译器不同等级的优化和CPU的乱序执行,有时代码的执行顺序和我们书写的顺序并不一致。例如如下两个线程的代码:

  • 线程A:

    a = 1;
    flag = true;
    
  • 线程B:

    if(flag==true){
    	assert(a == 1);
    }
    

理想情况下,如果代码按照书写顺序执行,无论是先执行B;或A执行a=1,随后执行B;还是A执行完毕再执行B,程序都不会被断言终止,可以正常运行。

但是,编译器和CPU可以对指令进行重排,这种重排可能导致一个线程内相互之间不存在依赖关系的指令交换执行顺序,从而获得更高的效率。而在上述代码中,线程A的两句代码看起来是没有关系的,那么就可能导致线程A的代码重排,而多线程有可能导致A和B的指令交叉执行,最后导致如下执行顺序:

flag = true;
if(flag==true){
    assert(a==1);			// 程序到此终止
}
a = 1;

程序断言失败,被迫终止。

为了解决i++问题和语序重排问题,我们有两种方案:

  • 加锁。
  • 原子操作。

2. 概述

C++11提供了原子类型std::atomic<T>,通过该原子类型管理的内部变量可以称之为原子变量,我们可以指定模板参数为boolcharintlong指针等类型。(不支持浮点类型和复合类型)。

2.1 原子操作

指一系列不可以被CPU上下文交换的机器指令,这些指令组合在一起就形成了原子操作。在多核CPU下,当某个CPU核心开始了原子操作时,会先暂停其它CPU内核对内存的操作。从而避免该操作被其余CPU内核所干扰。也就是说,在多线程情况下,一旦原子操作开始,它就必须运行到结束,中间不会有任何上下文切换。

2.2 可以定义为原子变量的类型

除了上文中指定的类型,用户自定义的类型也可以作为std::atomic<T>的模板参数,作为原子变量使用,前提是这个类型是TriviallyCopyable类型的。一个简单的判断方式是:

如果某类型可以使用std:memcpy按位进行复制,那么它就是TriviallyCopyable类型的,就可以封装为原子变量。

2.3 C++11实现方式

有两种,具体用哪种跟平台有关

  • 一般是通过使用CAS循环(全称为Compare and swap。它通过一条指令读取指定的内存地址,然后判断其中的值是否等于给定的前置值,如果相等,则将其修改为新的值。)
  • 有时会使用锁。

如何判断使用的哪种实现方式?

通过std::atomic<T>的成员函数is_lock_free()来判断,原型如下:

bool is_lock_free();
  • 返回值如果为1,说明没有使用锁;如果为0,说明用锁了。

3. atomic类成员

3.1 类定义

头文件:

#include <atomic>

定义:

template <class T>
struct atomic;

3.2 构造函数

  •   atomic() noexcept = default;
    

    默认无参构造函数。

    atomic<int> a;
    
  •   constexpr atomic(T desired) noexcept;
    

    使用desired初始化原子变量的值。

    atomic<int> a(10);
    
  •   atomic(const atomic&) = delete;
    

    显式删除拷贝构造函数,不允许进行原子变量对象之间的拷贝。

3.3 公共成员函数

3.3.1 重载=

重载=符号,用来对原子变量进行赋值,并且禁止使用=来对std::atomic对象进行拷贝赋值:

// 重载=操作符
T operator= (T desired) noexcept;
T operator= (T desired) volatile noexcept;

// 显式禁止了拷贝赋值操作。
atomic& operator=( const atomic& ) = delete;
atomic& operator=( const atomic& ) volatile = delete;

EG:

atomic<int> a;
a = 10;
atomic<int> b(100);
// a = b;		// error。因为显式禁止了拷贝赋值操作

3.3.2 store()

作用:

依照指定内存顺序,修改原子变量的值。

原型:

void store( T desired, std::memory_order order = std::memory_order_seq_cst ) noexcept;
void store( T desired, std::memory_order order = std::memory_order_seq_cst ) volatile noexcept;
  • desired:存储到原子变量中的值。
  • order:强制的内存顺序。一共有六种,后面会进行详细介绍。

3.3.3 load()

作用:

依照指定的内存顺序,加载原子变量的值。

原型:

T load( std::memory_order order = std::memory_order_seq_cst ) const noexcept;
T load( std::memory_order order = std::memory_order_seq_cst ) const volatile noexcept;
  • order:强制的内存顺序。一共有六种,后面会进行详细介绍。
  • 返回值:原子变量的值。

3.4 特化成员函数

复合赋值运算操作符重载:以整形和指针为例:

  • 整形:

    T operator+= (T val) volatile noexcept;
    T operator+= (T val) noexcept;
    T operator-= (T val) volatile noexcept;
    T operator-= (T val) noexcept;
    T operator&= (T val) volatile noexcept;
    T operator&= (T val) noexcept;
    T operator|= (T val) volatile noexcept;
    T operator|= (T val) noexcept;
    T operator^= (T val) volatile noexcept;
    T operator^= (T val) noexcept;
    
  • 指针:

    T operator+= (ptrdiff_t val) volatile noexcept;
    T operator+= (ptrdiff_t val) noexcept;
    T operator-= (ptrdiff_t val) volatile noexcept;
    T operator-= (ptrdiff_t val) noexcept;
    

重载操作符对应的fetch_*成员函数:

对于上述重载操作符,都有对应的fetch_*成员函数,如下表所示:

操作符 操作符重载函数 对应的成员函数 整形 指针 其他
+ atomic::operator += atomic::fetch_add
- atomic::operator -= atomic::fetch_sub
& atomic::operator &= atomic::fetch_and
| atomic::operator |= atomic::fetch_or
^ atomic::operator ^= atomic::fetch_xor

例如:

atomic<int> a(10);
a+=10;			// a的值为20
a.fetch(10);	// a的值为30

3.5 内存顺序约束

3.5.1 概述

原子变量自带的执行期间禁止CPU切换上下文的特性帮助我们解决了i++问题,但指令重排问题依旧未能解决。而内存顺序约束,正是解决这一问题的关键。

所谓内存顺序约束,其作用在于控制变量在不同线程中的顺序可见性问题。C++11中提供了六种可用顺序,用来指定如何同步不同线程的操作:

typedef enum memory_order {
    memory_order_relaxed,   // relaxed
    memory_order_consume,   // consume
    memory_order_acquire,   // acquire
    memory_order_release,   // release
    memory_order_acq_rel,   // acquire/release
    memory_order_seq_cst    // sequentially consistent
} memory_order;
  • memory_order_relaxed:这是最宽松的一种内存顺序约束,它对编译器和CPU不做任何限制,可以乱序。

  • memory_order_release释放。它设定内存屏障(Memory barrier),保证它之前的操作永远在它之前,但是它之后的操作可能被重新排在它的前面。一般用于store()函数。

  • memory_order_acquire获取。设定内存屏障,保障它之后的操作永远在它之后,但它之前的操作却有可能被重排到它后面。往往和memory_order_release在不同线程中搭配使用。一般用于load()函数。

  • memory_order_consume:改进版的memory_order_acquire。设定内存屏障,保障它之后的和该原子变量有依赖的操作永远在后面,其他的操作不确定。往往和memory_order_release在不同线程中搭配使用。一般用于load()函数。

  • memory_order_acq_rel:是memory_order_releasememory_order_acquire的结合。用用于读取-修改-写回这一类既有读取又有修改的操作,例如i++。保证该操作之前和操作不会重新排序到后面,之后的操作不会重新排序到前面。

  • memory_order_seq_cst顺序一致性。是memory_order_acq_rel的加强版。如果对原子变量的操作都是使用的memory_order_seq_cst内存序,则多线程行为相当于是这些操作都以一种特定顺序被一个线程执行,在哪个线程观察到的对这些原子量的操作都一样。同时,任何使用该选项的写操作都相当于release操作,任何读操作都相当于acquire操作,任何“读取-修改-写回”这一类的操作都相当于使用memory_order_acq_rel的操作。(是各个成员函数的内存默认选项

3.5.2 代价

总的爱说,越是严格的内存序其性能开销就会越大。如我们常用的x86处理器,这种处理器本身就支持release/acquire语义,因此release/acquire/consume都只影响编译器的优化,而memort_order_seq_cst还会影响处理器的指令编排。

在实际操作中,由于内存序的复杂性,建议还是使用锁来对语序进行保障。如果是在不行,建议使用memory_order_seq_cst这一默认选项,虽然性能消耗稍大了一点,但胜在安全。

3.4 C++20新增成员

功能函数:新增功能函数,可以通过原子类型来阻塞线程。

image-20231219182934152

类型别名:

image-20231219183014841 image-20231219183034447 image-20231219183055480 image-20231219183121425

4. 实例

4.1 i++

#include <iostream>
#include <thread>
#include <atomic>
using namespace std;

atomic<int> num(0);

void work(){
    for(int i = 0 ; i < 10000; i++){
        num++;
    }
}

int main(void){
    thread t[10];
    for(int i = 0; i < 10; i++){
        t[i] = thread(work);
    }
    for(int i = 0; i < 10; i++){
        t[i].join();
    }
    cout << num << endl;
    return 0;
}

六. 线程异步

所有相关内容都定义在头文件future中。

#include <future>

1. 异步

所谓异步,实际上是主线程和子线程做不同的事情,主线程不需要为了获取子线程的某个结果而一直等着子线程。例如点餐:

  • 异步:主线程点了一份面条,子线程做面条,主线程在这期间干别的事,而不是干等着面条。
  • 同步:主线程点了一份面条,子线程做面条,主线程在这期间一直等着子线程,啥事也没干。

2. std::future

2.1 定义

作用:用于承载子线程传递的数据。本身是一个模板类,可以存储任意指定类型的数据。需要注意的是,单独这个类无法实现线程间数据传递,必须和std::promisestd::packaged_taskstd::async配合使用。

定义:有三种,都定义于头文件<future>中:

  •   template <class T> 
      class future
    
  •   template <class T>
      class future<T&>
    
  •   template <>
      class future<void>
    

2.2 构造函数

  •   future() noexcept;
    

    无参构造函数。

  •   future(future&& other) noexcept;
    

    移动构造函数,转移资源的所有权。

  •   future(const future& other) = delete;
    

    显式删除拷贝构造函数,不允许future对象之间的拷贝。

2.3 常用成员函数

2.3.1 重载=操作符

  •   future& operator=( future&& other ) noexcep
    

    移动赋值函数,转移资源所有权。

  •   future& operator=( const future& other ) = delete;
    

    显式删除拷贝赋值函数,不允许future对象之间的拷贝。

2.3.2 get()

作用:

取出future对象内部保存的数据。

原型:有三种:

T get();
T& get();
void get();

其中返回值为void类型的函数是为了fucure<void>准备的,因此该对象内部的类型就是void

注意:

这是一个阻塞函数。在获取子线程的数据之前,主线程会阻塞在这里。

2.3.3 wait()

作用:

阻塞主线程,直到子线程将数据写入到future对象中。

原型:

void wait() const;

2.3.4 wait_for()/until()

作用:

wait_for()wait_until()二者功能基本一致。前者是阻塞到某一指定的时间点,后者是阻塞一定的时长。

原型:

  • wait_for():

    template <class Rep, class Period>
    std::future_status wait_for( const std::chrono::duration<Rep,Period>& timeout_duration ) const;
    
  • wait_until():

    template <class Rep, class Period>
    std::future_status wait_until( const std::chrono::time_point<Clock,Duration>& timeout_time ) const;
    

两个函数返回的都是子线程当前的状态,包括三种:

  • future_status::deferred:子线程中的任务函数尚未启动。
  • future_status::ready:子线程中的任务已经执行完毕,结果已经就绪。
  • future_status::timeout:子线程中的任务正在执行中,指定等待时长已经用完。

3. std::promise

std::promise是一个协助线程赋值的类,通过与future合作,可以实现线程之间的传值。

3.1 定义

定义于头文件<future>中。有三种定义方式:

  •   template <class R>
      class promise;
    
  •   template <class R>
      class promise<R&>;
    
  •   template<>
      class promise<void>
    

3.2 构造函数

  •   promise();
    

    无参构造函数。

  •   promise(promise&& other) noexcept;
    

    移动构造函数。

  •   promise(const promise& other) = delete;
    

    显式删除拷贝构造函数,禁止promise对象之间的拷贝。

3.3 常用成员函数

3.3.1 get_future()

作用:std::promise类内部管理者一个用于承载数据的future类对象,通过get_future()方法获取该对象。

原型:

std::future<T> get_future();

3.3.2 set_value()

作用:设置要传给外部线程的值,立即让状态就绪,这样外部线程就可以通过get_future()方法获取future对象,在通过future对象的get()方法获取传递出的值了。

原型:有四种:

void set_value(const R& value);
void set_value(R&& value);
void set_value(R& value);
void set_value();

3.3.3 set_value_at_thread_exit()

作用:存储要传给外部线程的值,但是当线程退出时状态才会就绪。也就是说,如果外部线程想获取传出的值,需要等到子线程结束,否则就会阻塞在future类的get()上。

3.4 使用

流程:promise传递数据必须搭配future类,一共需要5步

  1. 在主线程创建std::promise对象。
  2. 将这个std::promise对象通过引用的方式传递给子线程的任务函数。(注意使用std::ref()来传参)。
  3. 在子线程任务函数中给std::promise对象赋值。
  4. 在主线程中通过std::promise对象取出绑定的future实例对象。
  5. 通过future对象的get()方法取出子线程任务函数中返回的值。

EG:

  • 代码:

    #include <iostream>
    #include <thread>
    #include <chrono>
    #include <future>
    using namespace std;
    
    void work(int& x){
        cout << &x << endl;
    }
    
    int main(void){
        promise<int> pr;
        thread t1([](promise<int>& pr){
                this_thread::sleep_for(chrono::seconds(1));
                cout << "子线程休眠完毕" << endl;
                pr.set_value(100);
                },ref(pr));
        future<int> f = pr.get_future();
        int a = f.get();
        cout << "从子线程获取的数据为:" << a << endl;
        cout << "主线程id==" << this_thread::get_id() << endl;
        t1.join();
        return 0;
    }
    
  • 输出:程序阻塞到第19行大概1s后,输出如下内容:

    子线程休眠完毕
    从子线程获取的数据为:100
    主线程id==140541407467328
    
  • 分析:

    • 显然,future类的get()函数是一个阻塞函数,在获取到数据之前会让线程阻塞。
    • 传参:如果要向子线程的任务函数传递引用类型的参数,那么一定要使用std::ref(),否则会报错:static assertion failed: std::thread arguments must be invocable after conversion to rvalues。详情见个人博客。

4. std::packaged_task

std::packaged_task类包装了一个可调用对象包装器类对象,通过它可以对子线程的任务函数进行包装,然后传入子线程。此外,这个类内部内部维护了一个future类的对象,用来承接子线程传递出的值,而这个值就是子线程任务函数的返回值

4.1 类定义

定义于头文件<future>,是一个模板类,有两种定义形式:

  •   template <class> 
      class packaged_task;
    
  •   template <class R, class ...Args>
      class packaged_task<R(Args...)>;
    
    • R:返回值类型
    • Args:不定长参数列表。

4.2 构造函数

  •   paclaged_task() noexcept;
    

    无参构造,构造一个无任务的空对象。

  •   template <class F>
      explicit packaged_task( F&& f );
    

    通过一个可调用对象,构造一个任务对象。例如:

    packaged_task<int(int)> task([](int x){return x*2;});
    
  •   packaged_task(const packaged_task& ) = delete;
    

    显式禁止拷贝构造函数。

  •   packaged_task(packaged_task&& rhs) noexpcet;
    

    移动构造函数。

4.3 常用公共成员函数

4.3.1 get_future()

作用:获取类内部的future对象,里面封装的是线程任务函数的返回值。

原型:

std::future<R> get_future();

4.4 使用

步骤:必须配合future类使用,一共有四步:

  1. 使用std::packaged_task封装任务函数(记作变量x)。
  2. 用1中得到的x和函数的实参构建子线程。注意:传递x的时候一定要使用std::ref包装。
  3. 通过x调用函数get_future()获取future对象。
  4. 通过future对象调用get()方法获取值。(这里获取的值时第一步封装的任务函数的返回值。)

EG:

  • 代码:

    #include <iostream>
    #include <thread>
    #include <future>
    using namespace std;
    
    void work(int& x){
        cout << &x << endl;
    }
    
    int main(void){
        packaged_task<int(int)> task([](int x){
                    return x * 2;
                });
        thread t1(ref(task),50);			// 注意这里使用了std::ref()
        future<int> f = task.get_future();
        int a = f.get();
        cout << "从子线程获取的数据为:" << a << endl;
        t1.join();
        return 0;
    }
    
    
  • 输出:

    100
    
  • 分析:

    • future:里面封装的数据是std::packaged_task模板类封装的线程任务函数的返回值。这一点和std::promise不同。
    • 创建子线程:注意创建子线程时,由于任务函数被std::packaged_task模板类封装了,因此要使用std::ref进行包装,否则会报错。

5. std::async

std::async是一个函数,通过该函数可以直接启动一个子线程,并在该子线程中执行对应的任务函数。任务函数返回的结果会存储在一个future对象中,再通过这个future对象,我们可以获取子线程传递的值。

5.1 原型

std::async是一个模板函数,定义于头文件<future>中。原型有两种:

  •   template< class Function, class... Args>
      std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
          async( Function&& f, Args&&... args );
    
    • 参数:
      • f:子线程任务函数。
      • args:传递给子线程任务函数的实参。
    • 返回值:future对象,里面的值是子线程任务函数的返回值。
  •   template< class Function, class... Args >
      std::future<std::result_of_t<std::decay_t<Function>(std::decay_t<Args>...)>>
          async( std::launch policy, Function&& f, Args&&... args );
    
    • 参数:
      • policy:子线程任务函数的执行策略。有两种:
        • std::launch::async:默认策略。创建新的线程执行任务函数。
        • std::launch::deferred:不执行任务函数,直到调用了futureget()wait()方法时才执行任务。需要注意的是:这种方法不会创建新的线程
      • f:子线程任务函数。
      • args:传递给子线程任务函数的实参。
    • 返回值:future对象,里面的值是子线程任务函数的返回值。

5.2 使用

5.2.1 调用async()函数直接创建子线程执行任务

  • 代码:

    #include <iostream>
    #include <thread>
    #include <chrono>
    #include <future>
    using namespace std;
    
    int main(void){
        future<int> f = async([](int x)->int {
                cout << "子线程ID==" << this_thread::get_id() << endl;
                this_thread::sleep_for(chrono::seconds(5));
                return x*2;
                }, 50);
        future_status fs;
        do{
            fs = f.wait_for(chrono::seconds(1));
            if(fs == future_status::deferred){
                cout << "子线程还没有开始" << endl;
            }else if(fs == future_status::ready){
                cout << "子线程执行完毕,返回结果为:" << f.get() << endl;
            }else if(fs == future_status::timeout){
                cout << "任务还未执行完毕,继续等待..." << endl;
            }
        }while(fs != future_status::ready);
        return 0;
    }
    
  • 输出:

    子线程ID==140233739269888
    任务还未执行完毕,继续等待...
    任务还未执行完毕,继续等待...
    任务还未执行完毕,继续等待...
    任务还未执行完毕,继续等待...
    子线程执行完毕,返回结果为:100
    
  • 分析:

    std::async创建子线程,子线程内部休眠5秒模拟处理任务耗时。任务函数的返回值就是future内部的值。第14-23行通过循环的方式查询子线程执行状况。

5.2.2 调用async()函数不创建子线程执行任务

  • 代码:

    #include <iostream>
    #include <thread>
    #include <future>
    using namespace std;
    
    int main()
    {
        cout << "主线程ID: " << this_thread::get_id() << endl;
        // 调用函数直接创建线程执行任务
        future<int> f = async(launch::deferred, [](int x) {
            cout << "子线程ID: " << this_thread::get_id() << endl;
            return x += 100;
        }, 50);
        cout << "主线程开始休眠5秒..." << endl;
        this_thread::sleep_for(chrono::seconds(5));
        cout << "主线程休眠结束" << endl;
        cout << f.get() << endl;;
    
        return 0;
    }
    
  • 输出:

    主线程ID==139747171080000
    主线程开始休眠5秒...
    主线程休眠结束
    子线程ID==139747171080000
    100
    
  • 分析:可以看到:因为采用了launch::deferred的策略,所以async()函数并不会创建新的线程执行任务,只有当返回的future类对象调用了get()或者wait()方法后才开始创建子线程,执行任务函数。(注意:使用wait_for()函数,并不会触发创建子线程。)

6. 总结

综上,共有三种方式进行线程间传递数据:

  • future+promise
  • future+packaged_task
  • future+async

它们的特点如下:

  • 使用std::promise类,在子线程中可以传出返回值也可以传出其他数据,并且可选择在什么时机将数据从子线程中传递出来,使用起来更灵活。
  • 使用std::packaged_task类,可以将子线程的任务函数进行包装,并且可以得到子线程的返回值。
  • 使用async()函数,是多线程操作中最简单的一种方式,不需要自己创建线程对象,并且可以得到子线程函数的返回值。