Structured Concurrency:结构化并发

发布时间 2023-11-29 02:26:55作者: circlelll

一、参考

https://ericniebler.com/2020/11/08/structured-concurrency/

二、总结

1. 结构化并发是什么-概述

是什么:一种确保子操作在父操作之前完成的方式,类似函数在调用函数之前完成。

最典型的结构化并发:C++20的协程

意义:它通过使异步生存期与普通C++词法作用域相对应,为异步程序带来了现代C++风格,并且不需要引用计数(智能指针,垃圾回收)来管理对象的生命周期

总结:即使在并发环境中,函数嵌套调用时参数的作用域也是严格嵌套的,不需要用智能指针(shared_ptr)之类的技术,也不会发生不小心的内存泄露--对象的生命周期

2. 为什么需要结构化并发

2.1 结构化编程

具有严格的嵌套作用域和生命周期,并发情况下需考虑

2.2 多线程编程的困境

并发场景下编程难度大:

①对象生命周期已经结束,但是其他线程仍需要访问,会导致悬空引用问题,此时需要智能指针通过引用计数来管理其生命周期

②与作用域相关的生命周期不清晰

2.3 非结构化并发

非结构化异步编程很难保证正确性和效率,且编程复杂,且没有办法强制要求子任务在父任务之前完成

3. 结构化并发-深入

3.1 库

协程:Lewis Baker’s popular cppcoro library

提升:

①只需要一个函数

②状态保存在本地变量,而不需要保存在需要的引用计数的共享变量

③可使用一般的c++错误处理技术

从结构保证了子操作在父操作之前完成

3.2 取消机制

从结构保证了子操作在父操作之前完成(也就是说,如果父操作先完成,需要等待子操作)

为了避免为不再需要结果的子操作等待不必要的长时间,我们需要一种能够取消这些子操作的机制,以便它们快速完成。

3.3 结构化并发>协程

结构化并发:特定模式中的回调,可以在没有协程的情况下实现结构化并发

4. 程序示例

场景:存在一个子操作,父任务需要此操作的结果

编译运行环境:ubuntu22.04.3

①单线程:主线程等待子线程结束

耗时操作:computeResult

调用者:doThing

#include <iostream>
#include <unistd.h>

struct State{
	int result;
};

// synchronous
void computeResult(State & s)
{
	int time = 30;
	sleep(time);	// 用sleep模拟耗时操作
	s.result = time;
}

int doThing() {
	State s;
	computeResult(s);
	return s.result;
}

int main()
{
	std::cout << "result: " << doThing() << std::endl;
}

// compile: g++ main.cpp
// output: result:30

关注State s;的声明周期

②使用std::future:获取子线程的结果

#include <future>
#include <iostream>
#include <unistd.h>

struct State{
    int result;
};

int computeResult(State &s)
{
    int time = 30;
    sleep(time);
    s.result = time;
    
    std::cout << "p1" << std::endl;
    
    return 1;
}

std::future<int> doThing() 
{
    std::cout << "p2" << std::endl;

    State s;
    std::future<int> fut = std::async(computeResult, s);
    return fut;
}

int main()
{
    std::cout << "p3" << std::endl;
    auto fut = doThing();
    std::cout << "result:" << fut.get() << std::endl;
}

// compile: g++ main.cpp
// 编译阶段不通过: 无法传递引用类型的函数参数
// main.cpp:24:38: error: no matching function for call to ‘async(int (&)(State&), State&)’
//    24 |     std::future<int> fut = std::async(computeResult, s);

注意State s;的生命周期,使用智能指针shared_ptr管理State s;

#include <future>
#include <iostream>
#include <unistd.h>

struct State{
    int result;
};

int computeResult(std::shared_ptr<State> s)
{
    int time = 30;
    sleep(time);
    (*s).result = time;

    std::cout << "p1" << std::endl;

    return 1;
}

std::future<int> doThing() 
{
    std::cout << "p2" << std::endl;

    std::shared_ptr<State> s = std::make_shared<State>(); 
    std::future<int> fut = std::async(computeResult, s);    // std::async可以传递带有函数及其参数(参数是引用类型无法会出错),但是boost::async传不了多个参数-解决:使用std::bind绑定
    return fut; // std::future没有类似boost::future的then函数
}

