引:HashMap是我们平时开发过程中用的比较多的集合,但它是非线程安全的。解决方案有Hashtable和Collections.synchronizedMap(hashMap),不过这两个方案基本上是对读写进行加锁操作,性能可想而知。所以感谢Doug Lea给我们带来了并发安全的ConcurrentHashMap。
详细注释:源码分析地址
概览
ConcurrentHashMap 是一个并发散列映射表的实现,它允许完全并发的读取,并且支持给定数量的并发更新。它采用CAS操作和内置锁synchronized来实现同步
构造
主要就是设置初始容量和负载因子。(很多在HashMap中介绍过的变量和常量,这里就不再次介绍了)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// 默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方
transient volatile Node<K,V>[] table;
// 默认为null,扩容时新生成的数组,其大小为原数组的两倍
private transient volatile Node<K,V>[] nextTable;
// 默认为0,用来控制table的初始化和扩容操作
// -1:表示table正在初始化
// -N:表示有N-1个线程正在进行扩容操作
// 非负情况
// 如果table未初始化,表示table需要初始化的大小
// 如果table初始化完成,表示table的阈值,默认是table大小的0.75倍
private transient volatile int sizeCtl;
// 根据initialCapacity初始化
// initialCapacity + (initialCapacity >>> 1) + 1)没看懂,知道的请call me(难道和0.75有关系?)
public ConcurrentHashMap(int initialCapacity) {
// ...
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
// 复制给 sizeCtl
this.sizeCtl = cap;
}
增
新增节点,可能会(并发)扩容1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219// 通过Unsafe拿到sizeCtl的地址
private static final long SIZECTL;
// 通过Unsafe拿到baseCount的地址
private static final long BASECOUNT;
// 移动节点的hash值,只在transfer(扩容方法中)新建ForwardingNode设置
static final int MOVED = -1;
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 对hashCode进行再散列
// 算法为(h ^ (h >>> 16)) & HASH_BITS(0x7fffffff)
int hash = spread(key.hashCode());
// 准备遍历table
int binCount = 0;
// 这边加了一个循环,就是不断的尝试
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化table
tab = initTable();
// 获取对应下标节点,如果是空,直接插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// CAS 进行插入
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 如果 hash 冲突了
// 且 hash 值为 -1(MOVED),说明是 ForwardingNode 对象(这是一个占位符对象,保存了扩容后的容器)
// 表示了正在扩容,需要帮助 Map 进行扩容,以加快速度
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 如果 hash 冲突了,且 hash 值不为 -1
else {
V oldVal = null;
// 同步 f 节点,防止增加链表的时候导致链表成环(会出现死循环)
synchronized (f) {
// 如果对应的下标位置 的节点没有改变
if (tabAt(tab, i) == f) {
// 并且 f 节点的hash 值 大于0
if (fh >= 0) {
// 链表长度
binCount = 1;
// 死循环,直到将值添加到链表尾部,并计算链表的长度
for (Node<K,V> e = f;; ++binCount) {
K ek;
// 遇到重复的key,根据标志位覆盖
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 尾部插入
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 如果 f 节点的 has 小于0 并且f 是 树类型
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
// 插入树中
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 链表长度大于等于8时,将该节点改成红黑树树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 判断是否需要扩容CAS
addCount(1L, binCount);
return null;
}
// 初始化table
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
// 如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功
// 当前线程只需要让出cpu时间片
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// CAS 修改 sizeCtl 的值为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// sc 在初始化的时候用户可能会自定义,如果没有自定义,则是默认的16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
"unchecked") (
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算后作为扩容的阀值(相当于乘以0.75)
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
/**
* Helps transfer if a resize is in progress.
* 如果正在扩容,则帮助扩容
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 如果 table 不是空 且 node 节点是转移节点
// // 且 node 节点的 nextTable(新 table) 不是空
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据 length 得到一个标识符号
int rs = resizeStamp(tab.length);
// 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改 且 sizeCtl < 0 (说明还在扩容)
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 如果 sizeCtl 无符号右移 16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)
// 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535)
// 或者转移下标正在调整 (扩容结束)
// 结束循环,返回 table
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 扩容
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
// 判断是否需要扩容,需要则进行扩容
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 如果计数盒子不是空 或者 修改 baseCount 失败
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果计数盒子是空(尚未出现并发)或者 随机取余一个数组位置为空
// 或者 修改这个槽位的变量失败(出现并发了)
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// CAS增加
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// 统计size
s = sumCount();
}
// 如果需要检查,检查是否需要扩容,在 putVal 方法调用时,默认就是要检查的。
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容)
// 且table 不是空;且 table 的长度小于 1 << 30。(可以扩容)
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n
// 如果正在扩容
if (sc < 0) {
// 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
// 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
// 如果 nextTable == null(结束扩容了)
// 如果 transferIndex <= 0 (转移状态变化了)
// 结束循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 扩容(有点复杂,设计到并发扩容,可以在后面的参考找到文章,这里就不分析了)
transfer(tab, nt);
}
// 如果不在扩容,将 sc 更新:标识符左移 16 位 然后 + 2. 也就是变成一个负数。高 16 位是标识符,低 16 位初始是 2.
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 更新 sizeCtl 为负数后,开始扩容。
transfer(tab, null);
// 统计size
s = sumCount();
}
}
}
删
根据key删除键值对,如果key不存在,什么也不做1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90public V remove(Object key) {
return replaceNode(key, null, null);
}
// 用新cv取代原来的value,如果value是null,就代表删除
final V replaceNode(Object key, V value, Object cv) {
// 计算key的hash值
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// table为空 或 容量为0 或 不存在
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
// 正在扩容
else if ((fh = f.hash) == MOVED)
// 协助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
// 同步删除
synchronized (f) {
// 桶中的第一个结点没有发生变化
if (tabAt(tab, i) == f) {
// 替换链表中的节点
if (fh >= 0) {
validated = true;
for (Node<K,V> e = f, pred = null;;) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
// value不为null,替换
if (value != null)
e.val = value;
// value为null,删除
else if (pred != null)
// 前驱的后继为e的后继,即删除了e结点
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
// 替换红黑树中的节点
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
// value不为null,替换
if (value != null)
p.val = value;
// value为null,删除
else if (t.removeTreeNode(p))
// 返回为true,去树化
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
// 总数-1
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}
改
1 | public boolean replace(K key, V oldValue, V newValue) { |
查
根据key获取value1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 计算key的hash
int h = spread(key.hashCode());
// table不为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判断头节点是否符合
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 在树中查找,小于0的代表了树的根节点,因为树的根节点的哈希值为TREEBIN(-2)
else if (eh < 0)
// 调用红黑树的查找方法
return (p = e.find(h, key)) != null ? p.val : null;
// 在链表中查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结
ConcurrentHashMap 是 Doug Lea 的神作,可以说是精彩绝伦,总的来说,使用了 CAS(扩容并发) 和 synchronized 来保证了 put 操作并发时的危险(特别是链表,防止了死循环的发生),在读方法完全没有锁,完全并发,相比其他线程安全的Map,它的性能真是杠杠滴。写完才发现自己只是打了注释,还没有概括性的话语,缺少了总结,以后需要注意。除此之外,ConcurrentHashMap肯定不是一遍文章就能说完了,以后还需要基础回顾,继续补充。