批处理数据加强版,试过是时间最短的 处理100w数据

发布时间 2023-09-14 01:12:45作者: findlisa
public void excuteSingMul() throws InterruptedException {

        long start = System.currentTimeMillis();

        // 定义每批处理的记录数
        int batchSize = 5000;

        // 获取总记录数
        int totalRows = (int) studentsDao.count(null);

        // 计算批数
        int batches = totalRows % batchSize == 0 ? totalRows / batchSize : totalRows / batchSize + 1;
        System.out.println("开始批处理===============》");
        System.out.println("共" + batches + "批");

        AtomicInteger atomicInteger = new AtomicInteger(0);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                10, // 核心线程数
                20, // 最大线程数
                60, TimeUnit.SECONDS, // 空闲线程存活时间
                new ArrayBlockingQueue<>(50), // 阻塞队列
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒绝策略
        );

        CountDownLatch countDownLatch = new CountDownLatch(batches);

        for (int i = 0; i < batches; i++) {

//            System.out.println("第" + (i+1) + "批");

            int finalI = i;
            executor.submit(()->{
                List<Students> students = queryBatch(finalI, batchSize);

                // 处理name
                students.forEach(s -> s.setName(s.getName() + "*#"));

                // 更新这批记录
                studentsDao.insertOrUpdateBatch(students);

                // 记录更新数
                System.out.println("第" + atomicInteger.addAndGet(students.size()) +"条");

                countDownLatch.countDown();
            });
        }

        countDownLatch.await();
        long end = System.currentTimeMillis();
        long timeTaken = end - start;

        // 将毫秒转换为秒
        double timeInSeconds = timeTaken / 1000.0;

        System.out.println("共更新" + atomicInteger + "条");

        System.out.println("Total time taken: " + timeInSeconds/60 + " mins");

奇怪的时候线程数5,10 和 10,20处理效率是一样的,可能已经已经达到资源上限了