ConcurrentHashMap的原理

发布时间 2023-08-03 17:00:39作者: _mcj

1.ConcurrentHashMap的结构

ConcurrentHashMap在jdk1.7版本的结构是通过Segments数组+HashEntry数组+链表构成,其主要是通过分段锁来保证安全性。在修改数据的时候,通过加在Segment上的锁来锁住当前数据所在的Segment来保证其在修改的过程中不会被其他的线程修改。
ConcurrentHashMap在jdk1.8版本的结构则是数组+红黑树+链表构成的,而此时其主要是通过cas,volatile,synchronized加锁来保证其安全性的。volatile能够保证可见性,在使用get方法获取值的时候由于val和next使用了volatile关键字进行修饰,因此其可以不用加锁获取key的实时值。另外这里改为使用CAS+synchronized关键字来进行加锁,是因为synchronized关键字被优化了,具有锁升级能力,同时其只需要对红黑树的根节点进行加锁就行,降低了锁的粒度,增大了并发度,降低了消耗,减少了资源浪费。

2.ConcurrentHashMap与HashMap的区别

基于JDK1.8 是否线程安全 key/value是否允许为null 结构
HashMap 非线程安全的 key和value允许为null 数组+链表+红黑树,红黑树使用的为treeNode
ConcurrentHashMap 线程安全的 key和value不允许为null 数组+链表+红黑树,红黑树使用的是treeBin和treeNode,treeBin记录红黑树的根节点,记录的有锁状态

说明:ConcurrentHashMap中key和value为什么不允许为null:因为当ConcurrentHashMap的value为空的话可能会带来二义性,在通过一个key获取到的值为null时不知道是不存在这个key还是这个key的值就是null,而HashMap允许为null则是因为其能通过contions方法进行判断是否包含这个key。而如果ConcurrentHashMap也用该方法进行判断的话则其就不是线程安全的了,当该线程处于获取该key的值,判断是否有该key之间时有另一个线程插入了该key的值,此时的结果就不正确了。
ConcurrentHashMap中的TreeBin是当链表转化为红黑树后保存红黑树的root及锁状态。其中记录的锁状态主要有三种:
WRITER = 1:写锁,二进制为001
WAITER = 2:等待写锁,二进制为010
READER = 4:读锁,二进制为100

3.ConcurrentHashMap的基本元素

ConcurrentHashMap的一些字段:

// 最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认容量
private static final int DEFAULT_CAPACITY = 16;
// 最大可能的数组大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 默认的并发级别
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
// 默认加载因子(当容量达到设置容量*加载因子后会进行扩容)
private static final float LOAD_FACTOR = 0.75f;
// 链表转红黑树阈值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树退化为链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
// 转为红黑树时,table数组的最小值
static final int MIN_TREEIFY_CAPACITY = 64;
// 转移节点的hash值
static final int MOVED     = -1;
// 红黑树根节点的hash值
static final int TREEBIN   = -2;
// 临时保留的hash值
static final int RESERVED  = -3;
// 浦融节点hash的可用位
static final int HASH_BITS = 0x7fffffff;
// 第一次新增元素时初始化,始终是2的幂
transient volatile Node<K,V>[] table;
// 扩容时用
private transient volatile Node<K,V>[] nextTable;
// 当前节点的数量
private transient volatile long baseCount;
// 控制table初始化和扩容的字段
//-1:表示有线程正在创建table
//-N:表示有N-1个线程正在复制table
//在table被初始化之前,表示根据构造函数传入的值计算出的应被初始化的大小
//在table被初始化后,则被设置为table大小的75%,代表table的容量(数组容量)
private transient volatile int sizeCtl;

ConcurrentHashMap的组成元素:
Node:链表中的元素,是链表的一个节点,内部存储了key,value以及next(下一个节点)。
ForwardingNode:当链表进行扩容,链表进行迁移时使用这个,其不保存key和value,只保存了扩容后哈希表(nextTable)的引用。扩容时查找相应的code是取nextTable中查找。
TreeBin:链表转为红黑树后,数组中保存的是TreeBin,其内记录的有锁状态,红黑树的根节点,异界TreeNode的first节点。
TreeNode:红黑树的节点。

