Java8使用并行流(ParallelStream)注意事项

发布时间 2023-11-28 10:05:46作者: 德邦总管

本文转载自简书:https://www.jianshu.com/p/51c1d4f1bf84
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

 

Java8并行流ParallelStream和Stream的区别就是支持并行执行,提高程序运行效率。但是如果使用不当可能会发生线程安全的问题。Demo如下:

 1 public static void concurrentFun() {
 2         List<Integer> listOfIntegers =
 3                 new ArrayList<>();
 4         for (int i = 0; i <100; i++) {
 5             listOfIntegers.add(i);
 6         }
 7         List<Integer> parallelStorage = new ArrayList<>() ;
 8         listOfIntegers
 9                 .parallelStream()
10                 .filter(i->i%2==0)
11                 .forEach(i->parallelStorage.add(i));
12         System.out.println();
13 
14         parallelStorage
15                 .stream()
16                 .forEachOrdered(e -> System.out.print(e + " "));
17 
18         System.out.println();
19         System.out.println("Sleep 5 sec");
20         try {
21             TimeUnit.SECONDS.sleep(5);
22         } catch (InterruptedException e) {
23             e.printStackTrace();
24         }
25 
26         parallelStorage
27                 .stream()
28                 .forEachOrdered(e -> System.out.print(e + " "));
29     }

程序运行结果如下:

1 null 72 56 58 60 74 34 36 68 70 54 28 30 50 52 26 16 44 12 14 48 22 46 40 24 42 18 20 38 6 8 10 0 null 4 82 66 84 86 78 80 76 62 64 90 92 94 88 96 98 
2 Sleep 5 sec
3 null 72 56 58 60 74 34 36 68 70 54 28 30 50 52 26 16 44 12 14 48 22 46 40 24 42 18 20 38 6 8 10 0 null 4 82 66 84 86 78 80 76 62 64 90 92 94 88 96 98 

除了以上在ForEach里面添加集合元素会出现这种问题,以下这种方式也会:

1 listOfIntegers
2                 .parallelStream()
3                 .map(e -> {
4                     parallelStorage.add(e);
5                     return e;
6                 })
7                 .forEachOrdered(e -> System.out.print(e + " "));

两个问题:
1.为什么parallelStorage的大小不固定?
2.为什么parallelStorage会有null元素?

最初我以为是因为主线程执行完成后并行流中的线程并未结束,sleep了主线程后发现结果并没有发生改变,其实我们可以认为ArrayList内部维护了一个数组Arr其定义一个变量 n用以表式这个数组的大小那么向这个ArrayList中存储数据的过程可以分解为这么几步:

1.读取数组的长度存入n
2.向这个数组中储入元素arr[n]=a
3.将n+1
4.保存n
而对于parrallelStorage元素数量不固定的原因就是多线程有可能同时读取到相同的下标n同时赋值,这样就会出现元素缺失的问题了
如何解决这个问题呢?我们可以将其转化为一个同步集合也就是

1 Collections.synchronizedList(new ArrayList<>()) 

在使用并行流的时候是无法保证元素的顺序的,也就是即使你用了同步集合也只能保证元素都正确但无法保证其中的顺序。

除了以上这种方式,还有什么方法可以防止并行流出现线程不安全操作?

那就是最后调用collect(Collectors.tolist()),这种收集起来所有元素到新集合是线程安全的。Demo如下:

 1 public static void collectFun() {
 2         List<Integer> listOfIntegers =
 3                 new ArrayList<>();
 4 
 5         for (int i = 0; i <100; i++) {
 6             listOfIntegers.add(i);
 7         }
 8 
 9         List<Integer> parallelStorage = listOfIntegers
10                 .parallelStream()
11                 .filter(i -> i % 2 == 0)
12                 .collect(Collectors.toList());
13 
14 
15         System.out.println();
16 
17         parallelStorage
18                 .stream()
19                 .forEachOrdered(e -> System.out.print(e + " "));
20 
21         System.out.println();
22         System.out.println("Sleep 5 sec");
23         try {
24             TimeUnit.SECONDS.sleep(5);
25         } catch (InterruptedException e) {
26             e.printStackTrace();
27         }
28 
29         parallelStorage
30                 .stream()
31                 .forEachOrdered(e -> System.out.print(e + " "));
32     }

程序运行结果如下:

1 0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 
2 Sleep 5 sec
3 0 2 4 6 8 10 12 14 16 18 20 22 24 26 28 30 32 34 36 38 40 42 44 46 48 50 52 54 56 58 60 62 64 66 68 70 72 74 76 78 80 82 84 86 88 90 92 94 96 98 

不光没有出现Null和数量不一致问题,还排序了!所以,在采用并行流收集元素到集合中时,最好调用collect方法,一定不要采用Foreach方法或者map方法。