Java线程池使用浅谈

发布时间 2023-12-19 23:14:41作者: 空慧居士

1. 线程池相关基本概念

  1. 任务(Task):任务是线程池中要执行的工作单元。任务可以是实现了 Runnable 接口或 Callable 接口的对象。Runnable 任务没有返回值,而 Callable 任务可以返回一个结果。

  2. 线程池管理器(ThreadPool Manager):线程池管理器是用于创建和管理线程池的组件。它负责创建线程池,控制线程的创建和销毁,并调度任务的执行。

  3. 工作线程(Worker Threads):工作线程是线程池中实际执行任务的线程。线程池中可以有多个工作线程,它们并行地从任务队列中获取任务并执行。

  4. 任务队列(Task Queue):任务队列是用于存储待执行的任务的数据结构。当线程池中的工作线程空闲时,它们会从任务队列中获取任务并执行。

  5. 拒绝策略(Rejection Policy):拒绝策略定义了当任务队列已满且无法继续接收新的任务时,线程池应该如何处理新的任务。常见的拒绝策略包括丢弃任务、丢弃最早的任务、抛出异常等。

  6. 线程池大小(Pool Size):线程池大小指定了线程池中工作线程的数量。线程池的大小可以是固定的,也可以是根据需要自动调整的。

  7. 核心线程数(Core Pool Size):核心线程数是线程池中保持活动状态的最小工作线程数量。即使线程处于空闲状态,核心线程也不会被销毁。

  8. 最大线程数(Maximum Pool Size):最大线程数是线程池中允许的最大工作线程数量。当任务队列已满且活动线程数达到最大线程数时,线程池可能会创建新的线程来执行任务。

  9. 闲置线程回收时间(Keep-Alive Time):闲置线程回收时间是指当线程池中的线程数超过核心线程数,并且空闲一段时间后,多余的线程会被销毁。

  10. 线程工厂(Thread Factory):线程工厂用于创建线程池中的工作线程。它负责创建线程,并可以自定义线程的属性和命名方式。

线程池的设计目的是提高系统的性能和资源利用率。通过重用线程和控制并发线程的数量,线程池可以减少线程创建和销毁的开销,避免资源耗尽,并提供更好的任务调度和执行控制。

在使用线程池时,我们可以根据任务的类型和系统的需求来选择适当的线程池大小、拒绝策略和其他参数,以实现最佳的性能和可扩展性。

2. 线程池主要处理流程

  • 判断核心线程池是否已满,如果不是,则创建线程执行任务
  • 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
  • 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
  • 如果线程池也满了,则按照拒绝策略对任务进行处理

更进一步的里层核心类处理流程:

当我们使用 Java 中的 ThreadPoolExecutor 类来创建线程池时,其处理流程如下:

  1. 任务提交:外部调用者通过调用 ThreadPoolExecutor 的 execute() 或 submit() 方法将任务提交给线程池。

  2. 任务接收ThreadPoolExecutor 接收到任务后,首先检查线程池的状态。如果线程池已经关闭,就不再接收新的任务。

  3. 任务排队:线程池将接收到的任务放入内部的任务队列中等待执行。任务队列可以是有界队列(如 ArrayBlockingQueue)或无界队列(如 LinkedBlockingQueue 或 SynchronousQueue)。

  4. 工作线程获取任务:线程池中的工作线程从任务队列中获取任务。如果任务队列为空,工作线程可能会阻塞等待新任务的到来,或者在等待一定时间后退出。

  5. 任务执行:工作线程获取到任务后,调用任务对象的 run() 方法执行任务的具体逻辑。

  6. 任务完成:任务执行完成后,可以返回一个结果(对于 Callable 任务),或者不返回任何结果(对于 Runnable 任务)。

  7. 任务状态更新ThreadPoolExecutor 会更新任务的状态,包括任务的执行进度、执行结果等信息。

  8. 结果返回:如果任务是 Callable 任务,ThreadPoolExecutor 会将任务的执行结果封装在 Future 对象中返回给调用者。调用者可以通过 Future 对象获取任务的执行结果。

  9. 继续处理下一个任务:工作线程完成当前任务后,会继续从任务队列中获取下一个任务进行处理。

  10. 线程回收:如果线程池中的线程处于空闲状态,并且空闲时间超过一定阈值,ThreadPoolExecutor 可能会回收这些空闲线程,以避免资源的浪费。

  11. 异常处理ThreadPoolExecutor 会捕获任务执行过程中的异常,并根据预定义的异常处理策略进行处理,比如记录日志、统计异常次数等。

  12. 线程池关闭:当不再需要线程池时,调用 ThreadPoolExecutor 的 shutdown() 或 shutdownNow() 方法来停止线程池的运行。关闭线程池的过程包括不再接收新任务、等待已提交的任务执行完成、销毁工作线程等操作。

ThreadPoolExecutor 是 Java 中用于创建和管理线程池的核心类,通过其灵活的配置参数,可以实现对线程池的各种行为和特性进行定制。这个类提供了丰富的方法和选项,用于控制线程池的大小、任务队列类型、拒绝策略、线程工厂等,以满足不同场景下的需求。

3. 线程池实现的主要步骤

3.1 创建线程池

在 Java 中,可以使用 Executors 类提供的静态方法创建线程池。以下是几种常见的创建线程池的方法:

3.1.1 创建固定大小的线程池

固定大小的线程池将在初始化时创建指定数量的线程,并且不会增加或减少线程的数量。

