Java线程池分批调用

发布时间 2023-08-30 16:00:36作者: JaxYoun

Java线程池分批调用

原文:https://www.cnblogs.com/hapjin/p/17568676.html

前言

本文记录 Java分批、并发处理数据的写法。虽然分批并发调用的写法很多,但向线程池提交任务执行、某批次执行失败如何处理、某批次的执行结果如何与原task对应等细节问题在实践中仍需考虑。因此,记录下较好的写法:

写法一

public class InvokeAllTest {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(4);

        //一组待查询的 poi, 假设1k个
        List<Long> poiNeed2queryList = new ArrayList<>();

        //拆分成 每批100个 查询
        List<List<Long>> allShardPoiList = Lists.partition(poiNeed2queryList, 100);
        List<Callable<List<String>>> tasks = new ArrayList<>();
        for (List<Long> shardPoiList : allShardPoiList) {
            Callable<List<String>> task = () -> queryName(shardPoiList);
            tasks.add(task);
        }

        try {
            //future 的顺序 与 tasks 中任务顺序保持一致
            List<Future<List<String>>> futureList = threadPool.invokeAll(tasks, 100, TimeUnit.MILLISECONDS);
            for (Future<List<String>> future : futureList) {
                try {
                    //获取到这一批次的 查询结果
                    List<String> poiNameList = future.get();
                    //后续 处理逻辑
                } catch (Exception e) {
                    //获取某一批次结果失败,做一些处理
                }
            }
        } catch (Exception e) {
            //所有批次调用失败,做一些处理
        }
    }

    //模拟根据 poiId 查 poi名称
    private static List<String> queryName(List<Long> poiList) {
        return new ArrayList<>();
    }
}
  1. guava的 Lists.partition能够方便地对数据集做切片。

  2. 采用invokeAll将多个批次的查询,提交线程池,并发执行。并可以指定各批次的超时时间(100ms 超时, future.get时会抛出异常),具体地:

1.任务执行过程中出现了异常,future.get 抛出:ExecutionException。
2.任务被取消(比如 调用 future.cancel ),则 future.get 抛出:CancellationException。
3.执行该任务的线程被其它线程请求中断,则抛出:InterruptedException。

  1. 线程池执行结果 futureList 中 future 的顺序和 tasks 列表中的 task 顺序是保持一致的,方便:某一批次的任务与该批次的执行结果对应起来。

  2. 需要考虑某一批次执行失败了,如何处理?for 循环中 try-catch 获取某一批次的执行结果,若获取失败,执行处理逻辑

  3. 需要考虑所有批次执行失败了,如何处理?

写法二

如果某一批次的任务执行完成了,可以立即获取执行结果,并基于“执行”结果进行处理。但是无法指定每一批次的执行超时时间

//无法指定某一批次任务的超时超时
private static void completionServiceTest() {
    ExecutorService threadPool = Executors.newFixedThreadPool(4);
    ExecutorCompletionService<List<String>> completionService = new ExecutorCompletionService<>(threadPool);

    //一组待查询的 poi, 假设1k个
    List<Long> poiNeed2queryList = new ArrayList<>();
    //拆分成 每批100个 查询
    List<List<Long>> allShardPoiList = Lists.partition(poiNeed2queryList, 100);

    for (List<Long> shardPoiList : allShardPoiList) {
        Callable<List<String>> task = () -> queryName(shardPoiList);
        //提交 一个批次 查询任务
        completionService.submit(task);
    }

    try {
        for (int i = 0; i < allShardPoiList.size(); i++) {
            //若某一批次执行完成,能够立即获取结果. 如果所有批次都未执行完成, 则阻塞
            final Future<List<String>> future = completionService.take();
            try {
                //在这里 future.get() 不会出现阻塞 
                final List<String> poiNameList = future.get();
                
                //后续处理逻辑, 处理 poiNameList
            } catch (Exception e) {
                //获取某一批次结果失败,做一些处理
            }
        }
    } catch (InterruptedException e) {
        //take 方法异常处理

    }
}

这里有更多的写法示例:http://www.javabyexamples.com/submit-tasks-in-batch-using-executorservice