Reactor接口之四

发布时间 2023-05-07 15:55:46作者: shigp1

interval

@Test
public void testInterval() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux.range(1,10)
            .zipWith(Flux.interval(Duration.ofSeconds(1)))
            .subscribe(System.out::println, null ,countDownLatch::countDown);

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

interval定时生成一个Flux。输出:

[1,0]
[2,1]
[3,2]
[4,3]
[5,4]
[6,5]
[7,6]
[8,7]
[9,8]
[10,9]

 

@Test
public void testInterval1() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux.interval(Duration.ofSeconds(1))
            .zipWith(Flux.range(1,10))
            .subscribe(System.out::println, null ,countDownLatch::countDown);

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

每个1秒输出一个Flux。每个1秒输出一组数据:

[0,1]
[1,2]
[2,3]
[3,4]
[4,5]
[5,6]
[6,7]
[7,8]
[8,9]
[9,10]

 

@Test
public void testInterval2() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux.interval(Duration.ofSeconds(1), Duration.ofMillis(200))
            .zipWith(Flux.range(1,10))
            .subscribe(System.out::println, null ,countDownLatch::countDown);

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

和上面类似。Flux.interval第一个参数是第一次调用时延迟时间,第二个参数是定时隔间时间。

sort

@Test
public void testSort() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Flux.create(t -> {
                for (int i = 0; i < 10; i++) {
                    t.next(random.nextInt(100));
                }
                t.complete();
            })
            .sort()
            .subscribe(System.out::println,null, countDownLatch::countDown);

    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

排序。

all

@Test
public void testAll() {
    Flux.create(t -> {
                for (int i = 0; i < 10; i++) {
                    t.next(random.nextInt(100));
                }
                t.complete();
            })
            .all(k -> (int)k > 50)
            .subscribe(System.out::println);
}

all表示所有元素都满足条件则返回true,否则返回false。

any

@Test
public void testAny() {
    Flux.create(t -> {
                for (int i = 0; i < 10; i++) {
                    t.next(random.nextInt(100));
                }
                t.complete();
            })
            .any(k -> (int)k > 50)
            .subscribe(System.out::println);
}

any表示有一个元素返回true则返回true,否则返回false。

buffer

@Test
public void testBuffer() {
    Flux.range(1,10)
            .buffer()
            .subscribe(System.out::println);
}

buffer将所有传入的值收集到单个List缓冲区中,该缓冲区将在该Flux完成后由返回的Flux发出。

cache

@Test
public void testCache() {
    Flux.range(1,10)
            .cache()
            .subscribe(System.out::println);
}

cache将此Flux转换为一个热点源,并缓存最后发出的信号以供下一个Subscriber使用。将保留无限量的onNext信号。完成和错误也将被重播。

collectSortedList

@Test
public void testClooectSortedList() {
    Flux.create(t -> {
                for (int i = 0; i < 10; i++) {
                    t.next(random.nextInt(100));
                }
                t.complete();
            })
            .collectSortedList()
            .subscribe(System.out::println);
}

collectSortedList排序并组合成List。