4.ConcurrentHashMap的put方法

ConcurrentHashMap是直接调用的putVal方法,这里直接看putVal方法为:

final V putVal(K key, V value, boolean onlyIfAbsent) {
	if (key == null || value == null) throw new NullPointerException();
	// 计算要插入数据的key的hash值
	int hash = spread(key.hashCode());
	int binCount = 0;
	for (Node<K,V>[] tab = table;;) {
		Node<K,V> f; int n, i, fh;
		// 如果当前concurrentHashMap为null或者长度为0,则进行初始化
		if (tab == null || (n = tab.length) == 0)
			tab = initTable();
		// 获取当前key的hash值所在节点数组中的位置,如果为null则使用cas创建新的节点,此时没有加锁
		else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
			if (casTabAt(tab, i, null,
						 new Node<K,V>(hash, key, value, null)))
				break;                   // no lock when adding to empty bin
		}
		// 当前需要扩容则帮助其进行扩容
		else if ((fh = f.hash) == MOVED)
			tab = helpTransfer(tab, f);
		else {
			V oldVal = null;
			// 向该节点后添加数据,对该节点加锁
			synchronized (f) {
				// 判断当前节点是否还是这个值
				if (tabAt(tab, i) == f) {
					if (fh >= 0) {
						binCount = 1;
						for (Node<K,V> e = f;; ++binCount) {
							K ek;
							// 当该key已经存在,则对该key的值进行覆盖修改
							if (e.hash == hash &&
								((ek = e.key) == key ||
								 (ek != null && key.equals(ek)))) {
								oldVal = e.val;
								if (!onlyIfAbsent)
									e.val = value;
								break;
							}
							// 当该key不存在时,则在其后面新增一个该key的值
							Node<K,V> pred = e;
							if ((e = e.next) == null) {
								pred.next = new Node<K,V>(hash, key,
														  value, null);
								break;
							}
						}
					}
					else if (f instanceof TreeBin) {
						// 当首节点是红黑树的根节点时,则使用添加红黑树的方法进行添加
						Node<K,V> p;
						binCount = 2;
						if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
													   value)) != null) {
							oldVal = p.val;
							if (!onlyIfAbsent)
								p.val = value;
						}
					}
				}
			}
			if (binCount != 0) {
				// 当该节点的数量大于转为红黑树要求的数量时(值默认为8),则将该节点列表转为红黑树
				if (binCount >= TREEIFY_THRESHOLD)
					// 判断是进行扩容还是转化为红黑树
					treeifyBin(tab, i);
				if (oldVal != null)
					return oldVal;
				break;
			}
		}
	}
	// 将个数加1,并判断是否需要扩容,需要的话则进行扩容
	addCount(1L, binCount);
	return null;
}

从上面的代码可以看出其主要步骤为:

  • 1:计算key的hash值
  • 2:进行自旋
    • 2.1:判断当前的ConcurrentHashMap是否没有初始化,没有初始化的话则进行初始化
    • 2.2:判断该hash值所在的节点是否还没有数据,如果没有则说明该数据是第一个,则使用CAS进行添加数据
    • 2.3:判断当前是否在扩容,如果在扩容则帮其进行扩容
    • 2.4:不满足上面的条件时,则添加数据,会判断是使用树的形式还是链表的形式进行添加或者覆盖,当数据添加之后还需要判断是否要将链表转化为红黑树,如果需要转化的话还需要将其转化为红黑树
  • 3:将个数加1,并判断是否需要扩容,需要扩容的话进行扩容

ConcurrentHashMap的初始化方法为:

