《Java 并发编程的艺术》实验02-4 JUC Executor的使用

发布时间 2023-10-31 16:31:10作者: Ba11ooner

Executor 框架的使用

ThreadPoolExecutor

简介

Executor 机制实现了工作单元和执行机制的分离

ThreadPoolExecutor 实现了 Executor 接口,是 Java 线程池的根本实现类之一。它提供了更丰富的配置参数,例如核心线程数、最大线程数、线程空闲时间等,使得开发人员能够更加灵活地配置和管理线程池。通过合理地设置这些参数,可以有效地控制线程池的并发度,提高系统的性能和稳定性。

  • corePoolSize为核心线程数,表示线程池中始终保持的活动线程数
  • maximumPoolSize为最大线程数,表示线程池中能够容纳的最大线程数
  • keepAliveTime为非核心线程的存活时间
  • unit为存活时间的时间单位
  • workQueue表示线程池中的任务队列
  • threadFactory为线程工厂,用于创建新线程
  • handler为拒绝执行的处理器,即拒绝策略
实验
实验目的

掌握 ThreadPoolExecutor 类的基本使用,理解七大参数的含义,感受 Executor 对于工作单元和执行机制的分离机制,尝试从 “线程池 + 任务” 的视角进行多线程编程

实验内容

高并发系统通常需要处理大量的用户请求,使用线程池可以实现线程的复用,减少线程的创建和销毁开销,提高系统的并发能力。

本实验中,我们以在线商城的商品搜索为例。用户在搜索框中输入关键字后,系统会开启一个新的线程来处理搜索请求,查询数据库,返回搜索结果。

实验过程
  1. 创建 ThreadPoolExecutor
  2. 创建任务
  3. 向 ThreadPoolExecutor 上传任务

注意事项:Executor 接口只声明了 execute() 方法,如果要在面向接口编程的限制下使用完整的线程池功能,需要用 ExecutorService 接口承接 ThreadPoolExecutor 这一实现类

示例代码
public class ThreadPoolExecutorDemo {

    static class Task implements Runnable {
        private final int id;

        public Task(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            try {
                System.out.println("Task-" + id + " 开始执行");
                // 模拟任务执行耗时
                Thread.sleep((long) (Math.random() * 1000));
                System.out.println("Task-" + id + " 执行完毕");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //线程池参数展开
        int corePoolSize = 1;
        int maximumPoolSize = 3;
        long keepAliveTime = 10L;
        TimeUnit timeUnit = TimeUnit.SECONDS;
        BlockingQueue blockingQueue = new LinkedBlockingQueue<>(100);
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        RejectedExecutionHandler policy = new ThreadPoolExecutor.AbortPolicy();

        //声明线程池
        ExecutorService executor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                timeUnit,
                blockingQueue,
                threadFactory,
                policy
        );

        for (int i = 0; i < 10; i++) {
            executor.submit(new Task(i));
        }
      	
				//需要手动关闭线程池
        executor.shutdown();
      
       //超时机制
        try {
            // 等待一段时间,等待所有任务执行完毕
            if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
                // 如果超时仍有任务未完成,强制关闭 Executor
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            // 异常处理逻辑
            e.printStackTrace();
        }
    }

}

ScheduledPoolExecutor

简介

ScheduledExecutorService 是 Java 提供的一个用于周期性执行任务的线程池。它继承自 ExecutorService 接口,可以在指定的延迟时间后执行任务,或者周期性地执行任务。实现类是 ScheduledThreadPoolExecutor

ScheduledExecutorService 的主要用途是在需要定时执行任务的场景下,可以方便地创建、执行和管理多个定时任务,并可以灵活地控制任务的执行时间和频率

ScheduledExecutorService 的主要方法包括:

  • schedule():在给定的延迟时间之后,执行指定的任务,并返回一个 ScheduledFuture 对象。
  • scheduleAtFixedRate():按照给定的时间间隔,以固定的频率执行任务。
  • scheduleWithFixedDelay():在每次执行任务完成后,等待指定的延迟时间,然后再次执行任务。
实验
实验目的

学习并掌握 ScheduledThreadPoolExecutor 的使用方法,了解如何使用定时线程池来实现定时任务的调度。

实验内容

在某个电商平台中,需要定期统计一天内不同商品的订单数量,并生成相应的报表。为了避免对系统性能的影响,可以在夜间闲时进行统计,然后将结果保存到数据库中。

实验过程
  1. 创建一个商品订单统计任务类,实现 Runnable 接口。在 run() 方法中编写统计逻辑,获取当日订单数量,然后将统计结果保存到数据库中。
  2. 创建一个定时线程池并定时执行任务。可以使用 ScheduledThreadPoolExecutor 来实现定时线程池。
  3. 编译并运行代码,定时线程池会在每天凌晨1点开始执行统计任务,每个商品的订单数量将会被统计并保存到数据库中。
示例代码
public class ScheduledPoolExecutorDemo {
    //任务
    static class Task implements Runnable {
        private final String product; // 商品

        public Task(String product) {
            this.product = product;
        }

        private static final int MAX_NUM_ORDERS = 100; // 一天内订单数量的最大值

        @Override
        public void run() {

            // 获取当日订单数量,此处随机生成
            int numOrders = ThreadLocalRandom.current().nextInt(MAX_NUM_ORDERS);

            // 将统计结果保存到数据库中
            String sql = String.format("INSERT INTO order_statistics(product, date, num_orders) " +
                    "VALUES('%s', '%s', %d)", product, LocalDate.now(), numOrders);

            // 执行 SQL 语句...
        }
    }

