C++ 多线程详解之异步编程 std::packaged_task

发布时间 2023-08-18 09:30:52作者: 冰山奇迹

std::packaged_task 将任何可调用对象(比如函数、lambda 表达式等等)封装成一个 task,可以异步执行。执行结果可以使用 std::future 获取。

比如下面的例子,构造一个 std::packaged_task 后,get_future() 函数返回一个 std::future 对象,可以获取 task 异步或者同步执行的结果。

#include <cmath>
#include <functional>
#include <future>
#include <iostream>
#include <thread>

// unique function to avoid disambiguating the std::pow overload set
int f(int x, int y)
{
return std::pow(x, y);
}

void task_lambda()
{
std::packaged_task<int(int, int)> task([](int a, int b)
{
return std::pow(a, b); });
}
std::future<int> result = task.get_future();

task(2, 9);

std::cout << "task_lambda:\t" << result.get() << std::endl;
}

void task_bind()
{
std::packaged_task<int()> task(std::bind(f, 2, 11));
std::future<int> result = task.get_future();

task();

std::cout << "task_bind:\t" << result.get() <<
std::endl;
}

void task_thread()
{
std::packaged_task<int(int, int)> task(f);
std::future<int> result = task.get_future();

std::thread task_td(std::move(task), 2, 10);
task_td.join();

std::cout << "task_thread:\t" << result.get() <<
std::endl;
}

int main()
{
task_lambda();
task_bind();
task_thread();
}

std::packaged_task 是如何实现的呢?下面就剖析 STL 源码,分析其实现原理。通过前面的分析,std::future 有一个成员变量,保存 _State_base 的子类对象。std::packaged_task 就拓展了 _State_base 的功能,用于绑定到 std::future,用于传递 task 状态。

1、_Task_state

在介绍 _Task_state 之前,先介绍其基类 _Task_state_base。_Task_state_base 继承 __future_base::_State_base 。1)定义了 _Task_state 的接口;2)定义了一个成员变量,用于保存 result。该成员变量在构造函数中被初始化。

  template<typename _Res, typename... _Args>
struct __future_base::_Task_state_base<_Res(_Args...)>
: __future_base::_State_base
{
typedef _Res _Res_type;

template<typename _Alloc>
_Task_state_base(const _Alloc& __a)
: _M_result(_S_allocate_result<_Res>(__a))
{ }

// Invoke the stored task and make the state ready.
virtual void
_M_run(_Args&&... __args) = 0;

// Invoke the stored task and make the state ready at thread exit.
virtual void
_M_run_delayed(_Args&&... __args, weak_ptr<_State_base>) = 0;

virtual shared_ptr<_Task_state_base>
_M_reset() = 0;

typedef __future_base::_Ptr<_Result<_Res>> _Ptr_type;
_Ptr_type _M_result;
};

_Task_state 继承 _Task_state_base,实现接口函数。此外,新增加一个成员变量 _M_impl,用于保存可调用对象。

  // Holds a packaged_task's stored task.
template<typename _Fn, typename _Alloc, typename _Res, typename... _Args>
struct __future_base::_Task_state<_Fn, _Alloc, _Res(_Args...)> final
: __future_base::_Task_state_base<_Res(_Args...)>
{
template<typename _Fn2>
_Task_state(_Fn2&& __fn, const _Alloc& __a)
: _Task_state_base<_Res(_Args...)>(__a),
_M_impl(std::forward<_Fn2>(__fn), __a)
{ }

//...
struct _Impl : _Alloc
{
template<typename _Fn2>
_Impl(_Fn2&& __fn, const _Alloc& __a)
: _Alloc(__a), _M_fn(std::forward<_Fn2>(__fn)) { }
_Fn _M_fn;
} _M_impl;
};

下面是接口 _M_run() 和 _M_run_delayed() 的实现。

_M_run() 函数将可调用对象和参数打包成一个 lambda 表达式,构造成一个 _S_task_setter 对象,然后传入 _M_set_result() 函数。_M_set_result() 函数会调用传入的 lambda 表达式,然后将结果传到 result。

_M_set_delayed_result() 函数也是相同的实现逻辑。

      virtual void
_M_run(_Args&&... __args)
{
auto __boundfn = [&] () -> _Res {
return std::__invoke_r<_Res>(_M_impl._M_fn,
std::forward<_Args>(__args)...);
};
this->_M_set_result(_S_task_setter(this->_M_result, __boundfn));
}

