Stream流编程

发布时间 2023-07-20 16:54:08作者: 离人怎挽_wdj

1、Stream流编程概念

Stream(流)是一个来自数据源的元素队列并支持聚合操作

元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而是按需计算。
数据源 流的来源。 可以是集合,数组,I/O channel, 产生器generator 等。
聚合操作 类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted等。
和以前的Collection操作不同, Stream操作还有两个基础的特征:

Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。

package stream;

import java.util.stream.IntStream;

public class StreamDemo1 {

    public static void main(String[] args) {
        int[] nums = {1, 2, 3};
        // 外部迭代
        int sum = 0;
        for (int num : nums) {
            sum += num;
        }
        System.out.println("结果为:" + sum);

        // 使用stream的内部迭代
        // map就是中间操作(懒加载,返回stream的操作)
        // sum就是终止操作(副作用)
        // int sum2 = IntStream.of(nums).map(i -> i * 2).sum();
        int sum2 = IntStream.of(nums).map(StreamDemo1::doubleNum).sum();
        System.out.println("结果为:" + sum2);

        System.out.println("惰性求值就是终止没有调用的情况下,中间操作不会执行");
        IntStream.of(nums).map(StreamDemo1::doubleNum);

    }

    public static int doubleNum(int i) {
        System.out.println("执行了乘以2");
        return i * 2;
    }


}

2、流的创建


package stream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class StreamDemo2 {

public static void main(String[] args) {
    List<String> list = new ArrayList<>();

    // 从集合创建
    list.stream();
    list.parallelStream();

    // 从数组创建
    Arrays.stream(new int[]{2, 3, 5});

    // 创建数字流
    IntStream.of(1, 2, 3);
    // rangeClosed包含结束节点,range不包含
    IntStream.rangeClosed(1, 10);

    // 使用random创建一个无限流(非静态方法)
    // 因为它是一个无限流,所以需要执行一些短路操作,例如limit,否则它会一直执行下去
    new Random().ints().limit(10);

    // 自己产生流
    Random random = new Random();
    Stream.generate(random::nextInt).limit(20);
}

}

3、流的中间操作


无状态和有状态操作的解释:

诸如map或者filter会从输入流中获取每一个元素,并且在输出流中得到一个结果,这些操作没有内部状态,称为无状态操作。
但是像reduce、sum、max这些操作都需要内部状态来累计计算结果,所以称为有状态操作

package stream;

import java.util.Random;
import java.util.stream.Stream;

public class StreamDemo3 {

    public static void main(String[] args) {

        String str = "my name is 007";
        // 把每个单词的长度大于2的长度打印出来
        // filter传入一个Predicate接口函数,map传入一个Function函数接口
        System.out.println("------filter and map------");
        Stream.of(str.split(" ")).filter(s -> s.length() > 2).map(String::length).forEach(System.out::println);

        // flatMap A -> B属性(是个集合),最终得到所有的A元素里面的所有B属性集合
        // intStream/longStream并不是Stream的子类所以经常要进行装箱 boxed
        // 获取字符串中每个单词的字符
        System.out.println("------flatMap--------");
        Stream.of(str.split(" ")).flatMap(s -> s.chars().boxed()).forEach(i -> System.out.println((char)i.intValue()));

        // peek 用于debug,是个中间层操作,和forEach类似,forEach是终止操作
        System.out.println("-------peek---------");
        Stream.of(str.split(" ")).peek(System.out::print).forEach(System.out::println);

        // limit使用,主要用于无限流
        new Random().ints().filter(i -> i > 100 && i < 10000).limit(10).forEach(System.out::println);
    }
}

4、 流的终止操作


短路操作就是不需要等待所有结果都计算完就可以结束流的操作,只需要得到指定个结果后就可以结束的终止操作

package stream;

