Reactor之Schedulers,publishOn 和 subscribeOn

发布时间 2023-05-08 20:23:16作者: shigp1

转发自:https://blog.csdn.net/get_set/article/details/79480172

Schedulers类似Executor,是Reactor的线程调度接口。提供以下几种线程执行环境:

  • 当前线程(Schedulers.immediate());
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle();
  • 弹性线程池(Schedulers.elastic())。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源;
  • 固定大小线程池(Schedulers.parallel()),所创建线程池的大小与CPU个数等同;
  • 自定义线程池(Schedulers.fromExecutorService(ExecutorService))基于自定义的ExecutorService创建 Scheduler(虽然不太建议,不过你也可以使用Executor来创建)。

Schedulers类已经预先创建了几种常用的线程池:使用single()、elastic()和parallel()方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()、newElastic()和newParallel()方法。

Executors提供的几种线程池在Reactor中都支持:

  • Schedulers.single()Schedulers.newSingle()对应Executors.newSingleThreadExecutor()
  • Schedulers.elastic()Schedulers.newElastic()对应Executors.newCachedThreadPool()
  • Schedulers.parallel()Schedulers.newParallel()对应Executors.newFixedThreadPool()

Schedulers提供的以上三种调度器底层都是基于ScheduledExecutorService的,因此都是支持任务定时和周期性执行的;
Flux和Mono的调度操作符subscribeOnpublishOn支持work-stealing

 

最新版本中elastic被废弃了,重新提供了boundedElastic

Schedulers#boundedElastic

public static Scheduler boundedElastic() {
	return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
}

static final Supplier<Scheduler> BOUNDED_ELASTIC_SUPPLIER =
		() -> newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
				BOUNDED_ELASTIC, BoundedElasticScheduler.DEFAULT_TTL_SECONDS, true);

DEFAULT_BOUNDED_ELASTIC_SIZE表示全局bounddElastic()调度器的最大线程数,DEFAULT_BOUNDED_ELASTIC_SIZE由属性reactor.schedulers.defaultBoundedElasticSize设置,若未设置则初始化为10倍处理器数。
 
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE表示全局bounddElastic()调度器的无法创建更多线程时要排队的最大任务数。DEFAULT_BOUNDED_ELASTIC_QUEUESIZE由属性reactor.schedulers.defaultBoundedElasticQueueSize设置,若未设置则初始化为100000。

publishOn 和 subscribeOn

publishOn 和 subscribeOn都是在指定的Scheduler中运行。当某些操作执行慢,阻碍运行速度时可以在指定的Scheduler中执行。