13、百万数据分割颗粒度与异步线程实现

发布时间:2023-04-02 19:55:29 作者:爱文(Iven)

声明百万数据List集合:

    /**
     * Builds the one-million-element sample list used by the demos.
     *
     * <p>Element {@code i} gets {@code testNum = i}. Its time is
     * {@code DateUtil.getNowDate(0)}, except for the first element of every
     * 100,000-element run (i = 0, 100000, ...), which advances to and is stamped with
     * the next slot (slot 1 for i = 0, slot 2 for i = 100000, and so on).
     *
     * <p>NOTE(review): only one element per run receives a distinct time, so grouping
     * by time yields one very large group plus ten single-element groups. If ten equally
     * sized groups were intended, the slot should be derived from {@code i / 100000}
     * for every element; confirm with the author before changing the data shape.
     *
     * @return a new mutable list holding 1,000,000 {@code QueryVO} objects
     */
    private static List<QueryVO> dataList(){
        final int total = 1000000;
        final int runLength = 100000;
        // Presize so filling a million entries does not trigger repeated array growth.
        List<QueryVO> list = new ArrayList<>(total);
        int slot = 0;
        for (int i = 0; i < total; i++) {
            QueryVO queryVO = QueryVO.builder().build();
            queryVO.setTestNum(i);
            // Run boundaries move on to the next slot; all other elements use slot 0.
            // Computing the slot first avoids setting the time twice on boundary elements.
            int quarter = (i % runLength == 0) ? ++slot : 0;
            queryVO.setTime(DateUtil.getNowDate(quarter));
            list.add(queryVO);
        }
        return list;
    }
    /**
     * Returns the point in time {@code i} quarter-hours after the start of the current day.
     *
     * <p>The current day is taken from {@link Calendar#getInstance()}, so it follows the
     * JVM's default time zone and locale. A negative {@code i} steps back into the
     * previous day.
     *
     * @param i number of 15-minute slots to add to 00:00:00.000 of the current day
     * @return the resulting point in time
     */
    public static Date getNowDate(int i) {
        Calendar calendar = Calendar.getInstance();
        // Truncate "now" to the start of the day before adding the offset.
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        calendar.add(Calendar.MINUTE, i * 15);
        return calendar.getTime();
    }

 

一、百万数据分割颗粒度:

1、方式一:自定义分割List集合颗粒度方法:

    /**
     * Splits {@code sourceList} into consecutive chunks of at most {@code splitSize} elements.
     *
     * <p>Every chunk except possibly the last holds exactly {@code splitSize} elements; the
     * last one holds the remainder. Chunks are {@link List#subList(int, int)} views backed by
     * {@code sourceList}, not copies, so their behavior is undefined once the source list is
     * structurally modified.
     *
     * @param sourceList the list to split; may be {@code null} or empty
     * @param splitSize  maximum chunk length; values {@code <= 0} produce no chunks
     * @param <T>        element type
     * @return a new mutable list of chunks in source order; empty when there is nothing to split
     */
    private static<T> List<List<T>> splitList(List<T> sourceList, int splitSize) {
        List<List<T>> splitList = new ArrayList<>();
        if (sourceList == null || sourceList.isEmpty() || splitSize <= 0) {
            return splitList;
        }
        int sourceSize = sourceList.size();
        int from = 0;
        while (from < sourceSize) {
            // Clamp with the remaining length instead of from + splitSize to stay overflow-safe.
            int to = from + Math.min(splitSize, sourceSize - from);
            splitList.add(sourceList.subList(from, to));
            from = to;
        }
        return splitList;
    }

    /**
     * Demonstrates chunking with the hand-written {@code splitList} helper.
     *
     * <p>Splits {@code list} into chunks of 100,000 elements and prints the number of chunks,
     * followed by the size of the third chunk (index 2), or 0 when fewer than three chunks
     * exist.
     *
     * @param list the data to split
     */
    private static void customDemo(List<QueryVO> list){
        List<List<QueryVO>> dataList = splitList(list, 100000);
        // Guard the fixed index so short inputs do not fail with IndexOutOfBoundsException.
        int thirdChunkSize = dataList.size() > 2 ? dataList.get(2).size() : 0;
        System.out.println("自定义——splitList()实现List集合分割颗粒度:");
        System.out.println("总数据集合量:"+dataList.size());
        System.out.println("分支量:"+thirdChunkSize);
    }

 