import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamDemo4 {

    public static void main(String[] args) {
        String str = "my name is 007";

        // 迭代器
        // 使用并行流,乱序(效率更高)
        str.chars().parallel().forEach(i -> System.out.print((char)i));
        System.out.println();
        // 使用 forEachOrdered保证顺序
        str.chars().parallel().forEachOrdered(i -> System.out.print((char)i));
        System.out.println();

        // 收集器
        // 收集到list
        List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList());
        System.out.println(list);
        // 收集到Set
        Set<String> set = Stream.of(str.split(" ")).collect(Collectors.toSet());
        System.out.println(set);

        // 归约,将流中的元素结合起来得到一个值
        // 使用reduce 拼接字符串
        Optional<String> letters = Stream.of(str.split(" ")).reduce((s1, s2) -> s1 + "|" + s2);
        // 使用optional进行空判断
        System.out.println(letters.orElse(null));

        // 带初始化值的reduce,但是默认值也会被算入元素之中
        String reduce = Stream.of(str.split(" ")).reduce("", (s1, s2) -> s1 + "|" + s2);
        System.out.println(reduce);

        // 计算所有单词总长度
        Integer length = Stream.of(str.split(" ")).map(String::length).reduce(0, (s1, s2) -> s1 + s2);
        System.out.println(length);

        // max的使用,找出最长的单词
        Optional<String> max = Stream.of(str.split(" ")).max(Comparator.comparingInt(String::length));
        System.out.println(max.orElse(null));
        // min的使用,找出最短的单词
        Optional<String> min = Stream.of(str.split(" ")).min(Comparator.comparingInt(String::length));
        System.out.println(min.orElse(null));
        // count的使用,统计有多少个单词
        long count = Stream.of(str.split(" ")).count();
        System.out.println(count);

        // 使用 allMatch判断是否所有单词长度都大于1,返回boolean
        // anyMatch和noneMatch用法差不多,都是返回boolean
        boolean all = Stream.of(str.split(" ")).anyMatch(s -> s.length() > 1);
        System.out.println(all);

        // 使用 findFirst 短路操作
        OptionalInt findFirst = new Random().ints().findFirst();
        System.out.println(findFirst.orElse(0));
        // 使用 findAny 短路操作
        OptionalInt findAny = new Random().ints().findAny();
        System.out.println(findAny.orElse(0));



    }
}

5、并行流

