一、生成Flux
range
@Test
public void testRange() {
Flux.range(1,10)
.subscribe(System.out::println);
}
range第一个参数是起始的数字,第二个参数是要生成数字的数量。Flux.range(1,10)生成从1到10的数字。subscribe对上游的数据进行消费处理。
generate
生成一个无限流:
@Test
public void testGenerate() {
AtomicInteger i = new AtomicInteger();
Flux.generate(t -> {
t.next(i.getAndIncrement());
})
.subscribe(System.out::println);
}
生成一个有限流:
@Test
public void testGenerate1() {
AtomicInteger i = new AtomicInteger();
Flux.generate(t -> {
t.next(i.getAndIncrement());
if (i.get() > 10000) {
t.complete();
}
})
.subscribe(System.out::println);
}
调用complete时停止生成元素。
@Test
public void testGenerate2() {
Flux.generate(()->0,(a,b) -> {
b.next(a++);
if(a>=1000){
b.complete();
}
return a;
})
.subscribe(System.out::println);
}
生成0到999.第一个参数设置初始值,第二个参数生成下一个值。调用complete结束生成,否则一直运行。且只能调用一次next。
create
@Test
public void testCreate() {
final int[] i = {0};
Flux.create(t -> {
t.next(i[0]++);
t.next(i[0]++);
})
.subscribe(System.out::println);
}
生成0,1。能调用多次next。调用几次next就生成几个数字。
@Test
public void testCreate1() {
final int[] i = {0};
Flux.create(t -> {
while (true) {
t.next(i[0]++);
}
})
.subscribe(System.out::println);
}
生成无限流。
@Test
public void testCreate2() {
final int[] i = {0};
Flux.create(t -> {
t.next(i[0]++);
t.next(i[0]++);
}, FluxSink.OverflowStrategy.IGNORE)
.subscribe(System.out::println);
}
指定背压策略是IGNORE。
背压策略由OverflowStrategy枚举指定:
- IGNORE:完全忽略下游背压请求。
- ERROR:当下游无法跟上时,发出IllegalStateException信号。
- DROP:如果下游没有准备好接收传入信号,则丢弃传入。
- LATEST:下游将仅获得来自上游的最新信号。
- BUFFER:如果下游跟不上,缓冲所有信号。
from
@Test
public void testFrom() {
final int[] i = {0};
Flux.from(t -> {
t.onNext(i[0]++);
t.onNext(i[0]++);
})
.subscribe(System.out::println);
}
生成0和1。可以多次调用onNext。
@Test
public void testFromArray() {
Integer[] ints = {1,2,3,4,5};
Flux.fromArray(ints)
.subscribe(System.out::println);
}
从数组生成数据。
@Test
public void testIterable() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
Flux.fromIterable(list)
.subscribe(System.out::println);
}
从迭代器生成数据。
@Test
public void testStream() {
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(i);
}
Flux.fromStream(list.stream())
.subscribe(System.out::println);
}
从流生成数据。
push
@Test
public void testPush() {
final int[] i ={0};
Flux.push(t -> {
t.next(i[0]++);
t.next(i[0]++);
})
.subscribe(System.out::println);
}
向下游推送数据。生成0和1。可以多次调用next。
just
@Test
public void testJust() {
Flux.just(1)
.subscribe(System.out::println);
}
@Test
public void testJust1() {
Flux.just(1,2,3)
.subscribe(System.out::println);
}
通过常量向下游推送数据。
二、特殊Flux
empty
@Test
public void testEmpty() {
Flux.empty()
.subscribe(System.out::println);
}
生成空的Flux。
never
@Test
public void testNever( ) {
Flux.never()
.subscribe(System.out::println);
}
创建一个Flux,它永远不会发出任何数据、错误或完成信号。
error
@Test
public void testError() {
Flux.error(new RuntimeException())
.subscribe(System.out::println);
}
创建错误的Flux。