原生并行版std::accumulate

发布时间 2023-12-03 20:43:23作者: ZhangFirst1

原生并行版std::accumulate

​ 代码来自《c++并发编程实战》

#include <iostream>
#include <numeric>
#include <algorithm>
#include <thread>
#include <functional>
#include <vector>
#include <chrono>
typedef long long LL;

template<typename Iterator, typename T>
struct accumulate_block{
    void operator()(Iterator first, Iterator last, T& result){
        result = std::accumulate(first, last, result);
    }
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init){
    unsigned long const length = std::distance(first, last);
    
    // 若输入范围为空 返回init的值
    if(!length)                 
        return init;

    // 确定启动的线程数
    unsigned long const min_per_thread = 25;
    unsigned long const max_threads = (length + min_per_thread - 1) / min_per_thread;

    // 能并发在一个程序的线程数量
    unsigned long const hardware_threads = std::thread::hardware_concurrency();
    // 确定最终的线程数量 若hardware_concurrency返回0则选择两个线程
    unsigned long const num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    // 每个线程中处理的元素数量
    unsigned long const block_size = length / num_threads;

    std::vector<T> results(num_threads);                // 存放中间结果
    std::vector<std::thread> threads(num_threads-1);    // 已有主线程 -1

    Iterator block_start = first;
    for(unsigned long i = 0; i < (num_threads - 1); i++){
        Iterator block_end = block_start;
        std::advance(block_end, block_size);            // block_end指向当前块的末尾
        // 启动一个新线程为当前块累加结果
        threads[i] = std::thread(accumulate_block<Iterator, T>(), block_start, block_end, std::ref(results[i]));
        block_start = block_end;                        // 当迭代器指向当前块末尾时启动一下一个块
    }
    // 处理最终块的结果
    accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
    // 等待所有线程执行完毕
    std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join));
    // 返回最终结果
    return std::accumulate(results.begin(), results.end(), init);
}

int main(){
    std::vector<LL> v;
    LL sum = 0;
    for(LL i = 0;i <= 10000000; i++) v.push_back(i);
    
    auto beforeTime = std::chrono::steady_clock::now();

    //std::cout << parallel_accumulate(v.begin(), v.end(), sum) << std::endl;
    //std::cout << std::accumulate(v.begin(), v.end(), sum) << std::endl;

    auto afterTime = std::chrono::steady_clock::now();
    double duration_millsecond = std::chrono::duration<double, std::milli>(afterTime - beforeTime).count();
    std::cout << duration_millsecond << std::endl;
    return 0;
}

​ 0~10000000相加时测试如下。

image-20231203203812492