任务队列C++实现-(完美转发)

发布时间 2023-10-31 15:30:32作者: 醉曦

需求

  • 任务队列中可以依次添加任务
  • 任务执行函数需要接受外部传输的参数
  • 主动调用Start开始执行任务

代码实现

class TaskQueue {
private:
    std::mutex mtx;
    std::condition_variable cv;
    std::queue<std::function<void()>> task_queue;
    std::atomic<bool> is_running;

public:
    TaskQueue() : is_running(false) {}
    ~TaskQueue() {}

    // std::forward is used to forward the parameter to the function
    template<typename F, typename... Args>
    void Push(F&& f, Args&&... args) {
        std::lock_guard<std::mutex> lock(mtx);
        task_queue.push(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        cv.notify_one();
    }

    void Start() {
        is_running = true;
        std::thread t([this] {
            while(is_running) {
                std::unique_lock<std::mutex> lock(mtx);
                cv.wait(lock, [this] { return !task_queue.empty(); });
                auto task = task_queue.front();
                task_queue.pop();
                lock.unlock();
                task();
            }
        });
        t.detach();
    }

    void Stop() {
        is_running = false;
    }
};
int main(int argc, char** argv) {

    TaskQueue tq;
    tq.Push(DoSomething, 1);
    tq.Push(DoSomething, 2);
    tq.Push(DoSomething, 3);
    tq.Start();
    tq.Push(DoSomething, 4);

    // 等待任务结束
    while(1) {
        std::this_thread::sleep_for(std::chrono::seconds(1));
    }

    return 0;
}

实现笔记

任务队列,将需要执行的任务存储在队列中,存储的这个动作类似于生产者
当任务队列不为空时,会从队列中取出一个任务执行,当任务执行结束后再从队列取下一个,直到队列为空;
执行任务类似于消费者

基础概念理解

  1. C++左值和右值

判断表达式左值还是右值的两种办法:
a. 位于赋值符号=左侧的就是左值,只能位于右侧的就是右值;需要注意的是,左值也可以当右值用;
b. 有名称、可以取到存储地址的表达式就是左值,否则就是右值;

C++右值引用(用 &&标识)

  1. 和左值引用一样,右值引用也需要立即被初始化,且只能使用右值进行初始化
int num = 10;
// 左值不能用于初始化右值
// int &&a = num; 编译报错
int &&a = 123;
  2. 和常量左值引用不同的是,右值引用可以对右值进行修改:
int num = 10;
int &&ref = 12;
ref = 222;// 修改右值引用的值
std::cout << ref << std::endl;
  1. std::unique_lock

std::unique_lock是个类模板,工作中,一般使用std::lock_guard(推荐使用) ,std::unique_lockstd::lock_guard灵活很多,效率上差一点,内存占用多一点。

  1. std::async 和 std::future

std::async是个函数模板,用来启动一个异步任务,启动起来一个异步任务之后(什么叫“启动一个异步任务”,就是自动创建一个线程并开始执行对应的线程入口函数),他返回一个std::future对象,这个std::future对象里面就含有线程函数返回的结果,我们可以通过调用std::future对象的成员函数get()来获取结果;它返回一个std::future对象。

  1. 条件变量std::condition_variable

std:: condition_variable实际上是个类,是一个与条件相关的类,说白了就是等待一个条件的达成。这个类是需要和互斥量来配合工作的,用的时候我们要生成这个类的对象。
a. wait()

  1. 若第二个参数是true,wait()直接返回;
  2. 若第二个参数是Lambda表达式,且**返回值是false,wait()将解锁互斥量,且在本行阻塞**。阻塞到何时结束呢?堵塞到其他线程调用notify_one() 为止;
  3. 若wait没有第二个参数,则默认false;

b. notify_one()wait()的工作流程:
其他线程用notify_one()将本wait(原本是睡着/堵塞)的状态唤醒后,wait就开始恢复干活了,恢复后的wait干什么活

