理解ConcurrentHashMap的多线程执行

发布时间 2023-05-25 23:10:40作者: 编程爱好者-java

理解ConcurrentHashMap的多线程执行

多线程下ConcurrentMap单个操作的顺序性/原子性

结论:ConcurrentHashMap单个操作,例如 get/put/remove都有原子性,即使操作同一个key,在底层会通过synchronized锁去排队执行。所以多线程下,任意的执行结果都是合理的。

lab1:三个线程,操作同一个ConcurrentHashMap,且get/put/remove同一个key。

public class ExpWithConcurrent {
    public static void main(String[] args) {
        // 3 threads, operate with ConcurrentHashMap
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();

        Thread t1 = new Thread(() -> {
            // put
            randomSleep();
            String ans = map.put(1, "one");
            System.out.println("T1 put: " + ans);
        }, "T1");
        Thread t2 = new Thread(() -> {
            // remove
            randomSleep();
            String ans = map.remove(1);
            System.out.println("T2 remove: " + ans);
        }, "T2");
        Thread t3 = new Thread(() -> {
            // get
            randomSleep();
            System.out.println("T3 get: " + map.get(1));
        }, "T3");

        t1.start();
        t2.start();
        t3.start();
    }

    static void randomSleep() {
        int time = new Random().nextInt(1000);
        try {
            System.out.println(Thread.currentThread().getName() + " will sleep " + time + " ms");
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

输出结果如下:

T1 will sleep 956 ms
T2 will sleep 367 ms
T3 will sleep 222 ms
T3 get: null
T2 remove: null
T1 put: null

T2 will sleep 286 ms
T1 will sleep 415 ms
T3 will sleep 795 ms
T2 remove: null
T1 put: null
T3 get: one

T1 will sleep 534 ms
T2 will sleep 131 ms
T3 will sleep 217 ms
T2 remove: null
T3 get: null
T1 put: null

T3 will sleep 43 ms
T2 will sleep 719 ms
T1 will sleep 136 ms
T3 get: null
T1 put: null
T2 remove: one

….

通过分析可以知道,三个线程执行的顺序,总共有6种。在并发的情况下都有可能发生。网上有人建议测试并发程序,可以使用随机sleep,让线程乱序,观察执行的结果。对于这个lab来说,怎么操作都是正确的,没有一个严格的限制。

应用程序要维护一致性

lab2:如果要维护一个大小始终是3个元素的map(例如LRU cache),然后,多个线程执行put,如果当前等于3,就随机删除一个,然后再put。这个检查大小size、remove、put不是一个原子操作,应该会有问题(map的size可能超过3),代码如下:

public class Exp2WithConcurrent {
    public static void main(String[] args) {
        // 3 threads, operate with ConcurrentHashMap
        // initially we have 3 items in map.
        ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
        map.put(1, "one");
        map.put(2, "two");
        map.put(3, "three");

        // we must maintain the size, but also put new item concurrently
        Runnable task = () -> {
            randomSleep();

            String name = Thread.currentThread().getName();
            System.out.println(name + "---size:" + map.size());

            if (map.size() > 3) {
                System.out.println("yeah! inconsistent found!!!!");
            }

            if (map.size() >= 3) {
                map.remove(map.keys().nextElement());
            }
            int id = new Random().nextInt(10000);
            String ans = map.put(id, Thread.currentThread().getName());
            System.out.println(Thread.currentThread().getName() + " put: " + ans);
            System.out.println(name + "---size(after):" + map.size());
            if (map.size() > 3) {
                System.out.println("yeah! inconsistent found!!!!");
            }
        };

        for (int i = 0; i < 200; i++) {
            new Thread(task, "T" + i).start();
        }
    }

    static void randomSleep() {
        int time = new Random().nextInt(1000);
        try {
            System.out.println(Thread.currentThread().getName() + " will sleep " + time + " ms");
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

结果:当线程数比较多时,map的size,会比较大。

T44 will sleep 956 ms
T45 will sleep 893 ms
T1---size:3
T48 will sleep 122 ms
T31 will sleep 130 ms
T1 put: null
T1---size(after):3

…….

T7---size(after):4
yeah! inconsistent found!!!!
T165---size:4
yeah! inconsistent found!!!!
T165 put: null
T165---size(after):4
yeah! inconsistent found!!!!
T179---size:4
yeah! inconsistent found!!!!
T179 put: null
T179---size(after):4
yeah! inconsistent found!!!!
T110---size:4
yeah! inconsistent found!!!!
T110 put: null
T110---size(after):4
yeah! inconsistent found!!!!

那么针对lab2,如何保证应用程序的一致性呢?让size始终是3呢? 这个需要保证原子性了,通过加锁。

核心代码:
     Runnable task = () -> {
            randomSleep();

            synchronized (map) { // 加锁,让多线程去排队执行,保证 size(), remove(), put()的原子性
                String name = Thread.currentThread().getName();
                System.out.println(name + "---size:" + map.size());

                if (map.size() > 3) {
                    System.out.println("yeah! inconsistent found!!!!");
                }

                if (map.size() >= 3) {
                    map.remove(map.keys().nextElement());
                }
                int id = new Random().nextInt(10000);
                String ans = map.put(id, Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName() + " put: " + ans);
                System.out.println(name + "---size(after):" + map.size());
                if (map.size() > 3) {
                    System.out.println("yeah! inconsistent found!!!!");
                }
            }
        };

思考:

  • 既然都加锁了,还有必要用ConcurrentHashMap吗?直接使用普通的Map?
    答案:可以的!因为sync块里面,始终只有一个线程执行。

  • 或者不加锁,使用同步类的Hashtable,或者是Collections.synchronizedMap(map)也可以?
    答案:不可以,这就跟使用ConcurrentHashMap不加锁,是一样的,只能保证Map的单个操作的原子性,不能保证多个组合操作的原子性。

public class Exp2WithConcurrent { // 使用普通Map + synchronized 可以。正确!
    public static void main(String[] args) {
        // 3 threads, operate with ConcurrentHashMap
        // initially we have 3 items in map.
        Map<Integer, String> map = new HashMap<>();
        map.put(1, "one");
        map.put(2, "two");
        map.put(3, "three");

        // we must maintain the size, but also put new item concurrently
        Runnable task = () -> {
            randomSleep();

            synchronized (map) {
                String name = Thread.currentThread().getName();
                System.out.println(name + "---size:" + map.size());

                if (map.size() > 3) {
                    System.out.println("yeah! inconsistent found!!!!");
                }

                if (map.size() >= 3) {
                    map.remove(map.keySet().iterator().next());
                }
                int id = new Random().nextInt(10000);
                String ans = map.put(id, Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName() + " put: " + ans);
                System.out.println(name + "---size(after):" + map.size());
                if (map.size() > 3) {
                    System.out.println("yeah! inconsistent found!!!!");
                }
            }
        };

        for (int i = 0; i < 200; i++) {
            new Thread(task, "T" + i).start();
        }
    }

    static void randomSleep() { ... }
}