C++ 简易消息循环

发布时间 2023-03-22 21:15:32作者: 1bite

前言

本文将向大家介绍如何使用 C++ 的标准库实现一个异步和并发编程中都非常重要的编程模式:消息循环(Event Loop)。尽管市面上存在不少库也提供了同样的功能,但有时候出于一些原因,我们并不想引入外部库,就想要一个小巧、只使用 C++ 标准库的实现。

话不多说,上代码

using sys_clock = std::chrono::system_clock;

struct message {
    sys_clock::time_point when;
    std::function<void()> callback;
};

class message_loop {
public:
    message_loop()
    : _stop(false)
    {
        //
    }

    message_loop(const message_loop&) = delete;
    message_loop& operator=(const message_loop&) = delete;

    void run() {
        while (!_stop) {
            auto msg = wait_one();
            msg.callback();
        }
    }

    void quit() {
        post({sys_clock::now(), [this](){ _stop = true; } });
    }

    void post(std::function<void()> callable) {
        post({sys_clock::now(), std::move(callable)});
    }

    void post(std::function<void()> callable, std::chrono::milliseconds delay) {
        post({sys_clock::now() + delay, std::move(callable)});
    }

private:
    struct msg_prio_comp {
        inline bool operator() (const message& a, const message& b) {
            return a.when > b.when;
        }
    };

    using queue_type = std::priority_queue<message, std::vector<message>, msg_prio_comp>;

    std::mutex _mtx;
    std::condition_variable _cv;
    queue_type _msgs;
    bool _stop;

    void post(message msg) {
        {
            auto lck = acquire_lock();
            _msgs.emplace(std::move(msg));
        }
        _cv.notify_one();
    }

    std::unique_lock<std::mutex> acquire_lock() {
        return std::unique_lock<std::mutex>(_mtx);
    }

    bool idle() const {
        return _msgs.empty();
    }

    const message& top() const {
        return _msgs.top();
    }

    message pop() {
        auto msg = top();
        _msgs.pop();
        return msg;
    }

    message wait_one() {
        while (true) {
            auto lck = acquire_lock();
            if (idle())
                _cv.wait(lck);
            else if (top().when <= sys_clock::now())
                return pop();
            else {
                _cv.wait_until(lck, top().when);
                // 可能是新消息到达,再循环一次看看
            }
        }
    }
};

接下来,演示一下使用方式:

int main() {
    using namespace std;
    using namespace std::chrono;

    message_loop *pLoop = nullptr;
    thread th([&loop](){
        message loop;
        pLoop = &loop;
        loop.run();
        pLoop = nullptr;
    });

    logger() << "投递消息#1";
    pLoop->post([](){
        logger() << "消息#1 处理了";
    });

    logger() << "投递消息#2,延迟 500 毫秒";
    pLoop->post([](){
        logger() << "消息#2 处理了";
    }, milliseconds(500));

    logger() << "投递消息#3";
    pLoop->post([](){
        logger() << "消息#3 处理了";
    });

    logger() << "投递消息#4,延迟 1000 毫秒";
    pLoop->post([](){
        logger() << "消息#4 处理了";
    }, milliseconds(1000));

    this_thread::sleep_for(milliseconds(1500));
    pLoop->quit();
    logger() << "退出";
    th.join();
    return 0;
}

运行上面的示例可能看到如下输出:

[11:22:33.000] 投递消息#1
[11:22:33.000] 投递消息#2,延迟 500 毫秒
[11:22:33.000] 消息#1 处理了
[11:22:33.000] 投递消息#3
[11:22:33.000] 消息#3 处理了
[11:22:33.000] 投递消息#4,延迟 1000 毫秒
[11:22:33.501] 消息#2 处理了
[11:22:34.000] 消息#4 处理了
[11:22:34.502] 退出

可见,相比单纯的先进先出队列,这个消息循环支持延迟消息,可以用来做简单定时器,覆盖更多使用场景。

效率

当然,这么简单的消息循环,效率如何呢。在我的 i5 10500 上,针对 1048576 个消息的压测结果为每毫秒能处理约 2400 个消息。

瓶颈

效率瓶颈主要在以下几的地方:

  1. 锁粒度太高。每次投递消息与取出消息都会锁住整个循环
  2. 消息多了之后,priority_queue 插入、移除的耗时变得可观

优化方向

针对上述原因,可以采取以下优化措施:

  1. 减小锁粒度或者采用无锁数据结构(参考 Disruptor 的 RingBuffer)
  2. 消息一般可分为两类:一类是定时消息,要在某个时间点执行;另一类是非定时消息,只要执行它就行。因此可以把消息队列分为至少两个:一个先入先出队列;一个带排序的队列(堆)
  3. 采用两个缓冲区。一个用于写,一个用于读
  4. 采用对象池优化内存分配

优化过程要注意以下问题:

  1. 消息的回调函数内可能会再调用 post 发送消息,容易发生死锁。

在我电脑上的测试表明,即使不采用无锁数据结构,只把锁粒度减小,就能把效率翻倍。

拓展

如果觉得 post 函数使用太麻烦,也可以稍稍拓展一下。

execSync

使得我们可以像使用 GCD 一样,把函数调用委派到相应队列中:

logger() << pLoop->execSync([](int a, int b) { return a + b; }, 1, 2);

实现如下:

template<class Func, typename... Args>
auto execSync(Func&& fn, Args&& ...args) {
    if (std::this_thread::get_id() == _tid) { // _tid 是新引入的成员变量,表示 message_loop 所在的线程的 ID
        return std::invoke(std::forward<Func>(fn), std::forward<Args>(args)...);
    }

    using return_type = std::invoke_result_t<Func, Args...>;
    std::packaged_task<return_type(Args&&...)> task(std::forward<Func>(fn));

    post([&](){ task(std::forward<Args>(args)...); });

    return task.get_future().get();
}

execAsync

execSync 的异步版本,用于想自己处理异步结果的情形:

auto result = pLoop->execAsync([](int a, int b) { return a + b; }, 1, 2);
// ...
logger() << result.get();

实现如下:

template<class Func, typename... Args>
[[nodiscard]] auto execAsync(Func&& fn, Args&& ...args) {
    using return_type = std::invoke_result_t<Func, Args...>;
    using task_type = std::packaged_task<return_type()>;

    auto pTask = std::make_shared<task_type>(
        std::bind(
            std::forward<Func>(fn),
            std::forward<Args>(args)...));

    post([pTask](){ (*pTask)(); });

    return pTask->get_future();
}

循环方式

其实,循环的方式多种多样,像我遇到的场景就采用了下面的循环:

while (!quit) {
    bool onceMore = myLogic();
    if (!onceMore) {
        while (!quit && !otherCondition()) {
            message msg = getNext();
            msg.callback();
        }
    }
    else if (hasNext()) {
        message msg = getNext();
        msg.callback();
    }
}

这种循环的特点是,myLogic() 会尽可能多的执行,同时消息来了也能及时处理,适合一些实时性高的场合。正是因为循环的方式多样,封装好的 message_loop 往往需要提供各种 hook 点,比如空闲处理、进入等待前、唤醒后等等。不过,灵活性增加后,效率就会牺牲一点,这时可以考虑把消息队列和消息循环分开。