2、方式二:基于google guava工具实现List集合分割颗粒度:

(1)、maven:

<!-- Google Guava: provides Lists.partition, used by the example below. NOTE(review): 21.0 is an old release; confirm whether a newer version should be used. -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>21.0</version>
</dependency>

(2)、实现方式:

    /**
     * Demonstrates chunking with Guava's {@code Lists.partition}.
     *
     * <p>Splits {@code list} into chunks of 100,000 elements and prints the number of chunks,
     * followed by the size of the third chunk (index 2), or 0 when fewer than three chunks
     * exist.
     *
     * @param list the data to split
     */
    private static void guavaListsDemo(List<QueryVO> list){
        List<List<QueryVO>> dataList = Lists.partition(list, 100000);
        // Guard the fixed index so short inputs do not fail with IndexOutOfBoundsException.
        int thirdChunkSize = dataList.size() > 2 ? dataList.get(2).size() : 0;
        System.out.println("google guava——Lists.partition()实现List集合分割颗粒度:");
        System.out.println("总数据集合量:"+dataList.size());
        System.out.println("分支量:"+thirdChunkSize);
    }

 

3、方式三:基于apache commons collection工具实现List集合分割颗粒度:

(1)、maven:

<!-- Apache Commons Collections 4: provides ListUtils.partition, used by the example below. -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-collections4</artifactId>
    <version>4.4</version>
</dependency>

(2)、实现方式:

    /**
     * Demonstrates chunking with Apache Commons Collections' {@code ListUtils.partition}.
     *
     * <p>Splits {@code list} into chunks of 100,000 elements and prints the number of chunks,
     * followed by the size of the third chunk (index 2), or 0 when fewer than three chunks
     * exist.
     *
     * @param list the data to split
     */
    private static void apacheListUtilsDemo(List<QueryVO> list){
        List<List<QueryVO>> dataList = ListUtils.partition(list, 100000);
        // Guard the fixed index so short inputs do not fail with IndexOutOfBoundsException.
        int thirdChunkSize = dataList.size() > 2 ? dataList.get(2).size() : 0;
        System.out.println("apache commons collection——ListUtils.partition()实现List集合分割颗粒度:");
        System.out.println("总数据集合量:"+dataList.size());
        System.out.println("分支量:"+thirdChunkSize);
    }

 

4、方式四:基于stream流实现List集合分割颗粒度:

    /**
     * Demonstrates splitting a list by a property using {@code Collectors.groupingBy}.
     *
     * <p>Groups the elements by {@code QueryVO::getTime} and prints the number of groups,
     * followed by the size of the third group (index 2), or 0 when fewer than three groups
     * exist. The groups are collected into a sorted map so that their order, and therefore
     * "the third group", is deterministic: ascending by time.
     *
     * @param list the data to group
     */
    private static void streamGroupDemo(List<QueryVO> list){
        // The single-argument groupingBy makes no ordering guarantee for the resulting map,
        // so a sorted map is requested explicitly to get a stable group order.
        Map<Date, List<QueryVO>> groups = list.stream()
                .collect(Collectors.groupingBy(QueryVO::getTime, java.util.TreeMap::new, Collectors.toList()));
        List<List<QueryVO>> dataList = new ArrayList<>(groups.values());
        // Guard the fixed index so inputs with few groups do not fail with IndexOutOfBoundsException.
        int thirdGroupSize = dataList.size() > 2 ? dataList.get(2).size() : 0;
        System.out.println("stream流——grouping by()实现List集合分割颗粒度:");
        System.out.println("总数据集合量:"+dataList.size());
        System.out.println("分支量:"+thirdGroupSize);

        // Ways to traverse the groups in real business code:
        //
        // Option 1: iterate over the map entries (key = time, value = elements with that time).
        //   for (Map.Entry<Date, List<QueryVO>> entry : groups.entrySet()) {
        //       // entry.getValue().size();
        //   }
        //
        // Option 2: iterate over the group lists only.
        //   for (List<QueryVO> queryVOList : dataList) {
        //       // business logic
        //   }
    }

 

二、百万数据的多线程方式实现:

