c++并发编程实战-第2章 线程管控-读书笔记

发布时间 2023-09-01 15:51:20作者: 西兰花战士

线程的基本管控

每个应用程序都至少拥有一个线程,即运行main函数的线程,称为主线程,它由c++运行时系统启动。我们可以在软件运行中产生其他线程,它们以指定的函数作为入口函数。当main函数返回后,程序会退出;同样,当入口函数返回后,与之对应的线程结束。

发起线程

线程是通过构造std::thread对象启动,该对象指明要运行的任务。任务可以是一个普通的函数,也可以是一个可调用对象。只要是通过C++标准库启动线程,归根到底,代码总会构造一个std::thread对象。std::thread构造函数定义如下:

explicit thread(_Fn&& _Fx, _Args&&... _Ax);

//_Fx:可调用对象
//_Ax:传递给可调用对象的参数包

下面列举出构建线程的几种方法:

方法一:普通函数

1 void do_some_work(int nVal);
2 
3 int a = 10;
4 std::thread my_thread(do_some_work, a);

方法二:可调用对象

 1 class someWork
 2 {
 3 public:
 4     void operator()()const 
 5     {
 6         do_some_work();
 7     }
 8 };
 9 
10 someWork sw;
11 std::thread my_thread(sw);

在使用函数对象构造线程时,需要注意“C++最麻烦的解释”问题。即在构造std::thread对象时,传递临时对象,而不是具名变量,那么调用构造函数的语法可能与函数声明相同,产生二义性。例如:

1 std::thread my_thread(someWork());

本意是想构造一个线程对象,却被解释为函数声明,返回值类型为std::thread,函数名为my_thread,函数参数是一个函数指针,该函数指针没有参数和函数名,返回someWork类型对象,类似于:

 1 void func(int(x), int())
 2 {
 3     std::cout << x << " : " /*<< y */<< std::endl;
 4 }
 5 
 6 int test() { return 10; }
 7 
 8 int main()
 9 {
10     func(10, test);
11 }

为了避免出现上述情况,可以用初始化列表或者多加一对括号:

1 std::thread my_thread{someWork()};    //初始化列表
2 std::thread my_thread((someWork()));  //多加一对括号

方法三:lambda表达式

1 auto cbk = [](){ do_some_work(); };
2 std::thread my_thread(cbk);

方法四:类对象的成员函数作为入口函数 

 1 class ThreadEntry
 2 {
 3 public:
 4     void Process_some_data(int x) 
 5     {
 6         cout << "thread_id : " << this_thread::get_id() << endl;
 7     }
 8 };
 9 
10 int main()
11 {
12     cout << "main thread id : " << this_thread::get_id() << endl;
13     ThreadEntry entry;
14     std::thread th(&ThreadEntry::Process_some_data, &entry, 20);
15     th.join();
16 }

等待与分离

一旦std::thread对象被构建后,线程就可能会被执行,因此,我们需要明确指定是等待该线程结束,或是任由它独自运行。假设等到std::thread对象销毁时还未决定好,那么std::thread对象的析构函数将调用std::terminate函数终止整个程序。因此,创建线程后,必须调用如下函数中的一个:

1 th.join();      //等待
2 th.detach();     //分离

等待

若需等待线程完成,那么可以在与之关联的std::thread实例上,通过调用成员函数join()实现。该函数会阻塞当前线程,并等待线程示例结束后join()函数才返回。

 1 void do_some_work()
 2 {
 3     std::cout << "do_some_work start" << std::endl;
 4     std::this_thread::sleep_for(std::chrono::seconds(3));
 5     std::cout << "do_some_work end" << std::endl;
 6 }
 7 
 8 int main()
 9 {
10     std::thread th(do_some_work);
11     std::cout << "main start" << std::endl;
12     th.join();    //会等待子线程入口函数返回后该函数才返回
13     std::cout << "main end" << std::endl;
14 }

输出:

main start
do_some_work start
do_some_work end
main end

如果子线程中处理的数据量特别大,那么join()函数可能等到天荒地老。也有可能在调用join()函数时,该线程已经结束,那么join()将立即返回。如果只想等待特定事件,可以通过条件变量或者future完成,会在后续章节中介绍。join()函数只能调用一次;只要线程对象曾经调用过join()函数,该线程就不可再汇合,可以通过joinable()成员函数判断当前线程是否可汇合,当线程已经调用过join()函数,那么joinable()函数将返回false。

