BlockingQueue读取文本内容,多线程处理数据

发布时间 2023-04-09 12:19:52作者: 彭于晏码分晏

现在有一个txt文本,每个文本中每行的内容是:id,商品id。

要求:启动一个线程去读取文本的内容,把每行的内容通过使用BlockingQueue发送到队列里面,然后多线程,最好是10个线程,从BlockingQueue队列里面取出来,将地址作为请求参数,请求api接口,把返回的内容解析出来,把原内容id,商品id,结果集写入到新的文本里面。

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ceshi {

    public static void main(String[] args) throws Exception {
        String inputFilePath = "/Users/flyme/data/input-100.txt"; // 输入文件路径
        String outputFilePath = "/Users/flyme/data/output-100.txt"; // 输出文件路径
        int numThreads = 100; // 线程数量

        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 创建一个阻塞队列
        Thread readerThread = new Thread(new Reader(inputFilePath, queue)); // 创建读取线程
        readerThread.start(); // 启动读取线程

        Thread[] workerThreads = new Thread[numThreads]; // 创建工作线程
        for (int i = 0; i < numThreads; i++) {
            workerThreads[i] = new Thread(new Worker(queue, outputFilePath)); // 创建工作线程
            workerThreads[i].start(); // 启动工作线程
        }

        readerThread.join(); // 等待读取线程结束
        for (int i = 0; i < numThreads; i++) {
            workerThreads[i].join(); // 等待所有工作线程结束
        }
    }

    /**
     * 读取线程,将文件内容放到阻塞队列中。
     */
    static class Reader implements Runnable {
        private String filePath;
        private BlockingQueue<String> queue;

        public Reader(String filePath, BlockingQueue<String> queue) {
            this.filePath = filePath;
            this.queue = queue;
        }

        public void run() {
            try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    queue.put(line); // 将每行内容放入队列
                    System.out.println(Thread.currentThread().getName() + ": 发送到队列中:" + line);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    queue.put("EOF"); // 放置一个结束标记
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 工作线程,从阻塞队列中读取内容并处理,将结果写入到文件中。
     */
    static class Worker implements Runnable {
        private BlockingQueue<String> queue;
        private String outputFilePath;

        public Worker(BlockingQueue<String> queue, String outputFilePath) {
            this.queue = queue;
            this.outputFilePath = outputFilePath;
        }

        public void run() {
            try (FileWriter writer = new FileWriter(outputFilePath, true)) { // 文件追加写入
                String line;
                while (true) {
                    line = queue.poll(1, TimeUnit.SECONDS); // 从队列中取出一条记录,等待1秒钟
                    if (line != null) {
                        if (line.equals("EOF")) { // 如果遇到结束标记,则退出循环
                            break;
                        }
                        String[] fields = line.split(",");
                        String id = fields[0];
                        String address = fields[1];
                        String result = callAPI(address); // 调用API接口,获取结果集
                        String[] coords = result.split(",");
                        writer.write(String.format("%s,%s,%s,%s\n", id, address, coords[0], coords[1])); // 将结果写入输出文件
                    } else {
                        if (isQueueReadyToTerminate(queue)) { // 判断队列是否准备结束
                            break;
                        } else {
                            Thread.sleep(100); // 防止CPU占用率过高
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * 模拟API接口调用,返回结果数据。
         */
        private String callAPI(String address) {
            return "a1,a2";
        }

        /**
         * 判断队列是否已准备结束。如果队列为空或者只剩下"EOF"标记,则说明队列已准备结束。
         */
        private boolean isQueueReadyToTerminate(BlockingQueue<String> queue) {
            return queue.isEmpty() || (queue.size() == 1 && queue.contains("EOF"));
        }
    }
}
  • 从队列中取出一条记录,为什么要等待1秒钟

等待1秒钟是为了防止工作线程无限制地轮询队列,这样会让CPU占用率很高,影响系统的性能。在poll方法中加入等待时间,可以让线程在没有任何任务时进入休眠状态,降低CPU的占用率,提高系统的响应速度。

在代码中,工作线程使用了queue.poll(1, TimeUnit.SECONDS)方法来从队列中取出数据,这个方法会等待1秒钟,如果在等待期间未能从队列中成功取出数据,则方法会返回null,线程就可以进行下一个判断了。如果队列中有新的数据,则可以直接读取,不需要等待。如果队列为空,线程也不会继续执行,并且在等待1秒钟后,会再次尝试读取队列中的数据,避免过度消耗系统资源。