java并发之线程池

发布时间 2024-01-02 21:01:44作者: wangzhilei

创建线程池参数

参数名

类型

含义

corePoolSize

int

核心线程数,详解见下文

maxPoolSize

int

最大线程数,详解见下文

keepAliveTime

long

保持存活时间

workQueue

BlockingQueue

任务存储队列

threadFactory

ThreadFactory

当线程池需要新的线程的时
候,会使用threadFactory来
生成新的线程

Handler

RejectedExecutionHandler

由于线程池无法接受你所提
交的任务的拒绝策略

corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时再创建新线程去执行任务。

maxPoolSize指线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大量

keepAliveTime

  如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止.

ThreadFactory

  新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。

workQueue

  常见的队列类型:

    1)直接交接:SynchronousQueue

    2)无界队列:LinkedBlockingQueue

    3)有界的队列:ArrayBlockingQueue

    4)延迟队列:DelayedWorkQueue

添加线程规则

1.如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务。

2.如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列。

3.如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务

4.如果队列已满,并且线程数大于或等于maxPoolSize,则拒绝该任务。

增减线程的特点

  1. 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
  2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它。
  3. 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
  4. 是只有在队列填满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。

线程池线程数量设定

  • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。
  • 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于cpu核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推荐的计算方法:线程数=CPU核心数*(1+平均等待时间/平均工作时间)

拒绝任务

  • 拒绝时机
  1. 当Executor关闭时,提交新任务会被拒绝。
  2. 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时
  • 4种拒绝策略
  1. AbortPolicy
  2. DiscardPolicy
  3. DiscardOldestPolicy
  4. CallerRunsPolicy

钩子方法

每个任务执行前后实现相应功能,比如,日志,统计等。

/**
 * 描述:每个任务执行前后放钩子函数
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private final ReentrantLock lock = new ReentrantLock();
    private Condition unpaused = lock.newCondition();
    private boolean isPaused;


    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
            TimeUnit unit, BlockingQueue<Runnable> workQueue,
            ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory,
                handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        } finally {
            lock.unlock();
        }
    }
}