如果选择等待线程结束,则需要选择合适的位置来调用join()函数。最重要一个原因是线程启动后有异常抛出,而join()尚未执行,则该join()调用会被略过,从而导致应用程序终止。可以运用RAII手法,构造一个thread_guard类,在析构函数中调用join()。代码如下:

 1 class thread_guard
 2 {
 3 public:
 4     explicit thread_guard(std::thread& th) : m_th(th) {}
 5     ~thread_guard() 
 6     {
 7         if (m_th.joinable())  //避免多次调用join
 8             m_th.join();
 9     }
10     thread_guard(const thread_guard&) = delete;
11     thread_guard& operator=(const thread_guard&) = delete;
12 
13 private:
14     std::thread& m_th;
15 };

用法如下:

 1 void threadEntry(int* pData)
 2 {
 3     ...
 4 }
 5 
 6 void func()
 7 {
 8     int nLocalData = 0;
 9     std::thread th(threadEntry(&nLocalData));
10     thread_guard(th);
11     do_something_in_current_thread();    //该函数可能导致异常
12 }

即使在do_something_in_current_thread()函数中发生了异常行为,线程对象同样也能正确调用join函数汇合。

分离

若不需要等待线程结束,可以调用detach成员函数将线程分离,从而避免异常引发的安全问题。分离操作会切断线程和std::thread对象间的关联,后续无法通过std::thread对象再操作该线程。被分离后的线程会在后台运行,由c++运行时库托管,当该线程运行完毕,与之对应的资源将被回收。

如果要把std::thread对象和线程分离,就必须存在与其关联的执行线程。若没有与其关联的执行线程,便不能在std::thread对象上凭空调用detach()。可以用joinable()函数检测。仅当joinable()函数返回true时,才能调用detach函数。

以下做法极不可取:意图在函数中创建线程,并让线程访问函数的局部变量。除非线程肯定会在该函数退出前结束,否则切勿这么做。代码如下:

 1 class ThreadOperator
 2 {
 3 public:
 4     ThreadOperator(int& nData) : m_nData(nData) {}
 5     void operator()()
 6     {
 7         do_some_work(m_nData);  //对象的引用,可能该对象已经被销毁
 8     }
 9 private:
10     int& m_nData;
11 };
12 
13 void func()
14 {
15     int nLocalData = 0;
16     ThreadOperator op(nLocalData);
17     std::thread  th(op);
18     th.detach();                //不等待线程结束
19 }

可能func函数已经结束,而线程还在运行,此时访问nLocalData将会出现崩溃。 上述情形的处理方法通常是:令线程函数完全自含(self-contained),即将数据复制到新线程内部,而不是共享数据。

1 ThreadOperator(int nData) : m_nData(nData) {}

向线程函数传递参数

若需向新线程上的函数或可调用对象传递参数,方法相当简单,直接向std::thread的构造函数增添更多参数即可。

 1 class someWork
 2 {
 3 public:
 4     void operator()(int x, double y)const
 5     {
 6         cout << x << " : " << y << endl;  // 10 : 23.5
 7     }
 8 };
 9 
10 int main()
11 {
12     std::thread th(someWork(), 10, 23.5);    //根据参数表依次传递
13     th.join();
14 }

参数传递流程

std::thread对象拥有自己的内部存储空间,传递的参数首先会按照默认方式拷贝到std::thread对象内部,之后以右值形式移动给线程的入口函数,即一次拷贝,一次移动。例如:

 1 class Res_Data
 2 {
 3 public:
 4     Res_Data()
 5     {
 6         cout<<"Constractor"<<endl;
 7     }
 8 
 9     ~Res_Data()
10     {
11         cout<<"Destractor"<<endl;
12     }
13 
14     Res_Data(const Res_Data& )
15     {
16         cout<<"Copy Constractor"<<endl;
17     }
18 
19     Res_Data& operator=(const Res_Data&)
20     {
21         cout<<"Copy Assignment Operator"<<endl;
22         return *this;
23     }
24 
25     Res_Data(Res_Data&& )
26     {
27         cout<<"Move Constractor"<<endl;
28     }
29 
30     Res_Data& operator=(Res_Data&&)
31     {
32         cout<<"Move Assignment Operator"<<endl;
33         return *this;
34     }
35 };
36 
37 void func(Res_Data data) { }
38 
39 int main()
40 {
41     Res_Data data;
42     thread th(func, data);
43     th.join();
44     cout<<"Main End"<<endl;
45     return 0;
46 }