声明线程池:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Spring configuration that exposes the shared {@link ThreadPoolTaskExecutor} bean.
 *
 * <p>{@code ThreadPoolTaskExecutor} (package {@code org.springframework.scheduling.concurrent})
 * is Spring's wrapper around the JDK's {@link ThreadPoolExecutor}.
 *
 * <p>Rejection policies provided by {@link ThreadPoolExecutor}:
 * <ol>
 *   <li>{@code CallerRunsPolicy}: unless the pool has been shut down, the rejected task is run
 *       directly in the submitting thread. No task is lost, but a fast producer is slowed down
 *       or blocked while it executes tasks itself.</li>
 *   <li>{@code AbortPolicy}: rejects the task by throwing {@code RejectedExecutionException}.
 *       This is the JDK default; callers must handle the exception or their flow is
 *       interrupted.</li>
 *   <li>{@code DiscardPolicy}: silently drops the rejected task.</li>
 *   <li>{@code DiscardOldestPolicy}: unless the pool has been shut down, drops the oldest
 *       queued task and retries submitting the new one.</li>
 * </ol>
 */
@Configuration
public class TaskPoolConfig {
    /** Core pool size; property {@code async.thread.concurrency.coreSize}, default 10. */
    @Value("${async.thread.concurrency.coreSize:10}")
    private int coreSize;

    /** Maximum pool size; property {@code async.thread.concurrency.maxSize}, default 20. */
    @Value("${async.thread.concurrency.maxSize:20}")
    private int maxSize;

    /** Work queue capacity; property {@code async.thread.concurrency.queueCapacity}, default 1000. */
    @Value("${async.thread.concurrency.queueCapacity:1000}")
    private int queueCapacity;

    /**
     * Keep-alive time in seconds; property {@code async.thread.concurrency.keepAliveSeconds},
     * default 10000. NOTE(review): 10000 seconds is roughly 2.8 hours; confirm the value was
     * not meant to be milliseconds.
     */
    @Value("${async.thread.concurrency.keepAliveSeconds:10000}")
    private int keepAliveSeconds;

    /**
     * Creates the executor from the configured sizes, with the {@code async-executor-} thread
     * name prefix and the {@code CallerRunsPolicy} rejection handler.
     *
     * @return the fully configured and initialized executor
     */
    @Bean
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(coreSize);             // core thread count
        executor.setMaxPoolSize(maxSize);               // maximum thread count
        executor.setQueueCapacity(queueCapacity);       // capacity of the waiting queue
        executor.setKeepAliveSeconds(keepAliveSeconds); // idle time before non-core threads are released
        executor.setThreadNamePrefix("async-executor-"); // thread name prefix
        // Applied once the queue is full and the pool is at its maximum size.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Initialize last so every setting above is in place when the pool is built.
        executor.initialize();
        return executor;
    }
}

使用:

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;  // injected thread pool; presumably the bean declared in TaskPoolConfig

 