package stream;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class StreamDemo5 {

    public static void main(String[] args) {

        // peek在jdk10以后声明了使用count终止操作时不会执行peek中的方法,所以我们换成了sum()
        // 调用parallel产生一个并行流
        // IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).sum();

        // 现在要实现这样一个效果:先并行,再串行
        // 多次调用 parallel / sequential,以最后一次调用为准
//        IntStream.range(1, 100)
//                // 调用parallel产生并行流
//                .parallel().peek(StreamDemo5::debug)
//                // 调用sequential产生串行流
//                .sequential().peek(StreamDemo5::debug2)
//                .sum();

        // 并行流使用的线程池:ForkJoinPool.commonPool
        // 默认的线程数时当前机器的cpu个数
        // 使用这个属性可以修改默认的线程数
//        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
//        IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).sum();

        // 使用自己的线程池,不适用默认线程池,防止任务被阻塞
        ForkJoinPool pool = new ForkJoinPool(20);
        pool.submit(() -> IntStream.range(1, 100).parallel().peek(StreamDemo5::debug).sum());
        pool.shutdown();

        // 主线程停止的话线程池也会被结束,所以我们在这里让它等待一下
        synchronized (pool) {
            try {
                pool.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void debug(int i) {
        System.out.println(Thread.currentThread().getName() + "debug " + i);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private static void debug2(int i) {
        System.err.println("debug " + i);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

6、收集器

package stream;

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * 学生 对象
 */
class Student {
    /**
     * 姓名
     */
    private String name;

    /**
     * 年龄
     */
    private int age;

    /**
     * 性别
     */
    private Gender gender;

    /**
     * 班级
     */
    private Grade grade;

    public Student(String name, int age, Gender gender, Grade grade) {
        super();
        this.name = name;
        this.age = age;
        this.gender = gender;
        this.grade = grade;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Grade getGrade() {
        return grade;
    }

    public void setGrade(Grade grade) {
        this.grade = grade;
    }

    public Gender getGender() {
        return gender;
    }

    public void setGender(Gender gender) {
        this.gender = gender;
    }

    @Override
    public String toString() {
        return "[name=" + name + ", age=" + age + ", gender=" + gender
                + ", grade=" + grade + "]";
    }

}

/**
 * 性别
 */
enum Gender {
    MALE, FEMALE
}

/**
 * 班级
 */
enum Grade {
    ONE, TWO, THREE, FOUR
}

public class CollectDemo {

    public static void main(String[] args) {
        List<Student> students = Arrays.asList(
                new Student("小明", 10, Gender.MALE, Grade.ONE),
                new Student("大明", 9, Gender.MALE, Grade.THREE),
                new Student("小白", 8, Gender.FEMALE, Grade.TWO),
                new Student("小黑", 13, Gender.FEMALE, Grade.FOUR),
                new Student("小红", 7, Gender.FEMALE, Grade.THREE),
                new Student("小黄", 13, Gender.MALE, Grade.ONE),
                new Student("小青", 13, Gender.FEMALE, Grade.THREE),
                new Student("小紫", 9, Gender.FEMALE, Grade.TWO),
                new Student("小王", 6, Gender.MALE, Grade.ONE),
                new Student("小李", 6, Gender.MALE, Grade.ONE),
                new Student("小马", 14, Gender.FEMALE, Grade.FOUR),
                new Student("小刘", 13, Gender.MALE, Grade.FOUR));

        // 得到所有学生的年龄列表
        // s -> s.getAge() --> Student::getAge,不会多生成一个类似 lambda$0这样的函数
        // 这里的Student::getAge是Function<Student, Integer> fun = s -> s.getAge()的简写
        // Student::getAge这种写法方法需要用静态修饰,没有参数的话似乎也可以
        List<Integer> ages = students.stream().map(Student::getAge)
                .collect(Collectors.toList());
        System.out.println("所有学生的年龄:" + ages);

//        // 达到去重效果
//        Set<Integer> set = students.stream().map(Student::getAge)
//                .collect(Collectors.toSet());
//        System.out.println("所有学生的年龄:" + set);
        // 使用toCollection可以自己指定集合实现类
        Set<Integer> set = students.stream().map(Student::getAge)
                .collect(Collectors.toCollection(TreeSet::new));
        System.out.println("所有学生的年龄:" + set);

        // 统计汇总信息
        IntSummaryStatistics summaryStatistics = students.stream()
                .collect(Collectors.summarizingInt(Student::getAge));
        System.out.println("年龄汇总信息:" + summaryStatistics);

        // 分块
        Map<Boolean, List<Student>> genders = students.stream()
                .collect(Collectors.partitioningBy(s -> s.getGender() == Gender.MALE));
        System.out.println("男女学生列表:" + genders);

        // 分组
        Map<Grade, List<Student>> grades = students.stream()
                .collect(Collectors.groupingBy(Student::getGrade));
        System.out.println("学生班级列表:" + grades);

        // 得到所有班级学生的个数
        Map<Grade, Long> gradesCount = students.stream()
                .collect(Collectors.groupingBy(Student::getGrade, Collectors.counting()));
        System.out.println("学生班级个数列表:" + gradesCount);
    }
}

7、Stream运行机制

package stream;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;


/**
 * 验证steam运行机制
 * 1. 所有操作时链式调用,一个元素只迭代一次
 * 2. 每一个中间操作返回一个新的流,流里面有一个属性sourceStage指向同一个地方,就是Head
 * 3. Head -> nextStage -> nextStage-> ... -> null
 * 4. 有状态操作会把无状态操作截断,单独处理
 * 5. 并行环境下,有状态的中间操作不一定能并行操作
 * 6. parallel/sequential 这两个操作也是中间操作(也是返回stream)
 *      但是他们不创建流,他们只修改Head的并行标志
 */
public class RunStream {

    public static void main(String[] args) {
        Random random = new Random();
        // 随机产生数据
        Stream<Integer> stream = Stream.generate(random::nextInt)
                // 产生500个(无限流需要短路操作)
                .limit(500)
                // 第一个无状态操作
                .peek(s -> print("peek: " + s))

                // 有状态操作
                .sorted((i1, i2) -> {
                    print("排序:" + i1 + ", " + i2);
                    return i1.compareTo(i2);
                })

                // 第二个无状态操作
                .filter(s -> {
                    print("filter" + s);
                    return s > 1000000;
                }).parallel();

         // 终止操作
        stream.allMatch(s -> true);
    }

    /**
     * 打印日志并sleep5毫秒
     * @param s
     */
    public static void print(String s) {
        // 带线程名(测试并行情况)
        System.out.println(Thread.currentThread().getName() + ">" + s);
        try {
            TimeUnit.MILLISECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

参考连接:https://www.ayulong.cn/blog/91