【Java 并发】【十】【JUC数据结构】【七】ConcurrentHashMap前置篇HashMap原理

发布时间 2023-04-09 22:36:35作者: 酷酷-

1  前言

前几节我们分析了一些并发安全的数据结构,分别是CopyOnWrite系列的CopyOnWriteArrayList、BlockingQueue阻塞队列系列的LinkedBlockingQueue、ArrayBlockingQueue、DelayQueue。接下来我们要讲解一个很重要的并发安全的数据结构,ConcurrentHashMap。在Java的数据结构里面平时我们最常使用的数据结构就是List、Map、Set这几种数据结构了。List类的并发安全的数据结构之前讲解了CopyOnWriteArrayList,而Set类型的数据结构是基于Map类来进行构建的,所以我们下面的重点是看看Map类型并发安全数据结构。由于ConcurrentHashMap和HashMap这两个数据结构很相似,在用法、特性、底层原理上有很多共同之处,所以我们本章先来详细讲解一下HashMap这个数据结构。然后再来讲解并发安全版本的Map数据结构ConcurrentHashMap,它是怎么做到并发安全的,是怎么基于分段锁的机制来并发过程的锁冲突来提高并发性能的。

2  HashMap介绍

HashMap底层采用数组+链表+红黑树的形式来存储数据,如下图所示:

(1)首先插入数据之前,会对使用一个hash算法计算key的hash值,然后根据此hash值对数组长度进行取模,得出该key存储在数组的第几个元素上。
(2)如上图黄色部分k = 1,v = 2,刚开始数组的此位置未存储元素,则此时不存在hash冲突,直接存储在此数组的位置即可。
(3)加入存在hash冲突,如上红色部分所示,则首先使用链表来解决hash冲突,新存入的元素放在链表末尾
(4)如果hash冲突变多,此时使用链表查询数据时间复杂度是O(n),为了提高查询性能,而将链表进行树化,将转化成一颗红黑树,此时查询性能为O(logN),另外链表转成红黑树的条件是,当链表上的节点数量不少于8个的时候。

3  HashMap内部源码

接下来继续分析一下它内部的源码是怎么实现这个设计的。

3.1  内部属性

// 默认的初始化容量16
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
// 最大容量2的30次方
static final int MAXIMUM_CAPACITY = 1 << 30;
// 负载因子0.75,比如数组的长度是16,当数组里元素的个数打到了数组大小的这个0.75比率,
// 也就是12个的时候,就会进行数组的扩容
static final float DEFAULT_LOAD_FACTOR = 0.75f;
// 链表红黑树化的条件,长度是8
static final int TREEIFY_THRESHOLD = 8;
// 非树化的添加,长度小于等于6
static final int UNTREEIFY_THRESHOLD = 6;
// 数化之前的最小容量必须打到64,否则优先进行扩容而不是进行树化
static final int MIN_TREEIFY_CAPACITY = 64;
// HashMap内部的基础数组,HashMap是使用数组+聊表+红黑树的方式来存储数据
transient Node<K,V>[] table;

3.2  构造方法

public HashMap() {
    // 默认构造函数,只是初始化了一个负载因子,其它啥都没干
    this.loadFactor = DEFAULT_LOAD_FACTOR;
}
// 设置容量的构造方法
public HashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR);
}
// 设置容量和负载因子的构造方法
public HashMap(int initialCapacity, float loadFactor) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException("Illegal initial capacity: " +
                                           initialCapacity);
    // 容量不可超过最大允许值 MAXIMUM_CAPACITY
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    if (loadFactor <= 0 || Float.isNaN(loadFactor))
        throw new IllegalArgumentException("Illegal load factor: " +
                                           loadFactor);
    this.loadFactor = loadFactor;
    // 调用tableSizeFor方法,根据传入的容量再进行一轮计算,算出应该分配的容量多少
    this.threshold = tableSizeFor(initialCapacity);
}

3.2.1  tableSizeFor(int cap)方法