// 初始化方法
private final Node<K,V>[] initTable() {
	Node<K,V>[] tab; int sc;
	// 自旋
	while ((tab = table) == null || tab.length == 0) {
		// 进行检查是否已有线程正在初始化
		if ((sc = sizeCtl) < 0)
			// 当已有线程正在初始化时,将当前线程从可执行状态改为就绪状态
			Thread.yield(); // lost initialization race; just spin
		else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
			// 使用CAS方式将sizeCtl修改为-1,保证当前只有一个线程在初始化
			try {
				// 再次检查是否还未初始化,双重检测(double-check)
				if ((tab = table) == null || tab.length == 0) {
					// 如果sc大于0,则使用sc的值,否则使用默认值16
					int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
					// 初始化
					@SuppressWarnings("unchecked")
					Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
					table = tab = nt;
					// sc设置为容量大小的3/4
					sc = n - (n >>> 2);
				}
			} finally {
				//sizeCtl设置为table大小的3/4
				sizeCtl = sc;
			}
			break;
		}
	}
	return tab;
}

从上面的代码中可以看到其使用了自旋+CAS+双重检测机制来进行保证线程安全。其具体的步骤为:

  • 1:自旋判断是否已经初始化
  • 2:判断是否已经有线程正在初始化,如果已经有线程正在初始化则将当前线程的状态从可执行状态改为就绪状态
  • 3:如果没有线程在初始化则使用CAS将sizeCtl修改为-1,表示此线程在初始化,确保只能有一个线程在初始化
  • 4:再次检测是否还没有初始化,双重检测,检测通过的话则进行初始化
  • 5:设置sizeCtl为table大小的3/4
    sizeCtl的值的意义可以看上面关于sizeCtl字段的描述。

ConcurrentHashMap的判断是否扩容还是转为红黑树的方法treeifyBin()方法如下:

private final void treeifyBin(Node<K,V>[] tab, int index) {
	Node<K,V> b; int n, sc;
	if (tab != null) {
		// 判断哈希表长度是否小于64,小于64则进行扩容为当前的2倍
		if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
			tryPresize(n << 1);
		else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
			// 加锁
			synchronized (b) {
				// 将node节点转化为treeNode节点
				if (tabAt(tab, index) == b) {
					TreeNode<K,V> hd = null, tl = null;
					for (Node<K,V> e = b; e != null; e = e.next) {
						TreeNode<K,V> p =
							new TreeNode<K,V>(e.hash, e.key, e.val,
											  null, null);
						if ((p.prev = tl) == null)
							hd = p;
						else
							tl.next = p;
						tl = p;
					}
					// 将红黑树的根节点放在treeBin中,并保存到hash表的index位置
					setTabAt(tab, index, new TreeBin<K,V>(hd));
				}
			}
		}
	}
}

从上面的源码可以看出其思想为首先判断哈希表的长度是否小于64,如果小于64的话则对哈希表进行2倍的扩容,否则将其进行转化为红黑树。其中扩容的方法则是tryPresize()方法,每次扩容会增大2倍。
ConcurrentHashMap的扩容方法tryPresize()的源码如下:

private final void tryPresize(int size) {
	// 计算扩容后的数组容量
	int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
		tableSizeFor(size + (size >>> 1) + 1);
	int sc;
	// sc=sizeCtl为容量的3/4
	while ((sc = sizeCtl) >= 0) {
		Node<K,V>[] tab = table; int n;
		// 这里判断要处理的table是否还未初始化,如果还未初始化则对其进行初始化。
		// 主要是针对putAll方法,这是因为其不调用initTable()直接调用tryPresize方法
		if (tab == null || (n = tab.length) == 0) {
			n = (sc > c) ? sc : c;
			if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
				try {
					if (table == tab) {
						@SuppressWarnings("unchecked")
						Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
						table = nt;
						sc = n - (n >>> 2);
					}
				} finally {
					sizeCtl = sc;
				}
			}
		}
		// 扩容已经达到要扩的容量,则结束扩容
		else if (c <= sc || n >= MAXIMUM_CAPACITY)
			break;
		// 要处理的table已存在
		else if (tab == table) {
			int rs = resizeStamp(n);
			// sc小于0,表示已有线程在扩容
			if (sc < 0) {
				Node<K,V>[] nt;
				// 判断当前扩容的线程是否已达上限,如果达到上限则退出
				if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
					sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
					transferIndex <= 0)
					break;
				// 如果未达上限,则帮助其扩容
				if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
					transfer(tab, nt);
			}
			// 如果当前没有线程在扩容,则进行扩容,transfer方法第二个参数传null表示需要新建扩容后的哈希表
			else if (U.compareAndSwapInt(this, SIZECTL, sc,
										 (rs << RESIZE_STAMP_SHIFT) + 2))
				transfer(tab, null);
		}
	}
}

