Reactor接口之一

发布时间 2023-05-06 15:38:22作者: shigp1

一、生成Flux

range

@Test
public void testRange() {
    Flux.range(1,10)
            .subscribe(System.out::println);
}

range第一个参数是起始的数字,第二个参数是要生成数字的数量。Flux.range(1,10)生成从1到10的数字。subscribe对上游的数据进行消费处理。

generate

生成一个无限流:

@Test
public void testGenerate() {
    AtomicInteger i = new AtomicInteger();
    Flux.generate(t -> {
        t.next(i.getAndIncrement());
    })
    .subscribe(System.out::println);
}

 

生成一个有限流:

@Test
public void testGenerate1() {
    AtomicInteger i = new AtomicInteger();
    Flux.generate(t -> {
                t.next(i.getAndIncrement());

                if (i.get() > 10000) {
                    t.complete();
                }
            })
            .subscribe(System.out::println);
}

调用complete时停止生成元素。

 

@Test
public void testGenerate2() {
    Flux.generate(()->0,(a,b) -> {
        b.next(a++);
        if(a>=1000){
            b.complete();
        }
        return a;
    })
    .subscribe(System.out::println);
}

生成0到999.第一个参数设置初始值,第二个参数生成下一个值。调用complete结束生成,否则一直运行。且只能调用一次next。

create

@Test
public void testCreate() {
    final int[] i = {0};
    Flux.create(t -> {
        t.next(i[0]++);
        t.next(i[0]++);
    })
     .subscribe(System.out::println);
}

生成0,1。能调用多次next。调用几次next就生成几个数字。
 

@Test
public void testCreate1() {
    final int[] i = {0};
    Flux.create(t -> {
                while (true) {
                    t.next(i[0]++);
                }
            })
            .subscribe(System.out::println);
}

生成无限流。

 

@Test
public void testCreate2() {
    final int[] i = {0};
    Flux.create(t -> {
                t.next(i[0]++);
                t.next(i[0]++);
            }, FluxSink.OverflowStrategy.IGNORE)
            .subscribe(System.out::println);
}

指定背压策略是IGNORE。

 

背压策略由OverflowStrategy枚举指定:

  • IGNORE:完全忽略下游背压请求。
  • ERROR:当下游无法跟上时,发出IllegalStateException信号。
  • DROP:如果下游没有准备好接收传入信号,则丢弃传入。
  • LATEST:下游将仅获得来自上游的最新信号。
  • BUFFER:如果下游跟不上,缓冲所有信号。

from

@Test
public void testFrom() {
    final int[] i = {0};

    Flux.from(t -> {
        t.onNext(i[0]++);
        t.onNext(i[0]++);
    })
    .subscribe(System.out::println);
}

生成0和1。可以多次调用onNext。

 

@Test
public void testFromArray() {
    Integer[] ints = {1,2,3,4,5};

    Flux.fromArray(ints)
            .subscribe(System.out::println);
}

从数组生成数据。

 

@Test
public void testIterable() {
    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(i);
    }

    Flux.fromIterable(list)
            .subscribe(System.out::println);
}

从迭代器生成数据。

 

@Test
public void testStream() {
    List<Integer> list = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
        list.add(i);
    }

    Flux.fromStream(list.stream())
            .subscribe(System.out::println);
}

从流生成数据。

push

@Test
public void testPush() {
    final int[] i ={0};

    Flux.push(t -> {
        t.next(i[0]++);
        t.next(i[0]++);
    })
   .subscribe(System.out::println);
}

向下游推送数据。生成0和1。可以多次调用next。

just

@Test
public void testJust() {
    Flux.just(1)
            .subscribe(System.out::println);
}

@Test
public void testJust1() {
    Flux.just(1,2,3)
            .subscribe(System.out::println);
}

通过常量向下游推送数据。

二、特殊Flux

empty

@Test
public void testEmpty() {
    Flux.empty()
            .subscribe(System.out::println);
}

生成空的Flux。

 

never

@Test
public void testNever( ) {
    Flux.never()
            .subscribe(System.out::println);
}

创建一个Flux,它永远不会发出任何数据、错误或完成信号。

 

error

@Test
public void testError() {
    Flux.error(new RuntimeException())
            .subscribe(System.out::println);
}

创建错误的Flux。