static final int tableSizeFor(int cap) {
    int n = cap - 1;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

这个方法看起来很多位运算,其实没什么,也就是保证传入一个cap的值,最会返回的值一定是2的n次方,因为HashMap要保证底层的数组长度必须是2的N次方。

比如传入cap为7,则返回的是8;传入13得到的是16等,根据传入的cap值取得一个靠近cap的2的N次方的结果。同时确保得到的结果不能大于限定的MAXIMUM_CAPACITY 最大容量。

3.3  put方法

public V put(K key, V value) {
    // 1. 首先使用hash(key)方法进行hash计算得到一个hash值
    // 2. 然后在调用putVal方法插入元素
    return putVal(hash(key), key, value, false, true);
}

3.3.1  hash方法

static final int hash(Object key) {
    int h;
    // 1. 如果key == null直接返回0,这也就是为啥HashMap能使用null作为key的原因
    // 2. (h = key.hashCode()) 获取key的hashCode保存在h变量中
    // 3. h ^ (h >>> 16) 这个代码的意思是将一个32位的高16位和低16位进行异或运算
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

我们画个图理解一下:

相当于高16位不变,低的16位变成了原先的高16位和低16位的异或结果。
这里可能有个疑问:为什么不直接取key的hashCode,而是要进行这样的运算,这样做的好处是什么?
假如有72、40:

进行了高4位和低4位异或运算之后,76 % 8 = 4, 42 % 8 = 2 ,此时不存在hash冲突了。
假如不进行高4位和低4位的异或运算,假设此时数组长度是8,72 % 8 = 0,40%5 = 0 ,此时就存在hash冲突了。
上面只是列举了4位异或的情况,同样的道理HashMap中的高16位和低16位进行异或运算也是一样的,结果都是让得到的hash值中让高16位和低16位都参与运算,这样得到的结果更加随机一点,能更加有效的减少hash冲突了。

3.3.2  hash方法

接下来继续看下putVal方法的源码:

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
               boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    // 加入此时table数组是null,获取数组长度是0,说明还没进行初始化,需要先进行数组的初始化
    if ((tab = table) == null || (n = tab.length) == 0)
        // 进行数组的初始化
        n = (tab = resize()).length;
    // n 为数组的长度,当n为2的x次方时候(n - 1) & hash 的结果与 hash % n 相等
    // 这也就是为什么数组的长度要是2的x次方的原因,使用 & 位运算 代替 % 取模运算提高性能
    if ((p = tab[i = (n - 1) & hash]) == null)
        // 这里根据(n-1) & hash定位到数组的i位元素
        // 如果tab[i] == null,说明之前此下标的数组没存储过元素,直接将k、v存储在此数组位置即可
        tab[i] = newNode(hash, key, value, null);
    else {
        // 当tab[i] != null,走到这里,说明存在hash冲突
        Node<K,V> e; K k;
        // 判断一下此节点的Key和传入的Key是否一样,如果一样,直接替换value的值即可
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        // 如果p节点是一个红黑树,按照红黑树的方式插入或者替换一个节点
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            // 走到这里说明p是一个链表,使用链表的方式插入或者替换一个节点
             // 这里从链表头结点开始遍历整个链表
            for (int binCount = 0; ; ++binCount) {
                // 这里是走到链表的尾部,未发现该链表的任何节点的Key与传入的key一致
                // 说明需要新插入一个节点
                if ((e = p.next) == null) {
                    // 这里就是新插入一个节点
                    p.next = newNode(hash, key, value, null);
                    // 如果插入一个节点后,该链表长度达到了8,需要将链表转化成红黑树,提升搜索性能
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                // 这里就是遍历链表过程中发现有节点的Key跟传入的Key一直
                // 此时直接替换该节点的Value值即可
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                p = e;
            }
        }
        if (e != null) { // existing mapping for key
            // 获取修改前旧节点的value值,返回旧的值
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    // modCount记录HashMap修改的次数,每次put方法之后修改次数自增1
    ++modCount;
    // size表示HashMap中元素个数,threshould表示HashMap扩容的阈值
    // 插入元素之后size > threshould表示达到扩容阈值了,此时HashMap触发扩容操作
    if (++size > threshold)
        // 进行扩容
        resize();
    //
    afterNodeInsertion(evict);
    return null;
}

我们画个图理解一下:

(1)首先查看用来存储数据的基础数组,table是否为空或者长度length是否为0;如果是则先需要对数组进行初始化。
(2)根据寻址算法,(n-1)& hash ,其实也就是 hash % n,定位到数组的某个元素tab[i],如果tab[i]位置不存在元素,可以直接存储在tab[i]位置
(3)如果tab[i] !=null,说明存在hash冲突,此时需要解决hash冲突。判断tab[i]位置节点是链表节点还是红黑树节点。
(4)如果是红黑树节点,则调用putTreeVal对红黑树遍历,查找是否有Key与传入Key一致,有则替换该节点Value值即可,没有则插入新节点
(5)如果是链表节点,则从头开始遍历链表,对比链表的每个节点的Key是否与传入的Key一致,如果一致直接替换该节点的Value节点即可
(6)如果不一致,则在链表尾部插入一个新的节点,Key和Value存储在该节点中。同时判断新插入一个节点之后,该链表的长度是否达到8,如果是则需要将链表转成红黑树,提高查询效率。
(7)插入新的元素之后,HashMap的大小size自增1,同时判断size > threshould是否成立,也就是HashMap是否达到了扩容的阈值,如果是则需要调用resize()方法进行HashMap的扩容

3.3.3  resize方法

我们继续看看HashMap如何初始化以及容量达到扩容阈值的时候,是怎么进行扩容的:

final Node<K,V>[] resize() {
    // 获取内部的基础数组
    Node<K,V>[] oldTab = table;
    // 获取旧数组长度
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    // 扩容的阈值
    int oldThr = threshold;
    int newCap, newThr = 0;
    if (oldCap > 0) {
        // 如果当前数组长度已经MAXIMUM_CAPACITY,说明数组长度非常大了
        if (oldCap >= MAXIMUM_CAPACITY) {
            // 此时再进行扩容,直接就赋予Integer的最大长度
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        // 扩容一倍之后长度 < MAXIMUM_CAPACITY 且大于初始化长度
        // 那么此种情况,基础数组长度扩容一倍
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                 oldCap >= DEFAULT_INITIAL_CAPACITY)
            newThr = oldThr << 1; // double threshold
    }
    // 走到这里说明oldCap == 0,oldThr > 0
    else if (oldThr > 0) // initial capacity was placed in threshold
        newCap = oldThr;
    else {
        // 走到这里说明oldCap == 0 && oldThr == 0,此时就是设置长度为初始化长度
        // 也就是16
        newCap = DEFAULT_INITIAL_CAPACITY;
        // 新的达到扩容的阈值为 长度 * 负载因子 = 长度 * 0.75
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }
    // 如果扩容阈值为0,需要重新计算
    if (newThr == 0) {
        float ft = (float)newCap * loadFactor;
        // 计算扩容阈值,结合容量限制等情况
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                  (int)ft : Integer.MAX_VALUE);
    }
    // 赋值扩容阈值
    threshold = newThr;
    // 新创建一个基础数组,长度为上面的新容量长度newCap
    Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab;
    // 这里就是数据的移动了,将旧数组移动到扩容后的新数组上去
    if (oldTab != null) {
        // 遍历整个数组的每个位置
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            // 获取oldTab[j] 位置元素,赋值给e
            if ((e = oldTab[j]) != null) {
                // 将旧数组此位置引用置位null,方便进行垃圾回收
                oldTab[j] = null;
                // 如果e == null,说明此位置是一个单元素,既不是链表也不是红黑树
                if (e.next == null)
                    // 这种情况就好办,直接使用寻址算法移动到新数组上即可
                    newTab[e.hash & (newCap - 1)] = e;
                // 如果是一个红黑树
                else if (e instanceof TreeNode)
                    // 调用split方法将红黑树拆散移动到新数组上
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                // 如果是一个链表,则执行一下方式进行数据移动
                else { // preserve order
                    // 这里是先生成两个链表
                    // 链表lo的头节点是loHead,尾节点是loTail
                    Node<K,V> loHead = null, loTail = null;
                    // 链表hi的头结点是hiHead,尾节点是hiTail
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        // 如果旧链表上元素Key.hash & oldCap == 0
                        // 则将此节点放入到链表lo中
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        // 如果旧链表上元素Key.hash & oldCap != 0
                        // 则将次节点放入hi链表中
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    // 上面就将旧数组tab[j]位置的链表平均拆成了两个聊表lo和hi
                    
                    // 新数组的newTab[j]位置存放lo链表
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead;
                    }
                    // 新数组的newTab[j+oldCap]位置存放链表hi
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    return newTab;
}

我们画个图来理解一下:

(1)首先就是计算需要扩容的容量是多少,计算得到新的容量为newCap
(2)然后就是创建一个新的数组了,新数组长度为newCap
(3)然后就是遍历旧的数组,将数据移动到新数组长度了
(4)如果旧的数组tab[j]位置只有一个元素,那么直接移动到新数组即可,移动到新数组的位置使用寻址算法(newCap -1) & hash计算得到
(5)如果旧的数组tab[j]位置是一个链表,则需要将这个链表拆成两个,分别为lo和hi链表。旧链表元素的 hash & oldCap == 0 则移动到lo链表,如果hash & oldCap != 0 则移动到hi链表。
(6)lo链表存放入新数组newTable[j]的位置,hi链表存放入newTable[j+oldCap]的位置
(7)如果旧数组的tab[j]位置是一个红黑树,也需要将这颗红黑树拆成两个链表,也是根据hash & oldCap 是否为0的方式,跟链表的迁移方式是一样的,不同的是,迁移完成之后,需要判断一下,如果迁移后链表长度还是大于等于8,需要将链表再次转成红黑树。
(8)至此一个一个按照上述的方式迁移旧数组的每个元素,扩容就完成了。

3.4  remove方法

public V remove(Object key) {
        Node<K,V> e;
        return (e = removeNode(hash(key), key, null, false, true)) == null ?
            null : e.value;
    }

remove跟put方法是一样的,也是通过hash & (n - 1)的寻址方法找到位于数组的第几个元素。然后如果是链表则遍历链表,对比Key是否一样,如果找到则删除这个节点。如果是红黑树也是需要在红黑树中查找,对比Key是一样,如果一样删除这个节点,基本都是一样的,这里就不具体看了哈。

4  小结

好了,本节我们HashMap基本就到这里了,下一节我们就要开始研究ConcurrentHashMap了,有理解不对的地方欢迎指正哈。