输出:

Constractor
Copy Constractor
Move Constractor
Destractor
Destractor
Main End
Destractor

注:上面代码在不同的编译器、相同编译器的不同版本可能存在差异,如有需要,请查询c++返回值优化(RVO机制)。本例是在Qt Creator 6.2.4 msvc2019 64位上测试得出。

将func()函数的参数换成引用类型:

1 void func(const Res_Data& data) { }

输出:

1 Constractor
2 Copy Constractor      //依旧发生了一次拷贝构造
3 Destractor
4 Main End
5 Destractor

我们发现,在传递参数时,即使入口函数是使用的“万能引用(const reference)”,依旧会发生一次拷贝构造,这一点非常重要。如果想避免此次拷贝,可以在构造线程对象时,用ref函数包裹对象,例如:

1 thread th(func, std::ref(data));

输出:

Constractor
Main End
Destractor

如果代码使用的detach()函数汇合,再次强调,不要向线程的入口函数传递局部变量的引用或者指针

注意点1

上面的func函数中,使用的是万能引用,实际上还可以用移动语义,但切记不要使用普通引用:

1 void func(const Res_Data& data) { }    //万能引用
2 void func(Res_Data&& data) { }         //移动语义
3 void func(Res_Data& data) { }          //error

原因是:std::thread对象在传递参数给入口函数时,使用的是右值。而普通引用只能引用左值。

注意点2

如果传递的是指针,使用detach同样会出现访问局部变量的问题,例如:

 1 void func(char* pData)
 2 {
 3     cout << pData << endl;               //pData : 0x00effdac
 4 }
 5 
 6 int main()
 7 {
 8     char pBuf[] = "abcdef";             //pBuf : 0x00effdac
 9     thread th(func, pBuf);
10     th.detach();
11 }

我们发现,pData 与 pBuf地址相同,此时同样会出现异常。接下来我们来改进上面代码,将func()入口函数中的char*改为string类型,通过string的构造函数将char*隐式转换为string类型。代码如下:

void func(string sData)
{
    cout << sData<< endl;
}

int main()
{
    char pBuf[] = "abcdef";
    thread th(func, pBuf);      //pBuf会调用string的构造函数构造string对象
    th.detach();
}

上面代码实际上是存在缺陷的,在工作中我们很容易遇到类似的问题。前面说过,std::thread对象存在自己的内部存储空间,将pBuf传递给std::thread对象时,会直接等位拷贝一份pBuf,此时这两个指针都指向同一份内存。假设此时main函数执行结束,子线程还未运行,后续当子线程运行时,通过拷贝的指针去构建对象,此时内存已经被回收,产生异常。那到底应该如何解决这个问题呢?

1 int main()
2 {
3     char pBuf[] = "abcdef";
4     thread th(func, string(pBuf));//pBuf会调用string的构造函数构造string对象
5     th.detach();
6 }

只需要手动构建string对象即可。切记:在向线程传递对象时,不要使用对象的隐式类型转换

注意点3

当一个类对象仅支持移动语义时,需要手动调用std::move来转移对象的归属权。比如,如果需要向线程入口函数传递一个std::unique_ptr对象,该对象只支持移动,而不支持拷贝。例子如下:

 1 class big_obj
 2 {
 3 public:
 4     big_obj() {}
 5     void setdata(int nData) { m_nData = nData; }
 6 
 7     void print() 
 8     {
 9         cout << m_nData << endl;
10     }
11 
12 private:
13     int m_nData = 0;
14 };
15 
16 void process_big_object(std::unique_ptr<big_obj> _ptr)
17 {
18     cout << "son thread : " << this_thread::get_id() << endl;
19     _ptr->print();
20 }
21 
22 int main()
23 {
24     cout << "main thread : " << this_thread::get_id() << endl;
25     std::unique_ptr<big_obj> pObj = std::make_unique<big_obj>();  //构造std::unique_ptr对象
26     pObj->setdata(10);
27     std::thread th(process_big_object, std::move(pObj));  //将std::unique_ptr对象传递给线程入口函数
28     th.join();
29 }