    private static long computeInitialDelay() {
        // 获取当前时间
        LocalTime now = LocalTime.now();
        int currentHour = now.getHour();
        int currentMinute = now.getMinute();

        // 如果当前时间在凌晨1点之前,则将任务执行时间设为凌晨1点;否则设为第二天凌晨1点
        int targetHour = currentHour < 1 ? 1 : 25;

        // 计算与目标时间的时间差,单位为分钟
        int minutesUntilTarget = (targetHour - currentHour) * 60 - currentMinute;

        // 将时间差转换为毫秒作为初始延迟时间
        long initialDelay = TimeUnit.MINUTES.toMillis(minutesUntilTarget);
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm");
        System.out.println("当前时间:" + now.format(formatter));
        long hours = initialDelay / 1000 / 60 / 60;
        long minus = (initialDelay - hours * 60 * 60 * 1000) / 1000 / 60;
        System.out.println("距离下一次执行任务还有: "
                + hours + " 小时 "
                + minus + " 分钟"
        );
        return initialDelay;
    }

    public static void main(String[] args) {
        int NUM_PRODUCTS = 10; // 商品数量

        ScheduledExecutorService executorService
                = new ScheduledThreadPoolExecutor(NUM_PRODUCTS);

        // 每天凌晨1点开始执行任务
        long initialDelay = computeInitialDelay();

        // 每个商品都创建一个统计任务并定时执行
        for (int i = 0; i < NUM_PRODUCTS; i++) {
            String product = "Product" + (i + 1);
            executorService.scheduleAtFixedRate(new Task(product), initialDelay, 1, TimeUnit.DAYS);
        }

        try {
            //保持线程池运行
            executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
            executorService.shutdown();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

FutureTask

简介
Future

Future 接口是 Java 标准库中的一部分,它表示一个异步计算的结果。Future 接口可以用于提交可调度的任务,并且可以通过调用其 get 方法来获取任务的结果。

Future 接口具有以下主要特点:

  • get 方法是阻塞的,直到任务完成并返回结果。
  • 通过使用 isDone 方法可以确定任务是否已经完成。
  • 通过使用 cancel 方法可以取消任务的执行。
  • get 方法还可以设置超时时间,如果任务在超时时间内未完成,则抛出 TimeoutException 异常。

Future 接口的目的是使得在执行任务的同时可以进行其他操作,而不必等待任务完成。它提供了一种更灵活的方式来处理任务的结果。

FutureTask

FutureTask是Java中的一个类,位于java.util.concurrent包中。它是RunnableFuture接口的一个实现,同时也是Future接口的一个实现。

FutureTask主要用于表示一个可取消的异步计算任务。在多线程编程中,如果需要执行一个耗时的操作,并且希望在操作完成后获取返回结果,就可以使用FutureTask来处理这个任务。当任务完成时,可以通过get()方法获取任务的返回结果。

引入FutureTask的主要原因是为了解决某些场景下的线程阻塞问题。当一个线程调用FutureTaskget()方法获取结果时,如果任务还没有完成,线程将会被阻塞住,直到任务完成并返回结果。这样可以避免线程空转的浪费。

另外,FutureTask还可以用于任务的取消和周期性任务的执行。通过调用cancel()方法,可以取消一个FutureTask的执行。通过submit()方法和ScheduledExecutorService等配合使用,可以实现周期性任务的执行。

实验
实验目的

掌握 FutureTask 的使用方法,了解它在多线程编程中的应用场景。

实验内容

通过实验,了解使用 FutureTask 来执行可返回结果的任务,并了解如何处理任务的返回结果。

实验过程
  1. 在 Java 中,可以通过 FutureTask 类来代表一个可能还没有完成的计算。比如,我们有一个计算任务,但是暂时无法得到计算结果,可以通过 FutureTask 的对象来代表这个任务并返回给调用者。
  2. 首先,需要创建一个 Callable 对象,它是一个带返回值的任务。可以使用匿名内部类的方式创建一个 Callable 对象,重写其 call() 方法,实现具体的任务逻辑。
  3. 然后,创建 FutureTask 对象,将该 Callable 对象作为参数传入。
  4. 可以使用 FutureTask 的 run() 方法来执行任务,也可以将 FutureTask 对象传递给一个线程来执行。
  5. 调用 FutureTask 的 get() 方法可以获取任务的返回值,如果任务还未完成,则 get() 方法会阻塞直到任务完成并返回结果。
示例代码
public class FutureTaskDemo {

    static class Task implements Callable<Integer> {
        int a;
        int b;

        public Task(int a, int b) {
            this.a = a;
            this.b = b;
        }

        @Override
        public Integer call() throws Exception {
            System.out.println("运算进行中");
            Thread.sleep(2);
            return a + b;
        }
    }

    public static void main(String[] args) {
        Task task = new Task(1, 2);
        FutureTask<Integer> futureTask = new FutureTask<>(task);
        Thread thread = new Thread(futureTask);
        thread.start();

        while (!futureTask.isDone()) {
            try {
                System.out.println("执行其他任务,等待异步计算结果");
                Thread.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("任务已完成");
        
        try {
            System.out.println("获取异步计算结果:" + futureTask.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}