Reactor接口之二

发布时间 2023-05-06 17:03:03作者: shigp1

merge

@Test
public void testMerge() {
    Flux.merge(Flux.just(1,2,3),Flux.range(5,6))
            .subscribe(System.out::println);
}

merge将多个Flux合并成一个Flux。

 

@Test
public void testMerge1() {
    Flux.range(1,5)
            .mergeWith(Flux.just(8,9,10))
            .subscribe(System.out::println);
}

mergeWith和merge相同。

 

@Test
public void testMerge2() {
    Flux.range(0,10)
            .mergeComparingWith(Flux.just(11,-1,-2,-3,-4),Integer::compareTo)
            .subscribe(System.out::println);
}

每个Flux之间排序后合并成一个Flux。输出如下:

mergeComparingWith以Flux的第一个元素比较并排序,是Flux之间的排序,不会改变单个Flux之间的元素顺序。改成:

@Test
public void testMerge2() {
    Flux.range(0,10)
            .mergeComparingWith(Flux.just(-1,-2,-3,-4),Integer::compareTo)
            .subscribe(System.out::println);
}

输出:

 

@Test
public void testMerge3() {
    Flux.merge(Flux.just(Flux.just(1,2,3),Flux.range(-10,5)))
            .subscribe(System.out::println);
}

Flux的元素是Flux,也能合并成一个Flux。嵌套的Flux合并成一个Flux。

 

@Test
public void testMerge4() {
    Flux.mergePriority(Flux.range(1,10),Flux.just(-8,-1,-9))
            .subscribe(System.out::println);
}

mergePriority以自然顺序排序后合并成一个Flux。但是用int类型没有排序。

 

@Test
public void testMerge5() {
    Flux.mergeSequential(Flux.range(1,10),Flux.just(-8,-1,-9))
            .subscribe(System.out::println);
}

与merge不同,mergeSequential发出的值按订阅顺序合并到最终序列中。

concat

@Test
public void testConcat() {
    Flux.concat(Flux.range(1,10),Flux.just("a","b","c"))
            .subscribe(System.out::println);
}

将多个Flux连接成一个Flux。

 

 @Test
public void testConcat1() {
    Flux.concat(Flux.range(1,10),Flux.error(new RuntimeException()),Flux.just("a","b","c"))
            .subscribe(System.out::println);
}

如果遇到异常则停止处理。只打印1到10。

 

@Test
public void testConcat3() {
    Flux.range(1,10)
            .concatWith(t -> {
                t.onNext(20);
                t.onNext(30);
            })
            .subscribe(System.out::println);
}

concatWith和concat相同。
 

@Test
public void testConcat4() {
    Flux.range(1,10)
            .concatWithValues(20,30)
            .subscribe(System.out::println);
}

concatWithValues和concat相同。

 

@Test
public void testConcatDelayError() {
    Flux.concatDelayError(Flux.range(1,10), Flux.error(new RuntimeException()), Flux.just(20,30))
            .subscribe(System.out::println);
}

@Test
public void testConcatDelayError1() {
    Flux.concatDelayError(Flux.range(1,10), Flux.just(20,30),Flux.error(new RuntimeException()))
            .subscribe(System.out::println);
}

上面两个单测运行结果相同,都是1到10,20,30。concatDelayError延迟异常的抛出,如果处理遇到异常,要等处理完后才抛出异常。