从上面的代码可以看出步骤大致为:

  • 1:首先计算扩容后的容量
  • 2:判断table是否未初始化,如果未初始化则进行初始化,该步骤主要是针对putAll方法直接调用tryPresize方法未对table进行初始化
  • 3:判断容量是否已达到要扩容的容量,则结束扩容
  • 4:如果table已经初始化并且未达到扩容的容量
    • 4.1:如果当前已有线程在扩容,则判断当前扩容的线索是否已达上限,未达到是则帮助扩容(传参为扩容后的哈希表),达到时则推出
    • 4.2:如果当前没有线程在扩容,则进行扩容(传参为null,需要进行新建哈希表)

从上面的步骤可以看出其主要的逻辑是transfer()扩容的方法,其在ConcurrentHashMap中的源码为:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
	// 旧的数组长度
	int n = tab.length, stride;
	// 如果当前cpu的核数不大于1,则不进行并发扩容
	// 设置每个线程迁移数据的步长,每个线程负责的最小步长为16,也就是负责迁移桶的个数最少为16个
	if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
		stride = MIN_TRANSFER_STRIDE; // subdivide range
	// 如果新数组为null,则进程初始化新数组长度为老数组的2倍
	if (nextTab == null) {            // initiating
		try {
			@SuppressWarnings("unchecked")
			Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
			nextTab = nt;
		} catch (Throwable ex) {      // try to cope with OOME
			sizeCtl = Integer.MAX_VALUE;
			return;
		}
		nextTable = nextTab;
		// 设置开始数据迁移的桶的下标,因为n直接取得数组的长度,所以具体的下标要减1
		transferIndex = n;
	}
	// 新数组的长度
	int nextn = nextTab.length;
	// 声明转移节点,表示该节点正在扩容
	ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
	// 是否还有桶的迁移没有完成
	boolean advance = true;
	// 扩容是否已经完成
	boolean finishing = false; // to ensure sweep before committing nextTab
	// i当前线程需要转移的桶的下表,bound表示当前线程负责的桶的最小下标
	for (int i = 0, bound = 0;;) {
		Node<K,V> f; int fh;
		while (advance) {
			int nextIndex, nextBound;
			// 如果一个桶的下标还在其负责的桶的最小下表之后或者扩容已经完成则不需要重新截取新的段
			if (--i >= bound || finishing)
				advance = false;
			// transferIndex用来表示这个下标及其后面的桶都已经被其他线程处理了,新的线程需要从transferIndex往前截取自己需要负责的桶,如果transferIndex小于等于0说明桶都已经转移完毕,不需要再处理了 
			else if ((nextIndex = transferIndex) <= 0) {
				i = -1;
				advance = false;
			}
			// 以nextIndex(在上面已经赋值为transferIndex)为起始位置,往数组头部方向截取相应步长的段来转移数据,通过cas将transferIndex设置到新的下标
			else if (U.compareAndSwapInt
					 (this, TRANSFERINDEX, nextIndex,
					  nextBound = (nextIndex > stride ?
								   nextIndex - stride : 0))) {
				// cas成功后,设置当前线程负责的下标边界(比如负责下标32到48的桶,那么这个bound就是32)
				bound = nextBound;
				// cas成功后,设置当前线程开始处理的桶的下标(比如负责下标32到48的桶,那么这个i就是48)
                // transferIndex默认是从tab.length开始取值,所以要减1来表示正确的下标
				i = nextIndex - 1;
				// cas成功则表示当前线程已经成功截取了自己需要负责的一段数据了,不需要再往前截取了
				advance = false;
			}
		}
		// i是要转移的桶的下表,i<0表示桶已经拷贝完成
		if (i < 0 || i >= n || i + n >= nextn) {
			int sc;
			// 节点拷贝完成
			if (finishing) {
				nextTable = null;
				// 直接将新的table赋值给table,因为已经拷贝的节点会标记转移节点,不会再进行修改
				table = nextTab;
				// 将其设为容量的3/4
				sizeCtl = (n << 1) - (n >>> 1);
				return;
			}
			// 并发扩容时,每当有一个线程扩容完成,则将sc-1
			if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
				// 判断是否是最后一个线程扩容完成,至于为什么减2则是因为上面增加一个线程扩容是加2
				if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
					return;
				// 当前线程的扩容已完成
				finishing = advance = true;
				i = n; // recheck before commit
			}
		}
		// 当前桶为空桶,则将该桶标记为fwd,当有别的线程操作该桶时进行帮助扩容
		else if ((f = tabAt(tab, i)) == null)
			advance = casTabAt(tab, i, null, fwd);
		// 该桶已转移
		else if ((fh = f.hash) == MOVED)
			advance = true; // already processed
		else {
			// 加锁
			synchronized (f) {
				// 再次检测该桶的数据是否相同
				if (tabAt(tab, i) == f) {
					Node<K,V> ln, hn;
					// 进行节点拷贝,如果节点不是红黑树节点
					if (fh >= 0) {
						// 由于n为2的幂次方,所以此时fh & n的值要么为0,要么为n,
						// 因为下标节点是通过(n - 1) & hash计算得到的,所以不同的hash值会计算出同一个下标值的 比如hash值为15和31时
						// 比如n为16时,01111(15) & 11111(31) 和 01111(15) & 01111(15) 都为01111
						// n扩大一倍为31时,011111(31) & 011111(31) 和 011111(31) & 0011111(15) 则为31和15,之间的差值则证号为扩容的容量
						// 而31和15的hash & n 的值则分别为n和0,所以旧的下表为i的桶中的数据迁移到新的桶会在下表为i或者i+n的桶中
						int runBit = fh & n;
						Node<K,V> lastRun = f;
						// 遍历整个链表获取到从链表尾计算最长的不需要改变指针的链表,既然不需要改变指针则可以直接将其整体的迁移到新的桶中
						for (Node<K,V> p = f.next; p != null; p = p.next) {
							int b = p.hash & n;
							if (b != runBit) {
								runBit = b;
								lastRun = p;
							}
						}
						// hash & n 的值为0时则表示迁移到新的桶时还是在下标为i的
						if (runBit == 0) {
							ln = lastRun;
							hn = null;
						}
						// hash & n的值为n时表示迁移到新的桶后是在下标为i+n的
						else {
							hn = lastRun;
							ln = null;
						}
						// 遍历链表,将移到i桶和i+n桶中的链表从旧的桶中获取链接好
						for (Node<K,V> p = f; p != lastRun; p = p.next) {
							int ph = p.hash; K pk = p.key; V pv = p.val;
							// 拼接需要拷贝到i桶中的链表
							if ((ph & n) == 0)
								ln = new Node<K,V>(ph, pk, pv, ln);
							// 拼接需要拷贝到i+n桶中的链表
							else
								hn = new Node<K,V>(ph, pk, pv, hn);
						}
						// 设置i桶的值
						setTabAt(nextTab, i, ln);
						// 设置i+n桶的值
						setTabAt(nextTab, i + n, hn);
						// 将i桶标记fwd,用来标识该下标的桶已经拷贝
						setTabAt(tab, i, fwd);
						// 该桶处理完成,将字段设置为true
						advance = true;
					}
					// 如果节点是红黑树节点,则按照红黑树节点进行拷贝,和链表的处理逻辑差不多,节点向i桶和i+n桶中拷贝
					else if (f instanceof TreeBin) {
						TreeBin<K,V> t = (TreeBin<K,V>)f;
						TreeNode<K,V> lo = null, loTail = null;
						TreeNode<K,V> hi = null, hiTail = null;
						int lc = 0, hc = 0;
						// 遍历
						for (Node<K,V> e = t.first; e != null; e = e.next) {
							int h = e.hash;
							TreeNode<K,V> p = new TreeNode<K,V>
								(h, e.key, e.val, null, null);
							if ((h & n) == 0) {
								if ((p.prev = loTail) == null)
									lo = p;
								else
									loTail.next = p;
								loTail = p;
								++lc;
							}
							else {
								if ((p.prev = hiTail) == null)
									hi = p;
								else
									hiTail.next = p;
								hiTail = p;
								++hc;
							}
						}
						// 如果处理后的数的节点小于转化为链表的限制,则转化为链表
						ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
							(hc != 0) ? new TreeBin<K,V>(lo) : t;
						hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
							(lc != 0) ? new TreeBin<K,V>(hi) : t;
						// 将数据拷贝到新桶的i桶中
						setTabAt(nextTab, i, ln);
						// 将数据拷贝到新桶的i+n桶中
						setTabAt(nextTab, i + n, hn);
						// 将老桶的i桶标记为fwd
						setTabAt(tab, i, fwd);
						advance = true;
					}
				}
			}
		}
	}
}