1、方式一:普通多线程实现:

    /**
     * Converts the first two 100,000-element chunks of {@code list} in parallel using plain
     * {@code Future}s.
     *
     * <p>Each chunk is submitted to {@code threadPoolTaskExecutor} as one task that copies
     * {@code testNum} and {@code time} of every element into a new {@code QueryVO} with
     * {@code avgGrade} set to 100.0. Results are appended in chunk order. Only the first two
     * chunks are handled; when fewer exist, only those are processed.
     *
     * <p>Failure handling is best effort: if waiting is interrupted or a task fails, the stack
     * trace is printed and the results gathered so far are returned.
     *
     * @param list the source data
     * @return a new list with the converted elements of the processed chunks
     */
    private List<QueryVO> comhandleList(List<QueryVO> list){
        List<QueryVO> dataList = new ArrayList<>();
        // Split the source data into chunks of 100,000.
        List<List<QueryVO>> chunks = Lists.partition(list, 100000);
        // This variant deliberately handles only the first two chunks.
        int taskCount = Math.min(2, chunks.size());

        // Submit every task before waiting on any of them so the chunks run concurrently.
        List<Future<List<QueryVO>>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final List<QueryVO> chunk = chunks.get(i);
            Callable<List<QueryVO>> task = () -> {
                List<QueryVO> converted = new ArrayList<>(chunk.size());
                for (QueryVO p : chunk) {
                    converted.add(QueryVO.builder()
                            .testNum(p.getTestNum())
                            .time(p.getTime())
                            .avgGrade(100.0)
                            .build());
                }
                return converted;
            };
            futures.add(threadPoolTaskExecutor.submit(task));
        }

        try {
            for (Future<List<QueryVO>> future : futures) {
                dataList.addAll(future.get());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return dataList;
    }

 

2、方式二:CompletableFuture异步多线程实现:

(1)、普通方式:

    /**
     * Converts the first two 100,000-element chunks of {@code list} in parallel using
     * {@code CompletableFuture}.
     *
     * <p>Each chunk runs as one asynchronous task on {@code threadPoolTaskExecutor}; the task
     * copies {@code testNum} and {@code time} of every element into a new {@code QueryVO} with
     * {@code avgGrade} set to 100.0. Only the first two chunks are handled; when fewer exist,
     * only those are processed. Results are returned in chunk order.
     *
     * <p>If any task fails, the exception raised by {@code join()} propagates to the caller.
     *
     * @param list the source data
     * @return a new list with the converted elements of the processed chunks
     */
    private List<QueryVO> handleList(List<QueryVO> list){
        // Split the source data into chunks of 100,000.
        List<List<QueryVO>> chunks = Lists.partition(list, 100000);
        // This variant deliberately handles only the first two chunks.
        int taskCount = Math.min(2, chunks.size());

        List<CompletableFuture<List<QueryVO>>> cfList = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final List<QueryVO> chunk = chunks.get(i);
            cfList.add(CompletableFuture.supplyAsync(() -> {
                // Each task fills its own list. ArrayList is not thread-safe, so the tasks
                // must not append to a shared result list concurrently.
                List<QueryVO> converted = new ArrayList<>(chunk.size());
                for (QueryVO p : chunk) {
                    converted.add(QueryVO.builder()
                            .testNum(p.getTestNum())
                            .time(p.getTime())
                            .avgGrade(100.0)
                            .build());
                }
                return converted;
            }, threadPoolTaskExecutor));
        }

        // Wait for all tasks to finish.
        CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0])).join();

        // Merge on the calling thread only, after every task has completed.
        List<QueryVO> dataList = new ArrayList<>();
        for (CompletableFuture<List<QueryVO>> future : cfList) {
            dataList.addAll(future.join());
        }
        return dataList;
    }

(2)、全量方式:

    /**
     * Converts all of {@code list} in parallel, one asynchronous task per 100,000-element chunk.
     *
     * <p>Each task runs on {@code threadPoolTaskExecutor} and copies {@code testNum} and
     * {@code time} of every element of its chunk into a new {@code QueryVO} with
     * {@code avgGrade} set to 100.0. Results are returned in chunk order, i.e. in the order
     * of the source list.
     *
     * <p>If any task fails, the exception raised by {@code join()} propagates to the caller.
     *
     * @param list the source data
     * @return a new list with one converted element per source element
     */
    private List<QueryVO> sizeHandleList(List<QueryVO> list){
        // Split the source data into chunks of 100,000.
        List<List<QueryVO>> chunks = Lists.partition(list, 100000);

        List<CompletableFuture<List<QueryVO>>> cfList = new ArrayList<>();
        for (List<QueryVO> chunk : chunks) {
            cfList.add(CompletableFuture.supplyAsync(() -> {
                // Each task fills its own list. ArrayList is not thread-safe, so the tasks
                // must not append to a shared result list concurrently.
                List<QueryVO> converted = new ArrayList<>(chunk.size());
                for (QueryVO p : chunk) {
                    converted.add(QueryVO.builder()
                            .testNum(p.getTestNum())
                            .time(p.getTime())
                            .avgGrade(100.0)
                            .build());
                }
                return converted;
            }, threadPoolTaskExecutor));
        }

        // Wait for all tasks to finish.
        CompletableFuture.allOf(cfList.toArray(new CompletableFuture[0])).join();

        // Merge on the calling thread only, after every task has completed.
        List<QueryVO> dataList = new ArrayList<>();
        for (CompletableFuture<List<QueryVO>> future : cfList) {
            dataList.addAll(future.join());
        }
        return dataList;
    }

 

三、学习参考:

CompletableFuture使用详解

ThreadPoolTaskExecutor线程并发