Java线程池在项目实战开发遇到的问题和思考

发布时间 2023-10-31 21:31:30作者: cdfive

背景

项目开发中经常会用到多线程,比如批量数据处理任务。
通过多线程并行处理,能够有效提高处理的效率和缩短处理时长。
假设某项任务需要处理1分钟,有1000个任务要处理,如果单线程每个任务顺序执行,处理时长为1000分钟(约16.67小时)。
如果10个线程同时处理,则时间缩短10倍,即100分钟(约1.67小时)处理完成。
Java开发中创建多线程,通常会使用Java并发包下提供的线程池,用它来创建和管理多线程。
如上述场景创建10个线程,有的项目中可能看到这样的代码:

int threadSize = 10;
ExecutorService es = Executors.newFixedThreadPool(threadSize);

使用了固定线程池,指定线程数为10。
这里有一个潜在的OOM风险,因为Executors#newFixedThreadPool实际是:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor构造方法的最后一个参数使用了不带参数的LinkedBlockingQueue无界队列,由于线程池核心线程数(corePoolSize)
和最大线程数(maximumPoolSize)相同,当10个线程正在运行中,新提交的任务会添加到队列中,由于队列是无界(大小为Integer.MAX_VALUE)的,队列数据会不断增长。
如果总任务数量大,且任务添加快,则队列会快速增长,内存资源占用持续增加,可能造成项目OOM

思路

在《阿里巴巴Java开发手册》中关于线程池创建有一条:

【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors返回的线程池对象弊端如下:
(1) FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
(2) CachedThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建的线程,从而导致OOM。

既然Executors#newFixedThreadPool固定线程池有弊端和风险,推荐使用ThreadPoolExecutor的方式,具体应该如何使用呢?

考虑到需求是10个线程同时执行,那么ThreadPoolExecutor构造方法里的前4个参数,应该跟Executors#newFixedThreadPool是一致的。
即线程池核心线程数、最大线程数为10,空闲线程存活时间keepAliveTime为0, 单位TimeUnit.MILLISECONDS不变。

这里遇到的风险是:任务请求堆积添加到队列导致内存不断增长。
首先想到的是能否不用无界队列,指定队列的容量大小,限制内存占用,如队列参数指定new LinkedBlockingQueue<Runnable>(50)

但这样问题是任务堆积数超过了队列大小怎么办?有可能总任务数很大、或者未知、或者不好评估设定队列大小。

根据ThreadPoolExecutor原理,当最大线程数、队列都满了,会执行RejectedExecutionHandler拒绝策略。
如果策略不指定,默认是AbortPolicy,任务拒绝执行并抛出一个RejectedExecutionException

ThreadPoolExecutor类的源码里可看到,拒绝策略RejectedExecutionHandler接口有4个实现类。

  • AbortPolicy 终止执行并抛出异常。
  • DiscardPolicy 丢弃该任务。
  • DiscardOldestPolicy 将最早加入队列的任务丢弃,队列加入该任务。
  • CallerRunsPolicy 由调用者线程去执行该任务。

在实际项目开发中,任务被终止或丢弃有时是不可接受的,因为这样可能导致数据不一致或者必要的业务没有处理。

注意到CallerRunsPolicy这个策略,没有丢弃任务,由调用者线程去执行该任务。那么用它怎么样?
查看CallerRunsPolicy源码:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

线程池没有关闭,则执行调用请求线程的run方法,由于线程是调用方通过execute或submit提交的,即在调用方的线程来执行该线程的任务(即run方法)。

结合上面的场景,修改创建线程池的代码如下

int threadSize = 10;
ExecutorService es = new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS
    , new LinkedBlockingQueue<Runnable>(50), new ThreadPoolExecutor.CallerRunsPolicy());

这里队列的大小改为50,指定CallerRunsPolicy的拒绝策略。

考虑往线程池里提交线程、线程池的执行和该拒绝策略的执行,发现一个问题:
当线程数满且队列满时,由于是调用方的线程来执行该任务,因此这时在该任务没执行完成前,线程池不会被提交新的线程。
用一段项目中常见的代码来看:

int threadSize = 10;
int queueSize = 50;
ExecutorService es = new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS
                         , new LinkedBlockingQueue<Runnable>(queueSize), new ThreadPoolExecutor.CallerRunsPolicy());
int pageNum = 1;
int pageSize = 50;
while (true) {
    Page page = xxxRepository.queryPageList(pageNum++, pageSize);
    List list = page.getList();
    if (CollectionUtils.isEmpty(list)) { 
        break;
    }
    for (int i = 0; i < list.size(); i++) {
        // 当执行CallerRunsPolicy策略时,当前主线程来运行该任务,此时主线程阻塞了,等任务执行完成才进如下一个循环
        es.submit(new Task(list.get(i)); 
    }
    ...
    if (!page.hasNextPage()) {
       break;
    }
} 

本来线程池里10个线程在同时执行任务,由于CallerRunsPolicy拒绝策略由调用方主线程来执行任务,这个时候一共11个线程同时执行任务,
有可能任务执行时间不定,10个线程里的某些任务执行完成了,而调用方主线程里的任务还未执行完,这时会导致线程有闲置。

以生活来举例:
食堂有10个窗口,可以10个人同时打饭,这种拒绝策略相当于临时多了1个窗口,这时11个人同时打饭,但如果这多的一个窗口在处理时,
其它有些窗口速度快完成了,只能停止窗口服务,等临时的这个窗口处理完成后才能继续。
这样显然是不合理的,因为浪费了资源,极端情况下可能其它任务都已执行完成,都在等调用方线程任务的执行。

再回到我们最开始的需求,通过多线程并行处理任务提高效率。
如指定线程数10,希望是尽可能10个线程一直在同时执行任务。(任务刚开始提交或最后几个任务执行除外)

沿着这个思路,即希望10个线程多时执行,新提交的任务等待,当某个线程完成后立即取新的任务来执行。
其实Executors#newFixedThreadPool固定线程池的方式,已实现了这一需求,但它有OOM的风险。
以它为参考,考虑使用同步队列进行改进如下:

int threadSize = 10;
ExecutorService es = new ThreadPoolExecutor(threadSize, threadSize, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
});