从上面的源码可以看出其步骤大概为:

  • 1:首先判断当前cpu的核数是否大于1,如果不大于则不进行并发扩容,同时设置每个线程迁移数据的步长,每个线程负责的步长最小为16
  • 2:判断新数组是否为null,如果为null则进行初始化新数组,长度为老数组的2倍,同时设置开始数据迁移的桶的下表,由于是直接取得数组的长度,所以具体的下表要减1
  • 3:声明转移节点,桶的迁移是否完成,扩容是否完成变量
  • 4:使用循环进行数据迁移
    • 4.1:如果还有桶没有迁移完成,进行截取新的迁移的段
      • 4.1.1:如果扩容已经完成则不需要截取新的段直接返回
      • 4.1.2:如果所有的桶数据已经有其它线程进行迁移,则不截取新的段
      • 4.1.3:如果不满足上面的两种情况,则截取新的负责的段
    • 4.2:判断当前桶是否已经全部迁移
      • 4.2.1:节点拷贝完成,则直接将新的桶赋值给桶,将sizeCtl设置为容量的3/4
      • 4.2.2:如果没有拷贝完成,则是有一个并发线程扩容完成将sc-1,修改完成扩容的值,同时判断是否是最后一个线程扩容完成是的话则直接结束
    • 4.3:如果当前桶为空桶,则将该桶标记为fwd
    • 4.4:如果该桶已经转移,则将advance标记为true
    • 4.5:不满组上面的情况,则进行扩容
      • 4.5.1:加锁
      • 4.5.2:二次检测
      • 4.5.3:进行节点拷贝,分为红黑树节点和非红黑树节点,两者相似

