线程池

发布时间 2023-11-27 17:16:26作者: kiper

1. 线程池

作用

  • 提升资源使用率,避免无意义的线程重复创建销毁成本

  • 提升反应速度,已提前创建线程

  • 方便管理线程资源,如可控制并发量、批量中断等

参数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
		...
	}
  • corePoolSize
    核心线程数最大值。
    添加任务时,只要当前核心线程数小于此值,都会新建线程来执行。并且,在任务结束后超过keepAliveTime时间也不会被回收。
    核心线程从空的阻塞队列取任务时,会走workQueue.take(),基本调用Condition实例notEmpty.await()方法持续阻塞;非核心线程则走 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),基本调用Condition实例notEmpty.awaitNanos(nanos)进行超时等待,若超过keepAliveTime后还无任务,便会返回null结束并被回收。

    Runnable r = timed ?
       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
       workQueue.take();
    
  • maximumPoolSize
    最大线程数。当线程池当前所有线程都在执行任务,且等待队列已满时,会持续根据需要创建新的线程,并到此线程数为止。

  • keepAliveTime
    核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。

  • unit
    时间单位,可设置纳秒、微秒、毫秒、秒、分、时、天。

  • workQueue
    阻塞线程,用于缓存任务。

    阻塞线程 类型 特点
    ArrayBlockingQueue 有界阻塞队列 数组实现,需要指定队列大小,支持公平锁和非公平锁
    LinkedBlockingQueue 无界阻塞队列 链表实现,默认容量Interge.MAX_VALUE,也可以指定大小。也是newFixedThreadPool()和newSingleThreadExecutor()的等待队列。
    LinkedBlockingDeque 有界阻塞队列 链表实现,具有双向存取功能。在高并发下,相比于LinkedBlockingQueue可以将锁竞争降低最多一半。
    PriorityBlockingQueue 无界阻塞队列 优先级排序,默认使用自然排序
    SynchronousQueue 无界阻塞队列 无缓冲的阻塞队列,没有任何内部容量,甚至连一个队列的容量都没有。
    公平模式底层使用的是先进先出的队列TransferQueue,非公平模式底层采用了后进先出的TransferStack栈来实现。
    LinkedTransferQueue 无界阻塞队列 链表实现
    DelayQueue 无界延时队列 延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。底层数据是数组实现的堆结构。
  • threadFactory
    创建线程的工厂。ThreadFactory类是一个接口,只有newThread()方法。如果需要自定义线程名称格式,可以自定义类实现此接口。

    public interface ThreadFactory {
      Thread newThread(Runnable r);
    }
    
  • handler
    拒绝处理策略。
    如果核心线程数、等待队列、最大线程数都满了,那么会采取拒绝策略进行处理。拒绝策略的调用是在主线程处理的。
    默认的四种策略,均是ThreadPoolExecutor类中公共静态类:

    • AbortPolicy
      默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。

    • DiscardPolicy
      丢弃新来的任务,但是不抛出异常

    • DiscardOldestPolicy
      丢弃等待队列头部(最旧的)的任务,然后重新尝试执行程序,将任务添加到队列中(如果再次失败,重复此过程)

    • CallerRunsPolicy
      由调用线程处理该任务。

    也可以自定义拒绝处理策略,实现RejectedExecutionHandler接口并重写rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法即可。

    public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
      @Override
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
          System.out.println(r.toString() + " is rejected");
      }
    }
    

执行过程

待补充。

不建议使用Executors创建线程池

方法 特点
newFixedThreadPool 线程数固定,阻塞队列为无界队列LinkedBlockingQueue,队列过大可能内存溢出
newCachedThreadPool 核心线程数为0,阻塞队列为SynchronousQueue,但是最大线程数为Integer.MAX_VALUE,非核心线程数过多可能内存溢出
newScheduledThreadPool 阻塞队列为DelayedWorkQueue,但是最大线程数为Integer.MAX_VALUE,线程过多可能内存溢出
newSingleThreadExecutor 线程数为1,阻塞队列为无界队列LinkedBlockingQueue,队列过大可能内存溢出

2. 线程数多少合适

使用多线程的目的,是为了提升性能,落地到具体指标:

  • 吞吐量
    单位时间内能处理的请求数量

  • 延迟
    从发出请求到收到响应的时间

  • 并发量
    能同时处理的请求数量

一般随着并发量的增加,延迟也会增加。所以延迟这个指标,一般都会是基于并发量来说的。

CPU密集型

最佳线程数 = CPU逻辑核心数 + 1

当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程(+1)可以顶上,从而保证 CPU 的利用率。

IO密集型

最佳线程数 = 1 +(I/O 耗时 / CPU 耗时)

当线程 A 执I/O 耗时 / CPU 耗时行 IO 操作时,另外N个线程正好执行完各自的 CPU 计算。这样 CPU 的利用率就达到了 100%。但是实际情况很难评估具体的I/O耗时和CPU耗时,需要实际测试调整。

动态化线程池

针对涉及IO等阻塞操作的模型来说,评估实际的IO或者CPU耗时不太现实,故若是可以监控到相应指标,便可以利用ThreadPoolExecutor设置核心线程数、最大线程数等实例方法进行动态调整。
针对IO密集型任务,可以根据活跃线程数/最大线程数的比例或者RejectedExecut异常次数的阈值进行判断,适当增加动态线程数及最大线程数。
针对高吞吐量任务,可以根据等待队列中的任务数量/等待队列的长度的比例或者RejectedExecut异常次数的阈值进行判断,适当增加动态线程数及最大线程数。