Dating Java8系列之并行数据处理

发布时间 2024-01-12 23:40:30作者: givemomo
翎野君/文

 

图片

 

图片

分支合并框架

 

分支合并框架介绍

分支/合并框架的目的是以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。

它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型。

要定义RecursiveTask,只需实现它唯一的抽象方法compute:

protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:

if (任务足够小或不可分) {   顺序计算该任务} else {     将任务分成两个子任务     递归调用本方法,拆分每个子任务,等待所有子任务完成     合并每个子任务的结果}

图片

 

图片

使用分支合并框架的例子

 

执行递增求和任务

public class CalculatorSumTask extends RecursiveTask<Long> {
// 开始的值 private Long start;
// 结束的值 private Long end;
// 阈值,即结束fork的条件 private static final Long THRESHOLD = 10000L;
public CalculatorSumTask(Long start, Long end) { this.start = start; this.end = end; }
/** * 求 start - end 之间的所有数的和 * * @return */ @Override protected Long compute() {
// 数的间隔 long length = end - start;
// 任务足够小或不可分 if (length <= THRESHOLD) { long sum = 0L; // 顺序计算该任务 for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long middle = (start + end) / 2; // 将任务分成两个子任务 CalculatorSumTask leftTask = new CalculatorSumTask(start, middle); leftTask.fork(); CalculatorSumTask rightTask = new CalculatorSumTask(middle + 1, end); rightTask.fork();
// 合并每个子任务的结果 return leftTask.join() + rightTask.join(); } }}

当把CalculatorSumTask任务传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务compute方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把,要求和的数组分成两半,分给两个新的CalculatorSumTask,而它们也由ForkJoinPool安排执行。

因此,这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步细分的条件(本例中是求和的项目数小于等于10000)。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。

图片

 

模拟发送短信

public class SendSmsTask extends RecursiveAction {
private List<String> phoneList;
private String smsText;
private static final Integer THRESHOLD = 1000;
public SendSmsTask(List<String> phoneList, String smsText) { this.phoneList = phoneList; this.smsText = smsText; }
private void send(String phoneNumber, String smsText) { try { Thread.sleep(10); System.out.println("发送:" + phoneNumber + " 内容:" + smsText); } catch (InterruptedException e) { e.printStackTrace(); } }
/** * 对一批手机号进行发短信处理 */ @Override protected void compute() { int batchSize = phoneList.size(); if (batchSize <= 1000) { for (String number : phoneList) { send(number, smsText); } } else { // 拆的逻辑和如何拆分由自己来定义 int middle = phoneList.size() / 2;
List<String> leftList = phoneList.subList(0, middle); List<String> rightList = phoneList.subList(middle, phoneList.size());
SendSmsTask leftTask = new SendSmsTask(leftList,smsText); SendSmsTask rightTask = new SendSmsTask(rightList,smsText);
leftTask.fork(); rightTask.fork();
// 下面这两步骤可以省略 leftTask.join(); rightTask.join(); } }}

 

图片

工作窃取算法

 

算法介绍

分出大量的小任务一般来说都是一个好的选择。这是因为,理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。不幸的是,实际中,每个子任务所花的时间可能天差地别。

分支/合并框架工程用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。

  • 每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。

  • 基于前面所述的原因,某个线程可能已经完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。

  • 这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。

  • 这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。

  • 这就是为什么要划分成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。

图片

相关代码

public class ParallelMain {
public static void main(String[] args) { Long start = Instant.now().toEpochMilli(); // 求0-10000000所有数据的和 Long sum = 0L; for (long i = 0; i <= 10000000000L; i++) { sum += i; } System.out.println(sum); Long end = Instant.now().toEpochMilli(); System.out.println("for循环单线程执行 耗费时间:" + (end - start));
// 获取Jvm最大的可用线程数 System.out.println(Runtime.getRuntime().availableProcessors());
Long start1 = Instant.now().toEpochMilli();
// 要处理的数据量的太小 ForkJoinPool forkJoinPool = new ForkJoinPool(); Long sum1 = forkJoinPool.invoke(new CalculatorSumTask(0L,10000000000L)); System.out.println(sum1);
Long end1 = Instant.now().toEpochMilli(); System.out.println("fork join 并行执行 耗费时间:" + (end1 - start1));
Long start2 = Instant.now().toEpochMilli();
Long sum2 = LongStream.rangeClosed(0L,10000000000L).parallel().reduce(0L,Long::sum); System.out.println(sum2);
Long end2 = Instant.now().toEpochMilli(); System.out.println("java8 parallel 并行执行 耗费时间:" + (end2 - start2)); }}

 

图片

小结

 

  • 分支/合并框架使用递归的方式将可以并行的任务拆分成更小的任务,在不同的线程上执行,然后将各个子任务的结果合并起来生成整体结果。

  • 内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。

  • 虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的行为和性能有时是反直觉的,因此一定要测量,确保你并没有把程序变得更慢。

  • 像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,或处理单个元素特别耗时的时候。

 

作者:给我馍馍
博客:https://www.cnblogs.com/givemomo/

 

本篇文章如有帮助到您,请给「给我馍馍」点个赞,感谢您的支持。