5.ConcurrentHashMap的get方法

ConcurrentHashMap的get方法的源码为:

public V get(Object key) {
	Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
	// 计算该key的hash值
	int h = spread(key.hashCode());
	// 当该concurrentHashMap不为空,且有值,且该hash值所在的列表node不为空
	if ((tab = table) != null && (n = tab.length) > 0 &&
		(e = tabAt(tab, (n - 1) & h)) != null) {
		if ((eh = e.hash) == h) {
			// 当该列表节点就是该key值直接返回
			if ((ek = e.key) == key || (ek != null && key.equals(ek)))
				return e.val;
		}
		//hash < 0 有两种情况:
        //(1) 该节点是TreeBin节点(红黑树代理节点,其hash值固定为:-2);
        //(2) ConcurrentHashMap正在扩容当中,并且该hash桶已迁移完毕,该位置被放置了FWD节点(ForwardingNode:扩容节点,只是在扩容阶段使用的节点,主要作为一个标记。);
        //1、如果是TreeBin节点,调用TreeBin类的find方法,具体是以链表方式遍历还是红黑树方式遍历视情况而定
        //2、如果正在扩容中,则跳转到扩容后的新数组上去查找,TreeBin和Node节点都有对应的find方法,具体什么节点类型则调用对应节点类型的find方法
		else if (eh < 0)
			return (p = e.find(h, key)) != null ? p.val : null;
		// 如果不是上面两种情况,则表示是链表的形式,直接便利链表
		while ((e = e.next) != null) {
			if (e.hash == h &&
				((ek = e.key) == key || (ek != null && key.equals(ek))))
				return e.val;
		}
	}
	return null;
}