SynchronousQueue同步队列也是阻塞队列,且本身没有容量大小,这里自定义了RejectedExecutionHandler的实现类,在rejectedExecution的实现方法中,调用队列的put方法。
这样线程池满时10个线程同时执行,新的线程添加会阻塞,等待有执行完任务的线程取出。

为了更方便使用,可参考Executors类,考虑把构造线程池的方法封装起来,如:

/**
 * @author cdfive
 */
public class ThreadUtil {

    public static ThreadPoolExecutor newFixedBlockingThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }

    public static ThreadPoolExecutor newFixedBlockingThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                try {
                    executor.getQueue().put(r);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        });
    }

    public static ThreadPoolExecutor newFixedBlockingThreadPool(int nThreads, String threadNamePrefix) {
        return newFixedBlockingThreadPool(nThreads, new CustomizableThreadFactory(threadNamePrefix));
    }
}

注:其中第2个方法,增加了ThreadFactory参数的构造方法;第3个方法,使用Spring的CustomizableThreadFactory类便于给线程名称增加前缀。

测试

/**
 * @author cdfive
 */
public class ThreadUtilTest {

    private static final int threadNum = 10;
    private static final int totalTaskNum = 100;
    public static final AtomicInteger unProcessedThreadNum = new AtomicInteger(totalTaskNum);

    // May OOM, if too many tasks are submitted
    @Test
    public void testCase1() {
        doTestCase(defaultFixedThreadPool(threadNum));
    }

    // Avoid risk of OOM, but some tasks may not been executed,
    // since they are aborted, if tasks are submitted too fast
    @Test
    public void testCase2() {
        doTestCase(fixedThreadPoolWithAbortPolicy(threadNum));
    }

    // Avoid risk of OOM, but not high effective,
    // since some threads may wait for submit and have no task to execute at some time
    @Test
    public void testCase3() {
        doTestCase(fixedThreadPoolWithCallerRunsPolicy(threadNum));
    }

    // Avoid risk of OOM, high effective use of all threads
    @Test
    public void testCase4() {
        doTestCase(custormFixedThreadPool(threadNum));
    }

    public static void doTestCase(ThreadPoolExecutor threadPoolExecutor) {
        long start = System.currentTimeMillis();


        Thread statThread = new Thread(() -> printThreadPoolInfo(threadPoolExecutor));
        statThread.start();

        for (int i = 1; i <= totalTaskNum; i++) {
            try {
                threadPoolExecutor.submit(new Task(i));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        threadPoolExecutor.shutdown();
        try {
            if (!threadPoolExecutor.awaitTermination(5, TimeUnit.MINUTES)) {
                threadPoolExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        statThread.interrupt();

        // wait 3 second to exit
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("ThreadUtilTest done,cost=" + (System.currentTimeMillis() - start) + "ms");
    }

    public static ThreadPoolExecutor defaultFixedThreadPool(int threadNum) {
        return (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum);
    }

    public static ThreadPoolExecutor fixedThreadPoolWithAbortPolicy(int threadNum) {
        return new ThreadPoolExecutor(threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
    }

    public static ThreadPoolExecutor fixedThreadPoolWithCallerRunsPolicy(int threadNum) {
        return new ThreadPoolExecutor(threadNum, threadNum,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public static ThreadPoolExecutor custormFixedThreadPool(int threadNum) {
        return ThreadUtil.newFixedBlockingThreadPool(threadNum);
    }

    public static void printThreadPoolInfo(ThreadPoolExecutor executor) {
        boolean running = true;
        while (running && !Thread.currentThread().isInterrupted()) {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Ignore exception
                running = false;
            }

            System.out.println("activeThreadNum=" + executor.getActiveCount() + ",queueSize=" + executor.getQueue().size()
                    + ",unProcessedThreadNum=" + ThreadUtilTest.unProcessedThreadNum.get());
        }
    }

    private static class Task implements Runnable {

        private int taskIndex;

        public Task(int taskIndex) {
            this.taskIndex = taskIndex;
        }

        @Override
        public void run() {
//            System.out.println(Thread.currentThread().getName() + "=>task[" + taskIndex + "] start");
            long start = System.currentTimeMillis();

            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(1, 5));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            ThreadUtilTest.unProcessedThreadNum.decrementAndGet();
//            System.out.println(Thread.currentThread().getName() + "=>task[" + taskIndex + "] end,cost=" + (System.currentTimeMillis() - start) + "ms");
        }
    }
}

编写测试代码来测试不同线程池来处理任务,通过观察输出可看到线程资源的使用情况。
注:这里使用ThreadPoolExecutor#getActiveCount()方法,可获取到线程池中正在执行任务的线程数。

总结

  • 项目开发常用的Executors.newFixedThreadPool固定线程池,因为使用无界队列有OOM的风险

  • 阻塞策略ThreadPoolExecutor.CallerRunsPolicy由调用者线程去执行该任务,可能阻塞提交线程导致线程池资源浪费

  • 理解ThreadPoolExecutor的原理和参数,有助于在项目实战开发中灵活选择和运用

  • 思路要灵活,Executors.newFixedThreadPool并不是不可用,在合适的场景也可使用,如:本地单元测试、总任务数不多没有OOM风险的场景