Reactor接口之五

发布时间 2023-05-07 17:06:19作者: shigp1

defaultIfEmpty

@Test
public void testDefaultIfEmpty() {
    Flux.range(1,10)
            .defaultIfEmpty(30)
            .subscribe(System.out::println);
}

@Test
public void testDefaultIfEmpty1() {
    Flux.empty()
            .defaultIfEmpty(30)
            .subscribe(System.out::println);
}

defaultIfEmpty为空Flux提供默认值。

last

@Test
public void testLast() {
    Flux.range(1,10)
            .last()
            .subscribe(System.out::println);
}

获取最后一个元素。输出10

skip

@Test
public void testSkip() {
    Flux.range(1,10)
            .skip(5)
            .subscribe(System.out::println);
}

skip跳过n个元素。输出6到10。

@Test
public void testSkip1() {
    Flux.range(1,10)
            .skip(5)
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription subscription) {
                    subscription.request(2);
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println(integer);
                }

                @Override
                public void onError(Throwable throwable) {
                    if (throwable != null) {
                        throwable.printStackTrace();
                    }
                }

                @Override
                public void onComplete() {
                    System.out.println("complete");
                }
            });
}

调用subscription.request来实现limit功能。再加上skip可以实现分页功能。

log

@Test
public void testLog() {
    Flux.range(1,10)
            .log()
            .subscribe();
}

log类似日志打印。

doOn系列函数

@Test
public void testDoOn() {
    Flux.range(1,10)
            .doFirst(() -> System.out.println("first"))
            .doOnNext(System.out::println)
            .doOnRequest(t -> {
                System.out.println("request:" + t);
            })
            .doOnCancel(() -> System.out.println("cancel"))
            .doOnComplete(() -> System.out.println("complete"))
            .doOnTerminate(() -> System.out.println("terminate"))
            .subscribe();
}

输出:

first
request:9223372036854775807
1
2
3
4
5
6
7
8
9
10
complete
terminate

首先执行doFirst,下游向上游请求数据时执行doOnRequest且执行一次。有数据时执行doOnNext,发生了cancel执行doOnCancel。请求完成执行doOnComplete。最后执行doOnTerminate。doOnRequest默认是向上游请求 Long.MAX_VALUE个数据。

 

@Test
public void testDoOn1() {
    Flux.range(1,10)
            .doFirst(() -> System.out.println("first"))
            .doOnNext(System.out::println)
            .doOnRequest(t -> {
                System.out.println("request:" + t + ":"+(t==Long.MAX_VALUE));
            })
            .doOnCancel(() -> System.out.println("cancel"))
            .doOnComplete(() -> System.out.println("complete"))
            .doOnTerminate(() -> System.out.println("terminate"))
            .doOnSubscribe(t -> {
                t.request(9);
                t.cancel();
            })
            .subscribe();
}

输出:

first
request:9:false
1
2
3
4
5
6
7
8
9
cancel
request:9223372036854775807:true

doOnSubscribe执行了cancel后执行了doOnCancel。cancel执行后没有执行doOnComplete和doOnTerminate。

 

@Test
public void testDoOn3() {
    Flux.just(1,Flux.error(new RuntimeException()))
            .doOnError(e -> System.out.println(e.getClass()))
            .doOnNext(System.out::println)
            .subscribe();
}

当发生异常时执行doOnError。

 

@Test
public void testDoOn4() {
    Flux.just(1,Flux.error(new RuntimeException()))
            .doFirst(() -> System.out.println("first"))
            .doOnError(e -> System.out.println(e.getClass()))
            .doOnNext(System.out::println)
            .doOnRequest(t -> {
                System.out.println("request:" + t + ":"+(t==Long.MAX_VALUE));
            })
            .doOnCancel(() -> System.out.println("cancel"))
            .doOnComplete(() -> System.out.println("complete"))
            .doOnTerminate(() -> System.out.println("terminate"))
            .doOnSubscribe(t -> {
                t.request(9);
                t.cancel();
            })
            .subscribe();
}

输出:

first
request:9:false
1
FluxError
complete
terminate
cancel
cancel

执行出现异常时,先执行doOnError,在执行doOnComplete,最后执行doOnTerminate。

@Test
public void testDoOn5() {
    Flux.just(1,Flux.error(new RuntimeException()))
            .doOnEach(System.out::println)
            .subscribe();
}

输出:

doOnEach_onNext(1)
doOnEach_onNext(FluxError)
onComplete()

有元素时执行doOnNext,出现异常时执行doOnError,最后执行doOnComplete。doOnEach相当于简化了doOnNext,doOnError,doOnComplete的调用。

onErrorResume

@Test
public void testOnErrorResume() {
   Flux.range(1,10)
           .map(k -> 4/(k-2))
            .onErrorResume(e -> Flux.error(new RuntimeException(e.getMessage())))
            .subscribe(System.out::println);
}

当发生异常时,封装成另一个异常。

 

@Test
public void testOnErrorResume1() {
    Flux.range(1,10)
            .map(k -> 4/(k-2))
            .onErrorResume(e -> Flux.just(0))
            .subscribe(System.out::println);
}

.
发生异常时给个默认值。输出-4,0