从上面的源码可以看出get方法的步骤为:

  • 1:计算key的hash值
  • 2:判断concurrentHashMap是否不为空,并且根据key计算的hash值所在的桶不为null
    • 2.1:如果上面的条件不满足则直接返回null
    • 2.2:如果满足上面的条件,则判断开始节点是否就是key,是的话,则直接返回,不是的话则分别按照链表或者红黑树的方式查找该key的值

从上面的源码可以看出其中最重要的是find方法,而find方法的实现方法又有多种实现方式,这里主要看的是普通的node节点和TreeBin节点的实现方式:
普通的node节点实现方式:

Node<K,V> find(int h, Object k) {
	Node<K,V> e = this;
	if (k != null) {
		do {
			K ek;
			if (e.hash == h &&
				((ek = e.key) == k || (ek != null && k.equals(ek))))
				return e;
		} while ((e = e.next) != null);
	}
	return null;
}

从上面的代码可以看出普通node节点的实现九四遍历链表的方式进行实现的。
TreeBin节点实现方式:

final Node<K,V> find(int h, Object k) {
	if (k != null) {
		// 使用for循环遍历红黑树
		for (Node<K,V> e = first; e != null; ) {
			int s; K ek;
			// volatile int lockState;
			// static final int WRITER = 1; 写(001)
			// static final int WAITER = 2; 等待写(010)
			// static final int READER = 4; 读(100)
			// 当lockState不为0或者4时,则使用便利的方式遍历红黑树
			if (((s = lockState) & (WAITER|WRITER)) != 0) {
				if (e.hash == h &&
					((ek = e.key) == k || (ek != null && k.equals(ek))))
					return e;
				e = e.next;
			}
			// 如果当前没有加锁,则先将lockState+4,在按照红黑树的查找方式查询节点
			else if (U.compareAndSwapInt(this, LOCKSTATE, s,
										 s + READER)) {
				TreeNode<K,V> r, p;
				try {
					// 使用红黑树的方式查找当前节点的值
					p = ((r = root) == null ? null :
						 r.findTreeNode(h, k, null));
				} finally {
					// 查找操作执行完之后,将lockState-4,并将之前的值返回,判断其原先的值是否为(READER|WAITER)也就是当前有线程持有等待写锁
					Thread w;
					if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
						(READER|WAITER) && (w = waiter) != null)
						// 唤醒持有等待写锁的线程
						LockSupport.unpark(w);
				}
				return p;
			}
		}
	}
	return null;
}

从上面的代码中可以看出TreeBin的find实现方法步骤为:

  • 1:循环遍历红黑树
  • 2:判断当前锁状态,如果加了写或等待写锁
    • 2.1:如果加锁则使用链表的方式获取值
    • 2.2:如果没有加锁则按照红黑树查找的方式查询节点,结束后如果有等待线程则会进行唤醒

6.总结

  • ConcurrentHashMap的结构是数组+链表+红黑树。
  • ConcurrentHashMap的key和value值不能为null,key不能为null则是源码不允许为null,value不能为null,则是为了防止二义性
  • ConcurrentHashMap的put方法的安全是通过synchronized关键字+CAS方法来保证的,而get方法则是不需要加锁的,通过volatile关键字来保证可见性。

参考博客:https://blog.csdn.net/caoyuanyenang/article/details/115499984