《Java 并发编程的艺术》实验02-1 JUC 并发容器和框架的使用

发布时间 2023-10-31 16:31:10作者: Ba11ooner

JUC 并发容器和框架的使用

ConcurrentHashmap

简介

线程安全的哈希表,Hashmap 在 多线程环境下的替代

实验

实验目的:了解并发容器 ConcurrentHashmap 的使用方法

实验内容:

  • 基础:直接使用 Thread 实现

    • 多个线程同时往ConcurrentHashMap中添加键值对。

    • 多个线程同时从ConcurrentHashMap中获取指定键的值。

    • 多个线程同时从ConcurrentHashMap中删除指定键值对。

    • 进阶:使用线程池来管理线程,并通过ExecutorService来控制线程的并发数量。

代码
线程实现

像用 HashMap 一样用 ConcurrentHashMap

public class ConcurrentHashMapDemo {
    static void show(ConcurrentHashMap<String, Integer> hashMap) {
        for (Map.Entry<String, Integer> entry : hashMap.entrySet()) {
            System.out.println(entry.getKey() + "  : " + entry.getValue());
        }
    }

    static void sleep(){
        //等待所有元素初始化
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> hashMap = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        //初始化线程,线程任务是往 HashMap 中插入数据
        for (int i = 0; i < 10; i++) {
            final String str = "thread-" + i;
            final Integer integer = i;
//            System.out.println(str + " " + integer);
            threads.add(new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                hashMap.put(str, integer);
            }));
        }
        for (Thread t : threads) {
            t.start();
        }

        sleep();
        show(hashMap);

        List<Thread> threadsToRead = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            threadsToRead.add(new Thread(() -> {
                System.out.println(hashMap.get("thread-2"));
            }));
        }

        for (Thread t : threadsToRead) {
            t.start();
        }

        List<Thread> threadsToDelete = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final String str = "thread-" + 1;
            threadsToDelete.add(new Thread(() -> {
                hashMap.remove(str);
            }));
        }

        for (Thread t : threadsToDelete) {
            t.start();
        }

        sleep();
        show(hashMap);
    }

}
线程池
public class ConcurrentHashMapDemo {
    static void show(ConcurrentHashMap<String, Integer> hashMap) {
        for (Map.Entry<String, Integer> entry : hashMap.entrySet()) {
            System.out.println(entry.getKey() + "  : " + entry.getValue());
        }
    }

    static void sleep(){
        //等待所有元素初始化
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> hashMap = new ConcurrentHashMap<>();
        List<Thread> threads = new ArrayList<>();
        //初始化线程,线程任务是往 HashMap 中插入数据
        for (int i = 0; i < 10; i++) {
            final String str = "thread-" + i;
            final Integer integer = i;
//            System.out.println(str + " " + integer);
            threads.add(new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                hashMap.put(str, integer);
            }));
        }
        for (Thread t : threads) {
            t.start();
        }

        sleep();
        show(hashMap);

        List<Thread> threadsToRead = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            threadsToRead.add(new Thread(() -> {
                System.out.println(hashMap.get("thread-2"));
            }));
        }

        for (Thread t : threadsToRead) {
            t.start();
        }

        List<Thread> threadsToDelete = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            final String str = "thread-" + 1;
            threadsToDelete.add(new Thread(() -> {
                hashMap.remove(str);
            }));
        }

        for (Thread t : threadsToDelete) {
            t.start();
        }

        sleep();
        show(hashMap);
    }

}
线程实现 ? 线程池实现
  • 线程实现:操作对象是 ThreadThread 既是工作单元也是执行机制
  • 线程池实现:操作对象是 ExecutorRunnableExecutor 是执行机制,Runnable 是工作单元

BlockingQueue

简介

继承自 Queue,本质上是 Queue Plus,增强部分体现在

  • 提供支持阻塞的插入方法:队列满时,阻塞插入元素的线程直至不满
  • 提供支持阻塞的移除方法:队列空时,阻塞获取元素的线程直至非空

阻塞队列的主要用途是在多线程环境下进行数据交换和协调。它提供了一种线程安全的机制,使得多个生产者线程可以并发地向队列中插入元素,同时多个消费者线程也可以并发地从队列中获取元素,而不需要显式地进行同步操作。通常被用于实现生产者-消费者模型、任务调度和线程池等场景。

实验

实验目的:通过实验学习阻塞队列的基本用法,理解多线程编程中的同步问题和解决方案。

实验内容:

  1. 设计一个生产者线程类 ProducerThread,其中有一个方法 produce() 用于不断产生任务,将任务放入阻塞队列。
  2. 设计一个消费者线程类 ConsumerThread,其中有一个方法 consume() 用于从阻塞队列中取出任务并执行。
  3. 创建一个阻塞队列对象,例如使用 ArrayBlockingQueue,设置队列容量为 10。
  4. 创建多个生产者线程和消费者线程实例,启动它们,并观察线程之间的交互情况。