移交线程归属权

std::thread对象仅支持移动语义,而不支持对象拷贝。对于一个具体的执行线程,其归属权可以在不同的作用域间转换,例如:

 1 void do_some_work() {}
 2 void do_work(){}
 3 
 4 std::thread th1(do_some_work);
 5 std::thread th2(std::move(th1));
 6 std::thread th3;
 7 th3 = std::move(th2);
 8 
 9 std::thread th4(do_work);
10 th4 = std::move(th3);      //error

切记:当一个std::thread对象正在管控一个线程,就不能将新的线程交由其管控,否则将会导致程序崩溃

移动语义允许函数向外部转移线程的归属权,同时也允许将线程的归属权转移至某个函数内,示例如下:

 1 void do_work(){}
 2 
 3 std::thread GetThread()
 4 {
 5     std::thread th(do_work);
 6     return th;    //将该线程交由外部管理
 7 }
 8 
 9 void ProcessThread(std::thread th)    //交由某个函数管理
10 {
11     if(th.joinable())
12         th.join();
13 }

因为std::thread支持移动语义,所以只要容器同样知悉移动意图,就可以将std::thread作为元素存入容器中。因此,我们可以产生多个线程对象,将其保存在容器中,集中管理。

void func()
{
    std::vector<std::thread> vThreads;
    for (int i = 0; i < 10; ++i)
    {
        vThreads.emplace_back(do_work, i);
    }

    for (auto& itr : vThreads)
        itr.join();
}

在运行时选择线程数量

 

可以使用标准库提供的std::thread::hardware_concurrency()函数,该函数用于获取软件运行时可用于并发的线程数量。例如在多核系统上,该值可能是CPU的核芯数量。若线程信息无法获取,该值可能返回0。若需要使用多线程分解完整的任务,该值不失为一个有用的指标。

例如,下面例子用于实现一个多线程版本的accumulate累加函数:

 1 template<typename Iterator, typename T>
 2 struct accumulate_block
 3 {
 4     void operator()(Iterator first, Iterator last, T& result)
 5     {
 6         result = std::accumulate(first, last, 0);
 7     }
 8 };
 9 
10 template<typename Iterator, typename T>
11 T parallel_accumulate(Iterator first, Iterator last, T init)
12 {
13     unsigned long const length = std::distance(first, last);
14     if (!length)
15         return init;
16 
17     unsigned long const min_per_thread = 25;
18     unsigned long const max_thread = (length + min_per_thread - 1) / min_per_thread;
19     unsigned long const hardware_threads = std::thread::hardware_concurrency();
20     unsigned long const num_thread = std::min(hardware_threads != 0 ? hardware_threads : 2, max_thread);
21 
22     unsigned long const block_size = length / num_thread;
23 
24     std::vector<T>results(num_thread);
25     std::vector<std::thread> ths(num_thread - 1);
26     Iterator block_start = first;
27 
28     for (int i = 0; i < num_thread - 1; ++i)
29     {
30         Iterator block_end = block_start;
31         std::advance(block_end, block_size);
32 
33         accumulate_block<Iterator, T> _block;
34         std::thread th = std::thread(_block, block_start, block_end, std::ref(results[i]));
35         ths[i] = std::move(th);
36         block_start = block_end;
37     }
38     accumulate_block<Iterator, T>()(block_start, last, std::ref(results[num_thread - 1]));
39 
40     for (auto& itr : ths)
41         itr.join();
42 
43     return std::accumulate(results.begin(), results.end(), init);
44 }

识别线程

每个线程都有标识自己的唯一ID,类型为std::thread::id。可以通过std::thread类对象成员get_id()函数获取该线程的线程ID,也可以通过std::this_thread::get_id()函数获取。这两个函数的区别在于,前者需要获取std::thread对象,由该对象调用成员函数,而后者则是全局函数,可以在任意位置调用,该函数返回当前线程的线程ID。

std::thread::id类型作为标识线程ID的类,可以随意进行复制操作或比较运算。如果两个ID对象相等,则表示他们是同一个线程,或者表示这两个线程都不存在。std::thread::id类型可以用作关联容器的键值。

Copyright

本文参考至《c++并发编程实战》 第二版,作者:安东尼·威廉姆斯。本人阅读后添加了自己的理解并整理,方便后续查找,可能存在错误,欢迎大家指正,感谢!