概述
Asio 是一个用于网络和低级 I/O 编程的跨平台 C++ 库,它使用现代 C++ 方法为开发人员提供一致的异步模型.
io_context
io_context 类为异步I/O对象的用户提供了核心I/O功能,包含:
asio::ip::tcp::socket
asio::ip::tcp::acceptor
asio::ip::udp::socket
asio::deadline_timer
线程安全[1]
Boost.Asio 的线程安全模型。对于大多数 Boost.Asio 对象,在一个对象上挂起多个异步操作是安全的;只是指定对该对象的并发调用是不安全的。在下图中,每一列代表一个线程,每一行代表一个线程在某个时刻正在做什么。
单个线程进行顺序调用而其他线程不进行调用是安全的:
thread_1 | thread_2
--------------------------------------+------------ ----------------------------
socket.async_receive(...); | ...
socket.async_write_some(...); | ...
多个线程进行调用是安全的,但不能同时进行:
thread_1 | thread_2
--------------------------------------+------------ ----------------------------
socket.async_receive(...); | ...
... | socket.async_write_some(...);
但是,多个线程并发调用是不安全的
thread_1 | thread_2
--------------------------------------+------------ ----------------------------
socket.async_receive(...); | socket.async_write_some(...);
... | ...
向io_context 提交任意任务
要向 io_context 提交函数,请使用 asio::dispatch、asio::post 或 asio::defer 自由函数。
void my_task()
{
//...
}
int main()
{
asio::io_context io_context;
//提交一个函数
asio::post(io_context, my_task);
//提交一个lambda 表达式
asio::post(io_context, [](){
//...
});
//运行 io_context 直到它用完为止。
io_context.run();
return 0;
}
阻止 io_context 耗尽工作
某些应用程序可能需要阻止 io_context 对象的 run() 调用在没有更多工作要做时返回。 例如,io_context 可能在应用程序的异步操作之前启动的后台线程中运行。 run() 调用可以通过创建一个针对 io_context 跟踪工作的执行器来保持运行:
int main()
{
asio::io_context io_context;
auto work = asio::require(io_context.get_executor(), asio::execution::outstanding_work.tracked);
return 0;
}
为了实现关闭,应用程序需要调用 io_context 对象的 stop() 成员函数。 这将导致 io_context run() 调用尽快返回,放弃未完成的操作并且不允许分派准备好的处理程序。
或者,如果应用程序要求允许所有操作和处理程序正常完成,请将工作跟踪执行程序存储在 any_io_executor 对象中,以便可以显式重置它。
int main()
{
asio::io_context io_context;
asio::any_io_executor work = asio::require(io_context.get_executor(), asio::execution::outstanding_work.tracked);
//...
work = asio::any_io_executor();//允许 run() 退出
return 0;
}
io_context.run()
当线程调用 io_context.run() 时,工作和处理程序将从该线程内调用。
asio::io_context io_context;
asio::ip::tcp::socket socket(io_context);
io_context.post(&print); // 1
socket.connect(endpoint); // 2
socket.async_receive(buffer, &handle_async_receive); // 3
io_context.post(&print); // 4
io_context.run(); // 5
在上面的示例中,io_context .run()(5) 将阻塞直到:
- 它已从两个print处理程序调用并返回,接收操作成功或失败完成,并且其handle_async_receive处理程序已被调用并返回。
- 通过 io_context 明确停止 io_context.stop()。
- 从处理程序中抛出异常。
请注意,当 io_context 工作用完时,应用程序必须 reset(), 在 io_context 再次运行之前。
run() 函数会阻塞,直到所有工作完成并且没有更多的处理程序要分派,或者直到 io_context 停止。
示例:
#include <iostream>
#include <asio.hpp>
int main()
{
asio::io_context io_context;
io_context.run();
std::cout << "Do you reckon this line displays?" << std::endl;
return 0;
}
//output: Do you reckon this line displays?
- 如果我们的程序用完了怎么办?工作类是一个“在有工作要做时通知 io_context 的类"。换句话说,只要 io_context 有一个与之关联的工作对象,它就永远不会无事可做。
示例:
#include <iostream>
#include <asio.hpp>
int main()
{
asio::io_context io_context;
asio::io_context::work work(io_context);
io_context.run();
std::cout << "Do you reckon this line displays?" << std::endl;
return 0;
}
//output:
- 如果我们不喜欢这种必须阻塞线程来工作的想法怎么办?我们将简单地模拟一个循环,调用io_context 的poll函数。poll 函数“运行 io_context 对象的事件处理循环来执行就绪的处理程序”。
#include <iostream>
#include <asio.hpp>
int main()
{
asio::io_context io_context;
for(int x = 0; x < 42; ++x)
{
io_context.poll();
std::cout << "Counter: " << x << std::endl;
}
return 0;
}
//output: 我们将看到 42 行文本输出到控制台,然后程序退出
- 当我们运行这个程序时,我们得到与之前完全相同的输出和结果。这是因为当有更多工作要做时,轮询函数不会阻塞。它只是执行当前的一组工作然后返回。在一个真实的程序中,循环将基于一些其他事件,但为了简单起见,我们只使用一个固定的事件。
#include <iostream>
#include <asio.hpp>
int main()
{
asio::io_context io_context;
asio::io_context::work work(io_context);
for(int x = 0; x < 42; ++x)
{
io_context.poll();
std::cout << "Counter: " << x << std::endl;
}
return 0;
}
- 如果我们想要从 io_context 中删除一个工作对象怎么办?为了实现此功能,我们必须改用指向工作对象的指针。与 boost 库保持一致,我们将使用shared_ptr,一个智能指针类。
#include <iostream>
#include <asio.hpp>
int main()
{
asio::io_context io_context;
std::shared_ptr<asio::io_context::work> work(new asio::io_context::work(io_context));
work.reset();
io_context.run();
std::cout << "Do you reckon this line displays?" << std::endl;
return 0;
}
//如果我们运行该程序,我们将看到显示的文本行。这有效地向我们展示了如何从 io_service 中删除工作对象。
- 现在我们知道如何用一个线程驱动 io_context ,我们需要弄清楚更多线程需要什么。io_context 文档页面告诉我们“多个线程可以调用 run() 函数来设置一个线程池,io_context 可以从中执行处理程序。池中等待的所有线程都是等效的,io_context 可以选择其中的任何一个他们调用处理程序。”
Asio 多线程
- 在多线程的场景下,每个线程都持有一个 io_context ,并且每个线程都调用各自的 io_context 的run()方法。
- 全局只分配一个io_context ,并且让这个 io_context 在多个线程之间共享,每个线程都调用全局的 io_context 的run()方法。
每个线程一个 I/O Context[2]
特点:
- 在多核的机器上,这种方案可以充分利用多个 CPU 核心。
- 某个 socket 描述符并不会在多个线程之间共享,所以不需要引入同步机制。
- 在 event handler 中不能执行阻塞的操作,否则将会阻塞掉 io_context 所在的线程。
示例:
#include <iostream>
#include <asio.hpp>
#include <vector>
class AsioIOContextPool
{
public:
using IOContext = asio::io_context;
using Work = asio::io_context::work;
using WorkPtr = std::unique_ptr<Work>;
//返回当前系统支持的并发线程数
AsioIOContextPool(std::size_t size = std::thread::hardware_concurrency()) :
ioContexts_(size),
works_(size),
nextIOContext_(0)
{
for(std::size_t i = 0; i < size; ++i)
{
works_[i] = std::unique_ptr<Work>(new Work(ioContexts_[i]));
}
for(std::size_t i = 0; i < ioContexts_.size(); ++i)
{
threads_.emplace_back([this, i](){
ioContexts_[i].run();
});
}
}
AsioIOContextPool(const AsioIOContextPool&) = delete;
AsioIOContextPool &operator=(const AsioIOContextPool&) = delete;
asio::io_context& getIOContext()
{
auto &context = ioContexts_[nextIOContext_++];
if(nextIOContext_ == ioContexts_.size())
{
nextIOContext_ = 0;
}
return context;
}
void stop()
{
for(auto &work : works_)
{
work.reset();
}
for(auto &t : threads_)
{
t.join();
}
}
private:
std::vector<IOContext> ioContexts_;
std::vector<WorkPtr> works_