实验要求:

  1. 在 ProducerThread 类中,使用 put() 方法将任务放入阻塞队列,并输出相关信息。
  2. 在 ConsumerThread 类中,使用 take() 方法从阻塞队列中取出任务,并输出相关信息。
  3. 设置适当的线程休眠时间,模拟生产者线程和消费者线程的不同速度。
  4. 使用 synchronized 关键字来保证线程之间的互斥访问。

实验结果:

运行实验后,可以观察到生产者线程不断将任务放入阻塞队列,当队列满时,生产者线程被阻塞等待。消费者线程从队列中取出任务并执行,当队列为空时,消费者线程也被阻塞等待。通过阻塞队列实现了生产者和消费者之间的线程安全数据交换,避免了同步问题的发生。

注意事项:

  1. 在使用阻塞队列时,需要处理可能的 InterruptedException 异常。
  2. 阻塞队列的选择和初始化需要根据实际应用场景进行合理设置。
  3. 实验中使用的队列容量和线程数目可以根据具体需求进行调整。
代码
public class BlockingQueueDemo {
    static int TIMES = 5;

    //生产者线程
    static class ProducerThread implements Runnable {
        final private BlockingQueue<Integer> queue;

        public ProducerThread(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        public synchronized void produce() throws InterruptedException {
            int task = (int) (Math.random() * 100);
            queue.put(task);
            System.out.println("Producer thread put task: " + task);
            //模拟任务生产时间
            Thread.sleep(1000);
        }

        @Override
        public void run() {
            int i = TIMES;
            while (i > 0) {
                try {
                    produce();
                    i--;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //消费者线程
    static class ConsumerThread implements Runnable {
        final private BlockingQueue<Integer> queue;

        public ConsumerThread(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        public void consume() throws InterruptedException {
            int task = queue.take();
            System.out.println("Consumer thread take task: " + task);
            // 模拟任务的消耗时间
            Thread.sleep(2000);
        }

        @Override
        public void run() {
            int i = TIMES;
            while (i > 0) {
                try {
                    consume();
                    i--;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static void normal() {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        Thread producer1 = new Thread(new ProducerThread(queue));
        Thread producer2 = new Thread(new ProducerThread(queue));
        Thread consumer1 = new Thread(new ConsumerThread(queue));
        Thread consumer2 = new Thread(new ConsumerThread(queue));

        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
    }

    static void pool() {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        for (int i = 0; i < 2; i++) {
            executor.execute(new ProducerThread(queue));
        }

        for (int i = 0; i < 2; i++) {
            executor.execute(new ConsumerThread(queue));
        }

        executor.shutdown();
    }


    public static void main(String[] args) {
        normal();
        pool();
    }
}

Fork / Join

简介

Fork / Join 框架是 Java 并发库中的一部分,可以用于实现并行计算任务。它通过递归地将一个大任务分割为多个小任务,并通过多线程并行执行这些小任务。最后将它们的结果合并得到最终结果。

实验

实验目的:练习使用 Fork/Join 框架

实验要求:用 Fork/Join 框架 实现 1到 100 的并行加法

代码
public class ForkJoinDemo {
    static class SumTask extends RecursiveTask<Integer> {
        //粒度阈值:若当前任务粒度大于粒度阈值,则拆分任务,否则直接执行任务
        private static final int THRESHOLD = 10;
        private int start;
        private int end;

        public SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            if(end - start <= THRESHOLD){
                int sum = 0;
                for(int i = start; i <= end; i++){
                    sum += i;
                }
                return sum;
            }else{
                int mid = start + (end - start) / 2;
                //[ , ]
                SumTask leftTask = new SumTask(start, mid);
                SumTask rightTask = new SumTask(mid + 1, end);
                leftTask.fork();
                rightTask.fork();
                int left = leftTask.join();
                int right = rightTask.join();
                return left + right;
            }
        }

    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        SumTask sumTask = new SumTask(1, 100);
        int res = forkJoinPool.invoke(sumTask);
        System.out.println(res);
    }
}
用法总结
  1. 创建任务(Task):继承RecursiveTask(用于有返回值)或RecursiveAction(用于无返回值)类,实现compute()方法。这是需要并行执行的任务。
  2. 判断切分任务条件:在compute()方法中,如果当前任务需要继续切分成子任务执行,可以创建新的子任务,并将其提交给Fork/Join池。
  3. 子任务的执行:如果任务不再继续切分,即不需要并行执行了,则执行任务的具体逻辑,或者执行其他需要并行处理的操作。
  4. 合并子任务结果:如果任务切分成了若干个子任务执行并返回结果,主任务需要将这些子任务的结果进行合并。
  5. 完成任务:任务执行完毕后,可以返回结果或者做其他收尾工作。