virtual void
_M_run_delayed(_Args&&... __args, weak_ptr<_State_base> __self)
{
auto __boundfn = [&] () -> _Res {
return std::__invoke_r<_Res>(_M_impl._M_fn,
std::forward<_Args>(__args)...);
};
this->_M_set_delayed_result(_S_task_setter(this->_M_result, __boundfn),
std::move(__self));
}

然后是 _M_reset() 函数实现:使用移动语义,用可调用对象重新构造一个 _Task_state_base 对象。

  template<typename _Signature, typename _Fn,
typename _Alloc = std::allocator<int>>
static shared_ptr<__future_base::_Task_state_base<_Signature>>
__create_task_state(_Fn&& __fn, const _Alloc& __a = _Alloc())
{
typedef typename decay<_Fn>::type _Fn2;
typedef __future_base::_Task_state<_Fn2, _Alloc, _Signature> _State;
return std::allocate_shared<_State>(__a, std::forward<_Fn>(__fn), __a);
}

template<typename _Fn, typename _Alloc, typename _Res, typename... _Args>
shared_ptr<__future_base::_Task_state_base<_Res(_Args...)>>
__future_base::_Task_state<_Fn, _Alloc, _Res(_Args...)>::_M_reset()
{
return __create_task_state<_Res(_Args...)>(std::move(_M_impl._M_fn),
static_cast<_Alloc&>(_M_impl));
}

通过上面的分析,只要调用 _M_run() 函数,传入的可调用对象就会被执行,其结果被设置到 result。

2、std::packaged_task

std::packaged_task 有一个 _Task_state_base 的成员变量 _M_state,在构造函数中被初始化。get_future() 就是使用 _M_state 构造一个 std::future 对象返回。

std::packaged_task 重载了调用运算符 operator(),成员可调用对象。一旦调用,_M_run() 函数就会被执行。

  /// packaged_task
template<typename _Res, typename... _ArgTypes>
class packaged_task<_Res(_ArgTypes...)>
{
typedef __future_base::_Task_state_base<_Res(_ArgTypes...)> _State_type;
shared_ptr<_State_type> _M_state;

// _GLIBCXX_RESOLVE_LIB_DEFECTS
// 3039. Unnecessary decay in thread and packaged_task
template<typename _Fn, typename _Fn2 = __remove_cvref_t<_Fn>>
using __not_same
= typename enable_if<!is_same<packaged_task, _Fn2>::value>::type;

public:
// Construction and destruction
packaged_task() noexcept { }

template<typename _Fn, typename = __not_same<_Fn>>
explicit
packaged_task(_Fn&& __fn)
: _M_state(
__create_task_state<_Res(_ArgTypes...)>(std::forward<_Fn>(__fn)))
{ }

#if __cplusplus < 201703L
// ...
#endif

~packaged_task()
{
if (static_cast<bool>(_M_state) && !_M_state.unique())
_M_state->_M_break_promise(std::move(_M_state->_M_result));
}

// No copy
packaged_task(const packaged_task&) = delete;
packaged_task& operator=(const packaged_task&) = delete;

// Move support
packaged_task(packaged_task&& __other) noexcept
{ this->swap(__other); }

packaged_task& operator=(packaged_task&& __other) noexcept
{
packaged_task(std::move(__other)).swap(*this);
return *this;
}

void
swap(packaged_task& __other) noexcept
{ _M_state.swap(__other._M_state); }

bool
valid() const noexcept
{ return static_cast<bool>(_M_state); }

// Result retrieval
future<_Res>
get_future()
{ return future<_Res>(_M_state); }

// Execution
void
operator()(_ArgTypes... __args)
{
__future_base::_State_base::_S_check(_M_state);
_M_state->_M_run(std::forward<_ArgTypes>(__args)...);
}

void
make_ready_at_thread_exit(_ArgTypes... __args)
{
__future_base::_State_base::_S_check(_M_state);
_M_state->_M_run_delayed(std::forward<_ArgTypes>(__args)..., _M_state);
}

void
reset()
{
__future_base::_State_base::_S_check(_M_state);
packaged_task __tmp;
__tmp._M_state = _M_state;
_M_state = _M_state->_M_reset();
}
};