ExecutorService executorService = Executors.newFixedThreadPool(10);//创建一个固定大小为 10 的线程池
3.1.2 创建单个线程的线程池

单个线程的线程池只会创建一个工作线程来执行任务。

ExecutorService executorService = Executors.newSingleThreadExecutor();
3.1.3 创建可根据需要自动调整大小的线程池

可根据需要自动调整大小的线程池将根据任务的数量动态地增加或减少线程的数量。

ExecutorService executorService = Executors.newCachedThreadPool();
3.1.4 手动按自己需求创建线程池

在 Java 中,可以手动创建线程池,而不仅仅依赖于内置的线程池实现。手动创建线程池的主要原因是为了更好地控制线程池的行为、特性和参数配置,以满足特定的需求,比如特定的任务队列类型需求,特定的拒绝策略需求

根据ThreadPoolExecutor构造方法可知,需要准备以下参数:

  1. 核心线程数(corePoolSize):核心线程数是线程池中保持活动状态的线程数。即使这些线程处于空闲状态,它们也不会被回收。线程池会根据任务的数量和任务队列的状态来动态调整线程池中的线程数量。

  2. 最大线程数(maximumPoolSize):最大线程数指定了线程池中允许存在的最大线程数量。当任务数量超过核心线程数并且任务队列已满时,线程池会创建新的线程来处理任务,直到达到最大线程数。如果达到最大线程数后仍有任务到来,采用拒绝策略处理新任务。

  3. 空闲线程存活时间(keepAliveTime):当线程池中的线程数超过核心线程数,并且处于空闲状态时,空闲线程存活时间指定了它们在没有接收到新任务时的存活时间。超过存活时间后,空闲线程将被回收,直到线程池中的线程数不超过核心线程数。

  4. 时间单位(unit):用于指定时间参数的单位,可以是秒、毫秒、微秒等。

  5. 任务队列(workQueue):任务队列用于存储等待执行的任务。可以选择合适的队列类型,如有界队列(如 ArrayBlockingQueue)或无界队列(如 LinkedBlockingQueue)。

  6. 线程工厂(threadFactory):线程工厂用于创建线程对象。可以自定义线程工厂类,实现创建线程的逻辑。

  7. 拒绝策略(rejectedExecutionHandler):当任务队列已满并且线程池中的线程数达到最大线程数时,拒绝策略指定了如何处理新的任务。可以选择预定义的拒绝策略,如抛出异常、丢弃任务等,或者自定义拒绝策略。

import java.util.concurrent.*;

// 自定义拒绝策略类
class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
        // 自定义拒绝策略的逻辑
        System.out.println("Task Rejected: " + runnable.toString());
        // 可根据需求进行不同的处理方式,如抛出异常、丢弃任务、调用者执行等
    }
}

// 自定义线程工厂类
class CustomThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable runnable) {
        // 自定义线程工厂的逻辑
        Thread thread = new Thread(runnable);
        // 可以进行一些线程属性的配置,如设置线程名称、优先级等
        thread.setName("CustomThread");
        thread.setPriority(Thread.NORM_PRIORITY);
        return thread;
    }
}

public class ManualThreadPoolCreationExample {
    public static void main(String[] args) {
        // 创建自定义的拒绝策略实例
        RejectedExecutionHandler rejectionHandler = new CustomRejectedExecutionHandler();

        // 创建自定义的线程工厂实例
        ThreadFactory threadFactory = new CustomThreadFactory();

        // 创建任务队列(这里使用无界队列)
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();

        // 创建线程池并进行手动配置
        int corePoolSize = 10;
        int maxPoolSize = 20;
        long keepAliveTime = 60;
        TimeUnit timeUnit = TimeUnit.SECONDS;
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                corePoolSize,
                maxPoolSize,
                keepAliveTime,
                timeUnit,
                workQueue,
                threadFactory,
                rejectionHandler
        );
    }
}

3.2 提交任务给线程池执行

创建线程池之后,可以将任务提交给线程池执行。任务可以是实现了 Runnable 接口的对象,也可以是实现了 Callable 接口的对象。

3.2.1  提交 Runnable 任务

executorService.execute(new Runnable() {
    @Override
    public void run() {
        // 任务逻辑
    }
});

或者使用 Lambda 表达式:

executorService.execute(() -> {
    // 任务逻辑
});

3.2.2 提交 Callable 任务

Callable 任务可以返回一个结果

Future<SomeResult> future = executorService.submit(new Callable<SomeResult>() {
    @Override
    public SomeResult call() throws Exception {
        // 任务逻辑
        return someResult;
    }
});

或者使用 Lambda 表达式:


Future<SomeResult> future = executorService.submit(() -> {
    // 任务逻辑
    return someResult;
});

3.3 关闭线程池

当不再需要线程池时,应该显式地关闭它,以释放资源。关闭线程池两种方式

executorService.shutdown(); // 不再接受新的任务,但会等待已提交的任务执行完成。

executorService.shutdownNow(); //希望立即关闭线程池,并尝试中断正在执行的任务

3.4 处理任务执行结果

当提交任务给线程池执行后,可以通过 Future 对象来获取任务的执行结果。

Future<SomeResult> future = executorService.submit(...);

try {
    SomeResult result = future.get();
    // 处理结果
} catch (InterruptedException e) {
    // 处理中断异常
} catch (ExecutionException e) {
    // 处理执行异常
}

使用 future.get() 方法可以阻塞当前线程,直到任务执行完成并返回结果。get() 方法可能会抛出 InterruptedException 和 ExecutionException 异常,需要进行适当的异常处理。