int main()
{
    std::cout << "p3" << std::endl;
    auto fut = doThing();
    std::cout << "result:" << fut.get() << std::endl;
}

// compile: g++ main.cpp
// output:
/*p3
p2
result:p1
1*/

②使用boost::future获取结果:父操作需要子操作的结果

参考:http://www.bryh.cn/a/142977.html

注意:使用boost::future需要设置宏#define BOOST_THREAD_PROVIDES_FUTURE

// 写法一
#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>
#include <iostream>

struct State{
    int result;
};

// asynchronous
boost::future<void> computeResult(State &s)
{
    int time = 30;
    sleep(time);
    s.result = time;
    std::cout << "p1" << std::endl;
}

boost::future<int> doThing() {
    State s;
    auto fut = computeResult(s);
    std::cout << "p2" << std::endl;
    return fut.then(
      [&](auto&&) { return s.result; }); //OOPS
}

int main()
{
    std::cout << "p3" << std::endl;
    auto fut = doThing();
    std::cout << "result:" << fut.get() << std::endl;
}

// compile: g++ main.cpp -lboost_thread
// output: 
/*
p3
p1
p2
terminate called after throwing an instance of 'boost::wrapexcept<boost::lock_error>'
  what():  boost: mutex lock failed in pthread_mutex_lock: Invalid argument
已放弃 (核心已转储)
*/

// 写法二
#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>
#include <iostream>
#include <memory>
#include <unistd.h>

struct State{
    int result;
};

int computeResult(State &s)
{
    std::cout << "p1" << std::endl;

    int time = 30;
    sleep(time);
    s.result = time;
    return 1;
}

boost::future<int> doThing() {
    std::cout << "p2" << std::endl;

    State s;
    boost::future<int> fut = boost::async(boost::bind(computeResult, s));
    return fut.then(
     [&](auto&&) { return s.result; }); //OOPS
}

int main()
{
    std::cout << "p3" << std::endl;
    auto fut = doThing();
    std::cout << "result:" << fut.get() << std::endl;
}

// compile: g++ main.cpp -lboost_thread
// output: 
/*
p3
p2
result:p1
21978
*/

写法一问题:State s;的声明周期:其超出doThing作用域范围消亡时,computeResult仍可能在访问或使用这个变量,此时引用悬空,引发程序崩溃,即当computeResult线程试图访问或修改已经被销毁的State对象时,会发生未定义的行为。在这种情况下,尝试从返回的future对象中获取结果可能会导致不可预测的结果或程序崩溃。

写法二问题:未获取到正确的值,因为s是局部的,并且当doThing返回时会被销毁,所以尝试访问它的result成员是未定义的行为(标记为"OOPS")

解决:使用智能指针shared_ptr管理

#define BOOST_THREAD_PROVIDES_FUTURE
#define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION
#include <boost/thread.hpp>
#include <boost/thread/future.hpp>
#include <iostream>
#include <memory>
#include <unistd.h>

struct State{
    int result;
};

int computeResult(std::shared_ptr<State> s)
{
    std::cout << "p1" << std::endl;

    int time = 30;
    sleep(time);
    (*s).result = time;
    return 1;
}

boost::future<int> doThing() {
    std::cout << "p2" << std::endl;

    std::shared_ptr<State> s = std::make_shared<State>();
    boost::future<int> fut = boost::async(boost::bind(computeResult, s));
    return fut.then(
     [&](auto&&) { return (*s).result; });
}

int main()
{
    std::cout << "p3" << std::endl;
    auto fut = doThing();
    std::cout << "result:" << fut.get() << std::endl;
}

// compile: g++ main.cpp -lboost_thread
// output: 
/*
p3
p2
result:p1
30
*/

④使用结构化并发:cppcoro库---一个用于C++20的协程库,它提供了一个轻量级和易于使用的协程支持

未实际编译

cppcoro::task<> computeResult(State & s);
 
cppcoro::task<int> doThing() {
  State s;
  co_await computeResult(s);
  co_return s.result;
}