JUC并发编程学习(十三)ForkJoin

发布时间 2023-11-06 13:06:31作者: 高同学,你好

ForkJoin

什么是ForkJoin

ForkJoin在JDK1.7,并发执行任务!大数据量时提高效率。

大数据:Map Reduce(把大任务拆分成小任务)

ForkJoin特点:工作窃取

为什么可以取窃取其他线程的任务呢?因为这里面维护的都是双端队列(即队列的两端都可以取元素)

ForkJoin操作

在java.util.concurrent下的接口摘要中,有以下两个接口

点进其中一个找到具体的类,可以看到ForkJoinPool

这是具体的执行类,就像ThreadPoolExecutor

具体的执行通过ForkJoinTask类来执行

使用ForkJoin

1、需要一个ForkJoinPool,通过execute方法来执行,参数为ForkJoinTask

2、计算任务 forkJoinPool.execute(ForkJoinTask<?> task)

3、定义一个ForkJoinTask

如何去定义一个ForkJoinTask,打开jdk文档,找到ForkJoinTask类,查看具体的子类。

其中递归事件没有返回值,而任务肯定要有结果,所以递归任务是有返回值的

点进任务,查看示例

代码示例:

package org.example.forkjoin;
/*
* 求和计算的任务
*
*
* 程序员的三六九等
* 三(普通求和)、六(ForkJoin)、九(Stream并行流)
*
*
* 使用ForkJoin:
* 1、ForkJoinPoll 通过他来执行
* 2、计算任务 forkJoinPool.execute(ForkJoinTask<?> task)
* 3、定义一个ForkJoinTask
*
*
* */
import java.util.concurrent.RecursiveTask;

public class ForkJoinDemo extends RecursiveTask<Long> {
    private Long start;
    private Long end;
    //临界值
    private Long temp = 10000L;

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }


    //计算方法
    @Override
    protected Long compute() {
        if ((end-start)<temp){
            Long sum = 0L;
            for (Long i = start; i <= end; i++) {
                sum+=i;
            }
            return sum;
        }else {
            //使用ForkJoin分支合并计算
            Long middle = (end + start) / 2;//中间值
            ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
            //拆分任务、把任务压入线程队列
            task1.fork();
            ForkJoinDemo task2 = new ForkJoinDemo(middle+1,end);
            task2.fork();

            return task1.join()+task2.join();
        }
    }
}

三六九等程序员的测试

package org.example.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        test01();//11720
//        test02();//7369
        test03();//239
    }
    /*
    * 普通程序员
    * */
    public static void test01() {
        long startTime = System.currentTimeMillis();
        Long sum = 0L;
        for (Long i = 1L; i <= 10_0000_0000L; i++) {
            sum+=i;
        }
        long endTime = System.currentTimeMillis();
        System.out.println("结果:" + sum + "-->耗时:" + (endTime - startTime));
    }
/*
* 会使用ForkJoin
* */
    public static void test02() throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        ForkJoinDemo forkJoinDemo = new ForkJoinDemo(0L,10_0000_0000L);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
//        forkJoinPool.execute(forkJoinDemo);//执行
        ForkJoinTask<Long> submit = forkJoinPool.submit(forkJoinDemo);
        Long aLong = submit.get();
        long endTime = System.currentTimeMillis();

        System.out.println("结果:" + aLong + "-->耗时:" + (endTime - startTime));
    }
    /*
    * 九等程序员:Stream流并行运算
    * 效率高十几倍
    *
    * */
    public static void test03() {
        long start = System.currentTimeMillis();

        Long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum);

        long end = System.currentTimeMillis();
        System.out.println("结果:" + sum + "-->耗时:" + (end - start));
    }
}