转发自:https://blog.csdn.net/get_set/article/details/79480172
Schedulers类似Executor,是Reactor的线程调度接口。提供以下几种线程执行环境:
- 当前线程(
Schedulers.immediate()
); - 可重用的单线程(
Schedulers.single()
)。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle(); - 弹性线程池(
Schedulers.elastic()
)。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源; - 固定大小线程池(
Schedulers.parallel()
),所创建线程池的大小与CPU个数等同; - 自定义线程池(
Schedulers.fromExecutorService(ExecutorService)
)基于自定义的ExecutorService创建 Scheduler(虽然不太建议,不过你也可以使用Executor来创建)。
Schedulers类已经预先创建了几种常用的线程池:使用single()、elastic()和parallel()方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()、newElastic()和newParallel()方法。
Executors提供的几种线程池在Reactor中都支持:
Schedulers.single()
和Schedulers.newSingle()
对应Executors.newSingleThreadExecutor()
;Schedulers.elastic()
和Schedulers.newElastic()
对应Executors.newCachedThreadPool()
;Schedulers.parallel()
和Schedulers.newParallel()
对应Executors.newFixedThreadPool()
;
Schedulers提供的以上三种调度器底层都是基于ScheduledExecutorService
的,因此都是支持任务定时和周期性执行的;
Flux和Mono的调度操作符subscribeOn
和publishOn
支持work-stealing
。
最新版本中elastic被废弃了,重新提供了boundedElastic
。
Schedulers#boundedElastic
public static Scheduler boundedElastic() {
return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
}
static final Supplier<Scheduler> BOUNDED_ELASTIC_SUPPLIER =
() -> newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BOUNDED_ELASTIC, BoundedElasticScheduler.DEFAULT_TTL_SECONDS, true);
DEFAULT_BOUNDED_ELASTIC_SIZE表示全局bounddElastic()调度器的最大线程数,DEFAULT_BOUNDED_ELASTIC_SIZE由属性reactor.schedulers.defaultBoundedElasticSize设置,若未设置则初始化为10倍处理器数。
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE表示全局bounddElastic()调度器的无法创建更多线程时要排队的最大任务数。DEFAULT_BOUNDED_ELASTIC_QUEUESIZE由属性reactor.schedulers.defaultBoundedElasticQueueSize设置,若未设置则初始化为100000。
publishOn 和 subscribeOn
publishOn 和 subscribeOn都是在指定的Scheduler
中运行。当某些操作执行慢,阻碍运行速度时可以在指定的Scheduler中执行。