Reactor接口之三

发布时间 2023-05-07 15:20:30作者: shigp1

defer

@Test
public void testDefer() {
    Flux.defer(() -> {
        return Flux.range(0,10);
    })
            .subscribe(System.out::println);
}

输出0到9。defer每次对结果Flux进行订阅时,懒惰地提供发布服务。因此实际的源实例化会推迟到每次订阅时。

collect

 @Test
public void testCollect() {
    Flux.range(1,10)
            .collect(Collectors.toList())
            .subscribe(System.out::println);
}

收集成List。输出[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]。打印的是List。

 

@Test
public void testCollect1() {
    Flux.just("a","b","c","a")
            .collect(Collectors.toSet())
            .subscribe(System.out::println);
}

收集成Set。打印[a, b, c]。

 

@Test
public void testCollect2() {
    Flux.range(1,10)
            .collectList()
            .subscribe(System.out::println);
}

collectList和collect(Collectors.toList())相同。

 

@Test
public void testCollect3() {
    Flux.range(1,10)
            .collect(Collectors.toMap(k -> k,k->k))
            .subscribe(System.out::println);
}

@Test
public void testCollect4() {
    Flux.range(1,10)
            .collectMap(k -> k,k->k)
            .subscribe(System.out::println);
}

以上两个功能相同,转化成Map。输出:{1=1, 2=2, 3=3, 4=4, 5=5, 6=6, 7=7, 8=8, 9=9, 10=10}

 

repeat

@Test
public void testRepeat() {
    Flux.range(1,10)
            .repeat()
            .subscribe(System.out::println);
}

无限重复Flux.range(1,10)。

 

@Test
public void testRepeat1() {
    Flux.range(1,10)
            .repeat(2)
            .subscribe(System.out::println);
}

先输出Flux.range(1,10),在重复Flux.range(1,10)两次,输出:

1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
8
9
10
1
2
3
4
5
6
7
8
9
10

 

Filter

@Test
public void testFilter() {
    Flux.create(t -> {
                for (int i = 0; i < 10; i++) {
                    t.next(random.nextInt(100));
                }
            })
            .filter(k -> (int)k > 50)
            .subscribe(System.out::println);
}

过滤出满足条件的数据。filter筛选出大于50的数。

map

@Test
public void testMap() {
    Flux.create(t -> {
                for (int i = 0; i < 10; i++) {
                    t.next(random.nextInt(100));
                }
            })
            .map(k -> (int)k * (int)k)
            .subscribe(System.out::println);
}

map对数据做映射。上面例子对每个数求平方。

count

@Test
public void testCount() {
    Flux.range(1,10)
            .count()
            .subscribe(System.out::println);
}

计数。输出10

distinct

@Test
public void testDistinct() {
    Flux.just(1,1,2,3)
            .distinct()
            .subscribe(System.out::println);
}

去重。输出1,2,3

zip

@Test
public void testZip() {
    Flux.range(1,5)
            .zipWith(Flux.range(6,10))
            .subscribe(System.out::println);
}

zipWith类似拉链的作用,将两个Flux像拉链一样组合起来。用于将两个不同流速的Flux统一流速。输出:

[1,6]
[2,7]
[3,8]
[4,9]
[5,10]

 

@Test
public void testZip1() {
    Flux.range(1,5)
            .zipWith(Flux.range(6,12))
            .subscribe(System.out::println);
}

第二个Flux的数量多余第一个Flux数量。将第二个Flux多出的元素丢弃。输出:

[1,6]
[2,7]
[3,8]
[4,9]
[5,10]

 

@Test
public void testZip2() {
    Flux.range(1,8)
            .zipWith(Flux.range(6,10))
            .subscribe(System.out::println);
}

第一个Flux的数量多余第二个Flux数量。在这里将第二个Flux不足的元素继续递增。输出:

[1,6]
[2,7]
[3,8]
[4,9]
[5,10]
[6,11]
[7,12]
[8,13]