突击并发编程JUC系列演示代码地址:https://github.com/mtcarpenter/JAVATutorial
作者:故人
出处:https://segmentfault.com/a/1190000037493581
本节让我们一起研究一下该容器是如何在保证线程安全的同时又能保证高效的操作。 ConcurrentHashMap 是线程安全且高效的 HashMap 。
在并发编程中使用 HashMap 可能导致程序死循环。而使用线程安全的 HashTable 效率又非常低下,基于以上两个原因,便有了 ConcurrentHashMap 的登场机会。
在多线程环境下,使用 HashMap 进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用 HashMap 。例如,执行以下代码会引起死循环。
public class HashMapExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new HashMap<>(2);
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
update(count);
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println("size:"+ map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
HashMap 在并发执行put操作时会引起死循环,是因为多线程会导致 HashMap 的 Entry 链表形成环形数据结构,一旦形成环形数据结构,Entry 的 next 节点永远不为空,就会产生死循环获取 Entry。
HashTable 容器使用 synchronized 来保证线程安全,但在线程竞争激烈的情况下 HashTable 的效率非常低下。因为当一个线程访问 HashTable 的同步方法,其他线程也访问 HashTable 的同步方法时,会进入阻塞或轮询状态。如线程 1 使用 put 进行元素添加,线程2不但不能使用put方法添加元素,也不能使用 get 方法来获取元素,所以竞争越激烈效率越低。
HashTable 容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问 HashTable 的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是 ConcurrentHashMap 所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
public class ConcurrentHashMapExample {
// 请求总数
public static int clientTotal = 5000;
// 同时并发执行的线程数
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++) {
final int count = i;
executorService.execute(() -> {
try {
update(count);
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
System.out.println("size:" + map.size());
}
private static void update(int i) {
map.put(i, i);
}
}
JDK 1.7 中的 ConcurrentHashMap 内部进行了 Segment 分段, Segment 继承了 ReentrantLock ,可以理解为一把锁,各个 Segment 之间都是相互独立上锁的,互不影响。
相比于之前的 Hashtable 每次操作都需要把整个对象锁住而言,大大提高了并发效率。因为它的锁与锁之间是独立的,而不是整个对象只有一把锁。
每个 Segment 的底层数据结构与 HashMap 类似,仍然是数组和链表组成的拉链法结构。默认有 0~15 共 16 个 Segment ,所以最多可以同时支持 16 个线程并发操作(操作分别分布在不同的 Segment 上)。16 这个默认值可以在初始化的时候设置为其他值,但是一旦确认初始化以后,是不可以扩容的。
图中的节点有三种类型:
HashMap
ConcurrentHashMap
链表长度大于某一个阈值(默认为 8),满足容量从链表的形式转化为红黑树的形式。
红黑树是每个节点都带有颜色属性的二叉查找树,颜色为红色或黑色,红黑树的本质是对二叉查找树 BST 的一种平衡策略,我们可以理解为是一种平衡二叉查找树,查找效率高,会自动平衡,防止极端不平衡从而影响查找效率的情况发生,红黑树每个节点要么是红色,要么是黑色,但根节点永远是黑色的。
/**
* table 桶数最大值,且前两位用作控制标志
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* table 桶数初始化默认值,需为2的幂次方
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 数组可能最大值,需要与toArray()相关方法关联
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
*并发级别,遗留下来的,为兼容以前的版本
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 加载因子,扩容的阀值,可在构造方法定义
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 链表转红黑树阀值, 超过 8 链表转换为红黑树
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo))
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 树化阀值2,当数组桶树达到64以上才允许链表树化
*/
static final int MIN_TREEIFY_CAPACITY = 64;
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 初始化成功退出循环
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // 有其他线程在初始化,自旋等待
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 进入初始化 sizeCtl = -1
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// sc = n-n/4 ???
sc = n - (n >>> 2);
}
} finally {
// 初始化成功设置 sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
// 检查 table 是否存在,hashCode 所在的索引是有为空
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 按照树的方式遍历 Node 查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 按照链表的方式遍历 Node 查找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
总结一下 get 的过程:
public V put(K key, V value) {
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 计算 key 的 hashCode
int hash = spread(key.hashCode());
int binCount = 0;
// 循环直到插入成功,
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
// 初始化 table
tab = initTable();
// tabAt 调用 getObjectVolatile
// 当前位置为空可以直接插入的情况
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 通过 CAS 操作插入,不需要加锁
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
// 下面是位置已经有值的情况
// MOVED 表示当前 Map 正在进行扩容
else if ((fh = f.hash) == MOVED)
// 帮助进行扩容,然后进入下一次循环尝试插入
tab = helpTransfer(tab, f);
// 未在扩容的情况
else {
V oldVal = null;
// 对f加锁,f 是存储在当前位置的 Node 的头节点
synchronized (f) {
// 双重检查,保证 Node 头节点没有改变
if (tabAt(tab, i) == f) {
if (fh >= 0) {
// 对链表进行操作
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
// 更新值(判断 onlyIfAbsent)或插入链表尾部 ...
break;
}
}
else if (f instanceof TreeBin) {
// 对树进行操作 ...
}
}
}
// 判断是否需要 treeify
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
总结一下 put 的过程:
private final void tryPresize(int size) {
//计算扩容的目标size
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
//tab没有初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
//初始化之前,CAS设置sizeCtl=-1
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//sc=0.75n,相当于扩容阈值
sc = n - (n >>> 2);
}
} finally {
//此时并没有通过CAS赋值,因为其他想要执行初始化的线程,发现sizeCtl=-1,就直接返回,从而确保任何情况,只会有一个线程执行初始化操作。
sizeCtl = sc;
}
}
}
//目标扩容size小于扩容阈值,或者容量超过最大限制时,不需要扩容
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
//扩容
else if (tab == table) {
int rs = resizeStamp(n);
//sc<0表示,已经有其他线程正在扩容
if (sc < 0) {
Node<K,V>[] nt;
/**
1 (sc >>> RESIZE_STAMP_SHIFT) != rs :扩容线程数 > MAX_RESIZERS-1
2 sc == rs + 1 和 sc == rs + MAX_RESIZERS :表示什么???
3 (nt = nextTable) == null :表示nextTable正在初始化
4 transferIndex <= 0 :表示所有hash桶均分配出去
*/
//如果不需要帮其扩容,直接返回
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//CAS设置sizeCtl=sizeCtl+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//帮其扩容
transfer(tab, nt);
}
//第一个执行扩容操作的线程,将sizeCtl设置为:(resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
此段代码参考网址: https://www.jianshu.com/p/487d00afe6ca
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//计算需要迁移多少个hash桶(MIN_TRANSFER_STRIDE该值作为下限,以避免扩容线程过多)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
//扩容一倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
//1 逆序迁移已经获取到的hash桶集合,如果迁移完毕,则更新transferIndex,获取下一批待迁移的hash桶
//2 如果transferIndex=0,表示所以hash桶均被分配,将i置为-1,准备退出transfer方法
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//更新待迁移的hash桶索引
while (advance) {
int nextIndex, nextBound;
//更新迁移索引i。
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex<=0表示已经没有需要迁移的hash桶,将i置为-1,线程准备退出
i = -1;
advance = false;
}
//当迁移完bound这个桶后,尝试更新transferIndex,,获取下一批待迁移的hash桶
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
//退出transfer
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//最后一个迁移的线程,recheck后,做收尾工作,然后退出
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/**
第一个扩容的线程,执行transfer方法之前,会设置 sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2)
后续帮其扩容的线程,执行transfer方法之前,会设置 sizeCtl = sizeCtl+1
每一个退出transfer的方法的线程,退出之前,会设置 sizeCtl = sizeCtl-1
那么最后一个线程退出时:
必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT
*/
//不相等,说明不到最后一个线程,直接退出transfer方法
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
//最后退出的线程要重新check下是否全部迁移完毕
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
//迁移node节点
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
//链表迁移
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
//将node链表,分成2个新的node链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将新node链表赋给nextTab
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
//红黑树迁移
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
作者:故人
出处:https://segmentfault.com/a/1190000037493581
突击并发编程JUC系列演示代码地址:https://github.com/mtcarpenter/JavaTutorial