How to Implement a Thread-Safe LRU Cache

Published: 2023-05-26 07:50:13 · Author: 编程爱好者-java

How do you implement a thread-safe LRU cache?

Without a capacity limit

Method 1: just use a ConcurrentHashMap. Best concurrency. [Recommended]

    class LRU<K, V> { // correct: we wrap ConcurrentHashMap rather than expose it directly, to keep the public interface small
        private Map<K, V> cache = new ConcurrentHashMap<>();

        public V get(K key) {
            return cache.get(key);
        }

        // returns null if the key does not exist, otherwise the old value
        public V put(K key, V val) {
            return cache.put(key, val);
        }
    }

Method 2: a read-write lock over a plain Map. get operations run concurrently; put operations are serialized. Concurrency is worse than Method 1.

    class LRU<K, V> {
        private Map<K, V> cache = new HashMap<>();
        private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        private Lock readLock = readWriteLock.readLock();
        private Lock writeLock = readWriteLock.writeLock();

        public V get(K key) {
            readLock.lock();
            try {
                return cache.get(key);
            } finally {
                readLock.unlock();
            }
        }

        // returns null if the key does not exist, otherwise the old value
        public V put(K key, V val) {
            writeLock.lock();
            try {
                return cache.put(key, val);
            } finally {
                writeLock.unlock();
            }
        }
    }

Method 3: synchronized methods, the synchronized Hashtable class, or Collections.synchronizedMap. Performance is the worst: both get and put are serialized.

// Using synchronized methods:
    class LRU<K, V> {
        private Map<K, V> cache = new HashMap<>();

        public synchronized V get(K key) {
            return cache.get(key);
        }

        // returns null if the key does not exist, otherwise the old value
        public synchronized V put(K key, V val) {
            return cache.put(key, val);
        }
    }

// Using the synchronized wrapper:
    class LRU<K, V> {
        private Map<K, V> cache = Collections.synchronizedMap(new HashMap<>());

        public V get(K key) {
            return cache.get(key);
        }

        // returns null if the key does not exist, otherwise the old value
        public V put(K key, V val) {
            return cache.put(key, val);
        }
    }
// Using the synchronized Hashtable class:
    class LRU<K, V> {
        private Map<K, V> cache = new Hashtable<>();

        public V get(K key) {
            return cache.get(key);
        }

        // returns null if the key does not exist, otherwise the old value
        public V put(K key, V val) {
            return cache.put(key, val);
        }
    }

With a capacity limit

lab1: using ConcurrentHashMap directly, with no locking around the compound operations, goes wrong. The root cause is that size() + remove() inside put is not atomic: several threads can enter the eviction branch at once and each remove an entry, driving size below capacity - 1. Concurrent puts can likewise push size above capacity, which is unacceptable. [Wrong]

public class Exp3WithConcurrent {

    static class LRU<K, V> {
        private final int capacity;
        private Map<K, V> cache = new ConcurrentHashMap<>();

        public LRU(int capacity) {
            this.capacity = capacity;
        }

        public V get(K key) {
            return cache.get(key);
        }

        // returns null if the key does not exist, otherwise the old value
        public V put(K key, V val) {
            if (cache.containsKey(key)) {
                return cache.put(key, val);
            }
            if (cache.size() >= capacity) {
                // random select one
                ArrayList<K> keys = new ArrayList<>(cache.keySet());
                int i = new Random().nextInt(keys.size());
                cache.remove(keys.get(i));
                if (cache.size() < capacity - 1) {
                    System.out.println("concurrent remove cause issue!!");
                }
            }
            V ans = cache.put(key, val);
            if (cache.size() > capacity) {
                System.out.println("concurrent put cause issue!!");
            }
            return ans;
        }
    }

    public static void main(String[] args) {
        LRU<Integer, String> cache = new LRU<>(3);
        Runnable task = () -> {
            for (int i = 0; i < 100; i++) {
                randomSleep();
                String name = Thread.currentThread().getName();
                int id = new Random().nextInt(10000);
                String ans = cache.put(id, name);
                // System.out.println(Thread.currentThread().getName() + " put: " + ans);
            }
        };

        for (int i = 0; i < 1000; i++) {
            new Thread(task, "T" + i).start();
        }
    }

    static void randomSleep() {
        int time = new Random().nextInt(1000);
        try {
            System.out.println(Thread.currentThread().getName() + " will sleep " + time + " ms");
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Sample output:

With 100 threads, each putting 100 times:

T17 will sleep 776 ms
T65 will sleep 737 ms
T77 will sleep 944 ms
concurrent remove cause issue!!
T4 will sleep 942 ms
T56 will sleep 781 ms
T18 will sleep 933 ms
T82 will sleep 17 ms
T30 will sleep 616 ms
T101 will sleep 80 ms
concurrent put cause issue!!
T677 will sleep 605 ms
concurrent put cause issue!!

With 1000 threads, each putting 100 times:

T524 will sleep 333 ms
T480 will sleep 294 ms
concurrent remove cause issue!!
T666 will sleep 173 ms
T444 will sleep 963 ms
concurrent put cause issue!!
T889 will sleep 54 ms
concurrent put cause issue!!
T191 will sleep 716 ms
concurrent put cause issue!!
T606 will sleep 134 ms
concurrent put cause issue!!
T417 will sleep 50 ms

lab2: lock the put method. This guarantees correctness, but it scales poorly: puts become fully serialized. [Correct]

Just change the put method: public synchronized V put(K key, V val) { ... }, or guard it with a ReentrantLock.
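A minimal sketch of lab2 (illustrative: it keeps lab1's random-eviction policy and its ConcurrentHashMap, and the class name and size() accessor are added here for testing, not from the original):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

class SynchronizedPutLru<K, V> {
    private final int capacity;
    private final Map<K, V> cache = new ConcurrentHashMap<>();

    SynchronizedPutLru(int capacity) {
        this.capacity = capacity;
    }

    // reads stay lock-free on the ConcurrentHashMap
    public V get(K key) {
        return cache.get(key);
    }

    public int size() {
        return cache.size();
    }

    // synchronized makes the check-evict-insert sequence atomic,
    // so size() can never exceed capacity
    public synchronized V put(K key, V val) {
        if (!cache.containsKey(key) && cache.size() >= capacity) {
            // random eviction, as in lab1
            List<K> keys = new ArrayList<>(cache.keySet());
            cache.remove(keys.get(new Random().nextInt(keys.size())));
        }
        return cache.put(key, val);
    }
}
```

Note that only put is synchronized: gets can still run concurrently with each other and with a put, because ConcurrentHashMap keeps individual reads safe; the monitor only protects the compound check-evict-insert step.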

lab3: use a read-write lock plus an access-ordered LinkedHashMap. All writes are mutually exclusive, so eviction works. [Correct] (One caveat: with accessOrder=true, a get() also reorders the internal list, so concurrent gets under only the read lock are technically a race; a strictly safe variant would take the write lock in get() as well.)

    static class LRU<K, V> {
        private final int capacity;
        private Map<K, V> cache;
        private ReadWriteLock lock;

        public LRU(int capacity) {
            this.capacity = capacity;
            cache = new LinkedHashMap<K, V>(capacity, 0.75f, true) {
                protected boolean removeEldestEntry(Map.Entry<K, V> eldestEntry) {
                    return size() > capacity;
                }
            };
            lock = new ReentrantReadWriteLock();
        }

        public V get(K key) {
            lock.readLock().lock();
            try {
                return cache.get(key);
            } finally {
                lock.readLock().unlock();
            }
        }

        // returns null if the key does not exist, otherwise the old value
        public V put(K key, V val) {
            lock.writeLock().lock();
            try {
                V ans = cache.put(key, val);
                if (cache.size() > capacity) {
                    System.out.println("concurrent put cause issue!!");
                }
                return ans;
            } finally {
                lock.writeLock().unlock();
            }
        }

        public V remove(K key) {
            lock.writeLock().lock();
            try {
                return cache.remove(key);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
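To make the accessOrder caveat concrete, here is an illustrative variant (class name mine) whose get() takes the write lock, since an access-ordered LinkedHashMap relinks its internal list on every successful get:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class WriteLockedLru<K, V> {
    private final Map<K, V> cache;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    WriteLockedLru(int capacity) {
        cache = new LinkedHashMap<K, V>(capacity, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity;
            }
        };
    }

    // with accessOrder=true, get() mutates the linked list,
    // so it must be treated as a write
    public V get(K key) {
        lock.writeLock().lock();
        try {
            return cache.get(key);
        } finally {
            lock.writeLock().unlock();
        }
    }

    public V put(K key, V val) {
        lock.writeLock().lock();
        try {
            return cache.put(key, val);
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

The trade-off: this is now effectively a single exclusive lock, so the read-write split buys nothing here; it exists only to keep the list mutation safe. That is exactly the pain point the buffered approach at the end of this article addresses.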

A curious observation: in get, if we check whether size() exceeds capacity before acquiring the read lock, the check can fire.

        public V get(K key) {
            if (cache.size() > capacity) {
                System.out.println("concurrent put cause over-size issue!!");
            }

            lock.readLock().lock();
            try {
                return cache.get(key);
            } finally {
                lock.readLock().unlock();
            }
        }

// Test code with concurrent readers added:
    public static void main(String[] args) {
        LRU<Integer, String> cache = new LRU<>(3);
        Runnable task = () -> {
            for (int i = 0; i < 100; i++) {
                randomSleep();
                String name = Thread.currentThread().getName();
                int id = new Random().nextInt(10000);
                String ans = cache.put(id, name);
                // System.out.println(Thread.currentThread().getName() + " put: " + ans);
            }
        };

        for (int i = 0; i < 1000; i++) {
            new Thread(task, "T" + i).start();
        }
        for (int i = 0; i < 10; i++) { // test concurrent read
            new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    randomSleep();
                    cache.get(1);
                }
            }, "R" + i).start();
        }
    }

    static void randomSleep() {
        int time = new Random().nextInt(100);
        try {
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

// sample output:
concurrent put cause over-size issue!!
concurrent put cause over-size issue!!
concurrent put cause over-size issue!!

If, however, the size check is moved to after the read lock is acquired, the over-size message never appears. What does this tell us? Once a reading thread holds the read lock, no writer can be mutating the map concurrently, so the read sees a consistent state and size() never exceeds capacity. After get() is entered but before the lock is acquired, reading size() can return an inconsistent value, because a writer may be in the middle of a put.

        public V get(K key) {
            lock.readLock().lock();
            try {
                if (cache.size() > capacity) {
                    System.out.println("concurrent put cause over-size issue!!");
                }
                return cache.get(key);
            } finally {
                lock.readLock().unlock();
            }
        }

Question: is there a better approach, with higher concurrency and finer-grained locking?

Reference implementation: ConcurrentLinkedHashMap (https://github.com/ben-manes/concurrentlinkedhashmap)

The project observes that in a LinkedHashMap-style LRU, the underlying linked list and the map can be treated separately. The map must be synchronized so that every read sees the latest value, but the access-order list is never exposed externally, so its updates can be buffered until the next write, or until the buffer exceeds a threshold. The benefit is that get no longer has to take the list's write lock to reorder the list on every access.

An alternative approach is to realize that the data structure can be split into two parts: a synchronous view and an asynchronous view. The hash-table must be synchronous from the perspective of the caller so that a read after a write returns the expected value. The list, however, does not have any visible external properties.

This observation allows the operations on the list to be applied lazily by buffering them. This allows threads to avoid needing to acquire the lock and the buffer can be drained in a non-blocking fashion. Instead of incurring lock contention, the penalty of draining the buffer is amortized across threads. This drain must be performed when either the buffer exceeds a threshold size or a write is performed.
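The buffering idea above can be sketched as follows. This is a heavily simplified, illustrative toy, not ConcurrentLinkedHashMap's actual design: the class name, DRAIN_THRESHOLD, and the LinkedHashSet used as the recency order are all my own choices, and the real library uses lock-free ring buffers and a far more careful drain protocol.

```java
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Reads record access events into a queue instead of touching the LRU
// order; the queue is drained under the lock on every write, or when it
// grows past a threshold (and then only via tryLock, never blocking).
class BufferedLru<K, V> {
    private static final int DRAIN_THRESHOLD = 64;
    private final int capacity;
    private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>();
    private final LinkedHashSet<K> lruOrder = new LinkedHashSet<>(); // guarded by lock
    private final Queue<K> readBuffer = new ConcurrentLinkedQueue<>();
    private final AtomicInteger buffered = new AtomicInteger();
    private final ReentrantLock lock = new ReentrantLock();

    BufferedLru(int capacity) {
        this.capacity = capacity;
    }

    public V get(K key) {
        V v = map.get(key);            // map is always current: read-after-write is visible
        if (v != null) {
            readBuffer.add(key);       // defer the list reordering
            if (buffered.incrementAndGet() >= DRAIN_THRESHOLD) {
                tryDrain();
            }
        }
        return v;
    }

    public V put(K key, V val) {
        lock.lock();
        try {
            V old = map.put(key, val);
            lruOrder.remove(key);
            lruOrder.add(key);         // most-recently used at the tail
            drainLocked();             // every write drains the buffer
            while (map.size() > capacity) {
                K eldest = lruOrder.iterator().next();
                lruOrder.remove(eldest);
                map.remove(eldest);
            }
            return old;
        } finally {
            lock.unlock();
        }
    }

    private void tryDrain() {
        if (lock.tryLock()) {          // non-blocking: skip if a writer holds the lock
            try {
                drainLocked();
            } finally {
                lock.unlock();
            }
        }
    }

    private void drainLocked() {
        K k;
        while ((k = readBuffer.poll()) != null) {
            buffered.decrementAndGet();
            if (map.containsKey(k)) {  // replay buffered accesses onto the order
                lruOrder.remove(k);
                lruOrder.add(k);
            }
        }
    }
}
```

The point of the sketch: gets never block on the lock, yet buffered accesses still influence eviction once they are drained, amortizing the list maintenance cost across threads instead of paying it on every read.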