  1. wait不断地尝试重新获取互斥量锁,如果获取不到,那么流程就卡在wait这里等着获取,如果获取到,那么wait就继续执行b
  2. 上锁(实际上获取到了锁,等同于上锁);
  3. 若wait有第二个参数,就判断lambda的表达式值,若值为false,则wait又对互斥量解锁,休眠;直到lambda值为true时,才会执行下一步;
  4. 为防止假唤醒,wait()中要有第二个参数(lambda)并且这个lambda中要正确处理公共数据是否存在;

完美转发

定义:

函数模板可以将自己的参数“完美”地转发给内部调用的其它函数。所谓完美,即不仅能准确地转发参数的值,还能保证被转发参数的左、右值属性不变。

不管传入的参数是什么,都能够很好的匹配函数需要的参数类型

C++11实现:

#include <iostream>
using namespace std;

// 接收左值
void ref_func(int& t) {
    cout << "lvalue\n";
}
void ref_func(const int& t) {
    cout << "rvalue\n";
}

//实现完美转发的函数模板
template <typename T>
void function(T&& t) {
    ref_func(forward<T>(t));
}

int main()
{
    function(5);   // rvalue
    int  x = 1;
    function(x);   // lvalue
    return 0;
}

代码中,重载的函数ref_func可以接收一个左值引用,也可以接收一个右值引用,但这需要定义两个函数进行重载。为了实现形式的统一,定义了一个模板函数function,函数体内调用ref_func函数,该模板函数接收参数后,会将参数类型转到具体的函数中进行调用。

完美转发需要考虑的一些问题

  1. C++11规定,通常情况下右值引用形式的参数只能接收右值,不能接收左值;
  2. 对于函数模板中的使用右值引用语法定义的参数来说,上述规定不再有效。模板函数的右值引用参数既可以接收左值引用,也可以接收右值引用。此时的右值引用也被称为万能引用。
  3. 在实现完美转发的时候,只要函数模板的参数类型为T&&,C++就可以自行准确判定实际传入的实参是左值还是右值;
  4. 如何将函数模板接收到的形参,连同参数的左右值属性,一切传递给被调用的函数呢?
    1. C++11为了解决这个问题,引入了std::forward()模板
//实现完美转发的函数模板
template <typename T>
void function(T&& t) {
    // 将形参和其左右值属性传递给被调用的函数
    ref_func(std::forward<T>(t));
}

队列实现

  1. 添加任务的实现
    1. 需要将不同任务添加进队列中,函数名可能不一样,参数也不一样
    2. 要求能够添加不同的函数,执行不同的任务;

实现原理:
a. 类内定义一个队列,元素是std::function<void()>,即std::function对象;
b. 使用一个模板函数,和完美转发特性,将不同的函数添加进队列中;

template<typename F, typename... Args>
void Push(F&& f, Args&&... args) {
    std::lock_guard<std::mutex> lock(mtx);
    task_queue.push(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    cv.notify_one();
}

Push函数中使用了std::bind类模板,将传入函数f和其需要的参数绑定在一起,生成一个std::function类对象,

往队列中添加完任务之后,则需要通过条件变量cv通知消费者可以进行消费

  1. 按序执行任务,需要从队列中一个个取出来执行,
void Start() {
    is_running = true;
    std::thread t([this] {
        while(is_running) {
            std::unique_lock<std::mutex> lock(mtx);
            cv.wait(lock, [this] { return !task_queue.empty(); });
            auto task = task_queue.front();
            task_queue.pop();
            lock.unlock();
            task();
        }
    });
    t.detach();
}
这里将创建的执行任务线程用detach方法放在后台执行,

​ 这里将创建的执行任务线程用detach方法放在后台执行,当队列中没有任务可以执行的之后,将会等待队列中有任务时在执行,将一直阻塞在cv.wait(lock, [this] { return !task_queue.empty(); });中。

使用说明

  1. 先生成一个任务队列的对象;

  2. 调用Push将需要执行的函数和参数加到队列中;

  3. 调用Start接口,让任务按序执行;

拓展:

  1. 如果要等任务结束后在执行下一个任务,则需要在task()后面加上一个条件变量,等待任务结束在取下一个任务;
  2. 若要让执行任务的线程一开始就运行,则可以将Start函数放在构造函数中;