《Java并发编程的艺术》之ConcurrentLinkedQueue

队列这个数据结构已经很熟悉了,就不多介绍,主要还是根据代码理解Doug Lea大师的一些其他技巧。

入队

offer状态图
如图所示,很多人可能会很疑惑,为什么第一次入队后,TAIL没有指向Node2?答案是为了效率!Σ(っ °Д °;)っ 那这还能叫队列吗?当然,它依然符合先进先出(FIFO)的规则。只是TAIL变量不一定指向尾结点,那么来看看大师是怎么做的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
// p是tail结点
if (p.casNext(null, newNode)) {
// 判断p结点是否是尾结点
// 这一步不执行的话,不会对整体流程造成影响,至多是多一次循
// 环,相比CAS操作,更愿意多一次循环
if (p != t)
// 交换Tail结点,如果CAS更新失败表示已经有其他线程对其进行更新
casTail(t, newNode);
return true;
}
// CAS竞争失败,重新循环,竞争失败后q一般不会为null,除非又发生了出队
}
// HEAD和TAIL都指向同一个结点
// 一个线程执行入队,一个线程执行出队,假设前面都没有更新tail和head
// 执行出队的线程更新HEAD并设置其为自引用
// 那么就会发生这个条件想要的现象
else if (p == q)
// 如果tail发生了改变,那么就为p设置t,并重新寻找
// 如果tail未发生改变,head发生了改变,保
// 险方法就是重新从新head开始遍历
// 注意: -----只要在读取前完成tail发生更新就行了-----
p = (t != (t = tail)) ? t : head;
else
// p != t 表示p不是尾结点,发生的原因是 入队时没有更新尾结点
// t != (t = tail) 更新tail,如果tail被其他线程修改,则返回true
// 如果为true,重新将p设置为尾结点(此时尾结点已经更新了)
// 如果为false,p = q,继续循环下去
p = (p != t && t != (t = tail)) ? t : q;
}
}

第一次入队:

  1. 初始状态下head和tail都指向同一个item为null的结点
  2. tail没有后继结点
  3. 尝试CAS为tail设置next结点
  4. p == t,没有更新tail变量,直接返回true
    注意offer永远都返回true

第二次入队:

  1. 初始状态下head和tail都指向同一个item为null的结点,但是next指向Node2
  2. tail有后继结点
  3. p != q,进入下个if语句
  4. p == t 返回false,整个三目运算符返回false,p = t
  5. 此时p没有后继结点
  6. 尝试CAS为p设置next
  7. p != t,更新tail结点,直接返回true

第二次入队多线程版
线程A执行入队,线程B执行出队

初始状态图如下所示:

操作步骤如下表所示:

次序 线程A 线程B
1
2 node1.item == null,执行下个if语句
3 (q = node1.next) != null,执行下个if语句
4 p != q,执行后面的else语句
5 p = q
6 node2.item != null
7 p != h (p是node2)
8 (q = p.next) == null,故将p设置为头结点,即将node2作为头结点
9 将node1设置为自引用
10 q = p.next,即node1.next,因为自引用q == p 返回item
11 q != null
12 p == q
13 t != (t = tail),即此时tail是否发生改变:true -> p =tail;false -> p = head

在步骤13,如果有个线程C已经执行了入队且tail发生改变,那么p就直接紧跟着更新后的tail就行了;如果tail没更新,就要设置p = head,然后重新循环遍历。

出队

从图中可以看出每次出队会有两种可能:将首结点的item置空,不移除;或是将移除首结点。总结一下,就是每次出队更新HEAD结点时,当HEAD结点里有元素时,直接弹出HEAD结点内的元素,而不会直接更新HEAD结点。只有当HEAD结点里没有元素时,出队操作才会更新HEAD结点。这种做法是为了减少CAS更新HEAD结点的消耗,从而提高出队效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public E poll() {
restartFromHead:
for (;;) {
// p变量可以理解为游标、当前处理的结点,用于遍历的
// q变量可以理解为p变量的next
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// ① 先判断当前结点的值不为null 且 CAS更新p变量的item值
if (item != null && p.casItem(item, null)) {
// ②更新成功后,判断当前结点是否是头结点
// 这一步主要是为了节省CAS操作,因为少更新一次HEAD结点没什么影响
if (p != h)
// ③更新头结点
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// ④ 获取当前结点的下一个结点,判断是否为null
// 一般发生于只有一个结点的情况
else if ((q = p.next) == null) {
// ⑤ 将当前结点设置为自引用
updateHead(h, p);
return null;
}
// ⑥ 如果当前结点出现自引用
// 一般发生在一个线程更新了头结点,让p结点自引用时,p才会等于q
else if (p == q)
// 重新获取一个头结点
continue restartFromHead;
else
// p = p.next
p = q;
}
}
}

poll的流程图

第一次出队:

  1. 初始状态所有item不为null,尝试更新结点的item(一般情况下都是成功的),更新成功后结点item为null
  2. 判断 p == h,不满足条件,直接返回item值

第二次出队:

  1. Node1.item为null,进入下个条件
  2. (q = p.next) != null,进入下个条件
  3. p != q,进入下个条件
  4. p = q,重新循环
  5. Node2.item不为null,尝试更新结点的item,更新成功后结点item为null
  6. 判断 p != h,满足条件
  7. 判断p结点(Node2)是否有后继结点,如果有就将后继结点作为HEAD,否则将P作为HEAD
  8. 返回item值

第二次出队多线程版1
线程A和线程B同时执行poll()方法,假设线程A稍快

线程A 线程B
A1. Node1.item为null B1. Node1.item为null
A2. (q = p.next) != null B2. (q = p.next) != null
A3. p != q B3. p != q
A4. p = q,循环 B4. p = q,循环
A5. Node2.item不为null,尝试更新结点item,更新成功,item值为null B5.
A6. 满足条件 p != h B6. Node2.item为null
A7. 判断p结点(Node2)是否有后继结点 B7. (q = p.next) != null
A8. B8. p != q
A9. B9. p = q 循环
A10. B10. Node3.item不为null,尝试更新结点item,更新成功,item值为null
A11. 将后继结点即Node3设置为HEAD B11.
A12. 返回item B12. 满足条件 p != h
A13. B13. 判断p结点(Node2)是否有后继结点
A14. B14. 由于HEAD已经被修改,所以CAS更新失败
A15. B15. 返回item

这里主要是想讲即便HEAD更新发生冲突,有一次没有更新,也不会影响整体的流程,大不了下次出队的时候多出队一个。

第二次出队多线程版2
线程A和线程B同时执行poll()方法,假设线程A稍快

线程A 线程B
A1. Node1.item为null B1. Node1.item为null
A2. (q = p.next) != null B2. (q = p.next) != null
A3. p != q B3. p != q
A4. p = q,循环 B4. p = q,循环
A5. Node2.item不为null,尝试更新结点item,更新成功,item值为null B5.
A6. 满足条件 p != h B6. Node2.item为null
A7. 判断p结点(Node2)是否有后继结点 B7.
A8. 将后继结点即Node3设置为HEAD B8.
A9. 返回item B9. (q = p.next) != null
A10. B10. p == q
A11. B11. 重新获取HEAD并执行遍历

这个例子主要表达了当A线程先修改了首结点,并将原来的首结点设置为自引用时,B线程在循环过程中会执行到一条语句

1
2
3
4
5
6
7
8
9
10
11
12
13
14


## 总结
ConcurrentLinkedQueue主要内容都已经学习过了,其中分析的过程花费了一个早上,吃完饭回来坐下才有了一些思路。学习的难点主要还是在**它不同于普通的队列**,它的tail和head变量不会时刻指向头结点和尾结点,这也造就了代码的复杂性,否则如下所示即可:
```java
public boolean offer(E item){
checkNotNull(item);
for(;;){
Node<T> node = new Node(item);
if(tail.casNext(null, node) && casTail(tail, node)){
return true;
}
}
}

但是这样和上面的例子比起来,就有性能的差距,差距主要体现在CAS写竞争方面:

最悲观的角度,ConcurrentLinkedQueue的offer方法需要执行两次 CAS (假设不发生竞争,其实我觉得不会有CAS竞争发生),上面的通用代码方法也需要执行两次,这里持平。
最乐观的角度,ConcurrentLinkedQueue只需要执行一次CAS,上面的通用方法仍需要两次。
原本是参考《Java并发编程的艺术》,但是里面的实现和现在不同了,所以根据现在的实际情况写了一份。当然,里面的主线思路仍然没有发生改变——尽量减少CAS操作。书上的代码是通过hops变量来控制多久需要更新一次值,大致思路如下所示:

1
2
3
4
5
6
遍历前,hops = 0
HEAD---
|
Node1 -> Node2 -> Node3 -> Node4
|
TAIL---

假设现在要插入Node5,就要从TAIL变量位置(Node1位置)开始往后遍历,总共要循环三次才能找到最后一个尾结点,此时计数器hops就等于3,当Node5插入成功后,判断hops的值是否达到阈值,如果达到了,就更新tail变量;反之则不更新,直接返回。

1
2
3
4
5
6
遍历完后,hops = 3,达到阈值(假设达到了),将tail变量更新给Node5
HEAD---
|
Node1 -> Node2 -> Node3 -> Node4 -> Node5
|
TAIL---

ConcurrentLinkedQueue初看以为很简单,其实逻辑还是挺复杂的,拓展了对队列的看法。今天在写这篇博客时,感觉一头雾水,因为CAS操作不像锁那样简单,代码块锁住就能放心执行,CAS只对单个操作保证可见性和原子性,很担心后面的线程会对其进行什么修改,今天过后总结了一下写并发容器的思路:

  1. 在了解某个方法的实现时,需要分清局部变量和共享变量,在理清了局部变量的含义后,将重点放在共享变量上
  2. 如果方法里某个语句没看懂(一头雾水型,突然就来了这么一句),请往多线程方向思考。
  3. 多对方法进行多线程分析(有助于理清思路),用md表格的例子写出来,很清晰,如果感觉md表格难以阅读,可以看这个网站 MD表格生成器

《Java并发编程的艺术》之final

HashMap只是相对线程安全,如果出现数据竞争就抛出fail-fast;HashTable则将每个操作都上锁,如果有耗时的操作,那么后续的操作均会被阻塞,大大降低程序的吞吐率。而ConcurrentHashMap正是为了解决这样一个问题而出现的。
ConcurrentHashMap和HashMap的数据结构如下所示:


HashMap会持有一个Entry数组,每个Entry都是链表的结点,每次进行修改时,先查找key对应的hash值,找到BucktIndex,在遍历链表查看key是否相等。

ConcurrentHashMap会持有一个Segment数组,每个segment会持有一个HashEntry的数组,HashEntry又可以串成链表。每次进行写操作时,只需要加锁一个Segment即可,这就是分段加锁技术。相比HashMap,ConcurrentHashMap的修改就会比较费劲:首先要通过哈希key定位SegmentIndex,再对hash值进行再哈希获取BucketIndex,找到对应的Bucket,就可以进行和HashMap一样的操作,即遍历链表。

ConcurrentHashMap需要了解一些初始参数:

  • segmentShift: segment的偏移量,有效的hash部分
  • segmentSize: segment数组的长度
  • concurrencyLevel: 并发等级,默认16,不会直接使用,而是通过这个值获取segment数组的长度和偏移量
  • initCapacity: 初始化时容器的总大小,不会直接使用这个值,而是将其均分到不同的segment中
  • loadFactor: 负载因子
  • threshold: 阈值,Segment判断HashEntry数组(table)是否需要扩容的标准

这里摘抄了一段初始化的代码,省略了一些不必要的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
// 偏移量
int sshift = 0;
// segment数组的长度
int ssize = 1;
// 找到一个最适合的 二的N次方 的长度
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// segment偏移量,后面定位的时候需要用到
this.segmentShift = 32 - sshift;
// segment数组的长度
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
// 获取capacity的倍数
int c = initialCapacity / ssize;
// 保证持有的HashEntry大于等于initialCapacity
if (c * ssize < initialCapacity)
++c;
// MIN_SEGMENT_TABLE_CAPACITY 为 2
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建segment数组和第一个segment
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
}

先通过移位运算符,找到最小的大于concurrencyLevel的次数,对ssize进行移位、sshift进行递增。segmentShift为定位segment的便宜量,segmentMask为有效位数。ssize是segment数组的长度,通过

/ ssize```均分可持有的HashEntry数组。最后创建segment,segment需要loadFactor,cap * loadFactor (threshold,判断HashEntry数组是否需要扩容的标准),HashEntry数组。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

通过这一系列初始化,多多少少能感觉到它和HashMap的相似之处。

## put操作
```java
// ConcurrentHashMap.put()
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 对key进行hash
int hash = hash(key);
// 定位Segment的位置,原理和取余有些类似
int j = (hash >>> segmentShift) & segmentMask;
// 获取对应的Segment
if ((s = (Segment<K,V>)UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null)
// 创建新的segment
s = ensureSegment(j);
// 将键值对放入对应的segment里面
return s.put(key, hash, value, false);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Segment.put()
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 先尝试加锁,加锁失败后进入scanAndLockForPut
// scanAndLockForPut可以理解为是查找对应hash值的Entry
// 如果没有就及时创建
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
// 根据hash值定位BucketIndex
int index = (tab.length - 1) & hash;
// 获取对应位置的Bucket
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
// 该条件指 遍历链表
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// 该条件指 对应位置的Bucket为空,需要创建这个位置的Bucket
if (node != null)
// 不存在该hash值的entry,这里将其新创建的entry设置为first
node.setNext(first);
else
// 不存在该hash值的entry,也没有新创建的entry,这里执行创建
node = new HashEntry<K,V>(hash, key, value, first);
// Bucket数组的长度增加
int c = count + 1;
// 判断是否达到阈值
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
// 再hash,相当于扩容
rehash(node);
else
// 将node设置到对应的bucketIndex上
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

put操作虽然会对共享变量进行写入操作,但是执行前会获取到排斥锁,保证同一时刻只有一个线程能修改共享变量。

  • ConcurrentHashMap在是否需要扩容方面 更优于HashMap,具体表现在,ConcurrentHashMap先检查此次插入是否会超出阈值,如果会就先执行扩容再进行插入;HashMap则是插入元素后判断是否已经达到容量,如果达到就进行扩容,但是这样很可能发生扩容后没有新元素的插入,这样就进行了一次无效的扩容。

  • ConcurrentHashMap扩容不会对整个容器进行扩容,而是针对某个segment进行扩容

get操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public V get(Object key) {
Segment<K,V> s;
HashEntry<K,V>[] tab;
// 获取该key的hash值
int h = hash(key);
// 对应的segment的偏移量
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 如果查找到对应的segment
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&(tab = s.table) != null) {
// 找到对应的Buckt进行遍历
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

这里的操作不需要同步,因为Segment和HashEntry的值都是volatile类型,共享变量会对所有线程立刻可见。同一时间,volatile写 happens-before volatile读。

size操作

虽然前面讲到Segment的count变量是volatile,但是并不意味着将所有segment的volatile变量加在一起就是size了,因为volatile只保证单个操作是原子性的,所以无法保证多个volatile变量相加后,其中一个volatile不会发生改变。但是如果在统计size时,将所有segment的写操作(put、clean等)锁住,又显得低效。

实际上,Segment的前两次统计都先不加锁执行,如果统计前的modCount和统计后的modCount不一致,那么就判断统计size失败。失败次数超过两次后,size方法就会将所有的segment加锁。

了解AQS之SharedLock

前面已经学习过AQS的exclusive和condition两个模式,现在要把最后的shared模式看完。此次从shared的实现类——ReentrantReadWriteLock 开始分析。

共享的特性:


如果先获取到线程锁,那么独占锁就要等待;如果先获取独占锁,那么共享锁就要进入等待。对于共享锁来说,顾名思义大家可以一起访问这个资源(读);对于独占锁来说,要想访问只能一一排队,一个个来修改,保证每次修改后的可见性。

前面提到AQS是通过同步队列来管理同步状态的,而状态state只有一个变量,要想记录读/写两个状态,就只能将state拆分成2个16位,如下图所示

高16位表示读状态,低16位表示写状态。比如获取读状态:

>>> 16```,获取写状态:```state & 0X0000FFFF```,根据状态划分可以得出以下结论:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
当state不为0时而state低16位为0,则表示读锁已经被获取;当state不为0而state高16为0时,则表示写锁已经被获取。

## 写锁的获取与释放
写锁是一个可重入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取了或者该线程不是持有写锁的线程,则当前线程进入等待状态。
```java
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
// 读锁已经被获取
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
// 重入次数超过最大值
throw new Error("Maximum lock count exceeded");
// 重入
setState(c + acquires);
return true;
}
// writerShouldBlock是由子类实现的策略
// 如果writer是非公平锁,那么就一直返回false
// 如果writer是公平锁,那么就通过hasQueuedPredecessors()判断
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置唯一
setExclusiveOwnerThread(current);
return true;
}

该方法除了重入溢出的判断外,还增加了是否已经存在读锁的条件判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程无法得知当前写线程的操作。写锁的缩放和ReentrantLock的释放过程基本类似,每次释放减少写状态,当写状态为0时表示写锁已经被释放。

读锁的获取与释放

读锁时一个支持重入的锁,它能够被多个线程获取,在没有其他写线程的访问时,读线程总会被成功获取,而所做的也只是增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁被其他线程获取,则进入等待状态。读状态保存的是总的读次数,每个线程的读取状态由ThreadLocal进行保存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 写锁已经被获取,直接返回
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 根据队列策略,参考写锁;
// 读锁个数不超过最大值
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁个数等于0
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 如果前面都不满足条件,进入该方法,自选到成功获取读锁
return fullTryAcquireShared(current);
}

如果这一步获取失败,那么就会转入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
```java
private void setHeadAndPropagate(Node node, int propagate) {
// 保存头结点
Node h = head;
// 设置头结点
setHead(node);
// propagate大于0 OR 头结点为空 OR 头结点的状态 < 0 OR 重新获取头结点时的状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 获取当前结点的下个结点并唤醒它(因为独占锁,后面的共享锁阻塞;
// 当独占锁被释放后,共享锁的结点应该苏醒)
Node s = node.next;
// 该操作会唤醒不必要的结点,但是为了避免多线程发生竞争,所以需要尽可能快的通知其他线程
if (s == null || s.isShared())
doReleaseShared();
}
}

读锁的每次释放会将读状态减去值1 << 16

锁降级

锁降级不是获取写锁,释放,再获取读锁(不是分段的);而是先获取写锁,再获取读锁,释放写锁(是交叉的)。

总结

ReentrantReadWriteLock是独占锁和共享锁的合体,在write方面是独占的,read方面是共享的,因为有两个锁,为了管理两个状态,提出了划分高16位作为读状态,低16位作为写状态

《Java并发编程的艺术》之延迟初始化

在Java里,有时候需要延迟初始化来降低初始化类和创建对象的开销。为众人所致的双重检查锁定是常见的延迟初始化技术,但它是一个错误的用法。
比如下面的用法:

1
2
3
4
5
6
7
8
9
public class UnsafeLazyInitialization{
private static Instance instance;
public static Instance getInstance(){
if(instance == null){
instance = new Instance();
}
return instance;
}
}

假设线程A执行getInstance()初始化对象还未完成时,线程B 判断 instance 变量为 null,也执行一遍Instance的初始化。那么最终会出现两个Instance变量。

当然,我们可以简单粗暴的给它加上 synchronized,保证同一时间只有一个线程能获得锁。

1
2
3
4
5
6
7
8
9
public class SafeLazyInitialization{
private static Instance instance;
public synchronized static Instance getInstance(){
if(instance == null){
instance = new Instance();
}
return instance;
}
}

由于对getInstance() 方法做了同步处理,synchrnoized 将导致 getInstance() 方法被多个线程频繁的调用,将会导致程序执行性能的下降。

在早期,synchronized 存在巨大开销,人们为了降低互斥的开销,采取了一些技巧:“双重检查锁定”,通过“双重检查锁定”来降低同步的开销。下面时“双重检查锁定”的代码

1
2
3
4
5
6
7
8
9
10
11
public class DoubleCheckLocking{
private static Instance instance;
public static Instance getInstance(){
if(instance == null){
synchronized(DoubleCheckLocking.class){
instance = new Instance();
}
}
return instance;
}
}

如上面代码所示,如果第一次检查instance部位null,那么就不需要执行下面的加锁和初始化操作。因此可以大幅降低synchronized的开销。上面的代码完善了前面两个例子的缺点:

  • 多个线程试图在同一个线程间创建对象;通过加锁保证只有一个线程能创建对象
  • 在对象创建好后,再次访问getInstance() 不需要再获取锁,直接返回已经创建好的对象

然而“双重检查锁定”也存在问题,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

* memory = allocate(); // 分配对象的存储空间
* ctorInstance(memory); // 调用对象的初始化函数
* instance = memory; // 将instance指向 刚刚分配的内存地址

因为三行伪代码都再临界区内,所以编译器和处理器会对他做一些重排序,下面是可能的执行顺序:

* memory = allocate();
* instance = memory;
* ctorInstance(memory);

不违反happens-before的原则,所以JMM允许重排序,该代码在单线程可以运行的很高校,但是在多线程下会引发问题,即B线程会读取一个尚未完全初始化的对象。执行顺序流程图如下所示:

![](https://blog-1252749790.file.myqcloud.com/JavaConcurrent/problem_doublechecklocking.png)


为了解决这个问题可以从两个方面下手:
1. 禁止图上2和5操作的重排序
2. 在初始化时,不允许其他线程访问

第一个方面可以通过volatile来实现,即
```java
public class NewDoubleCheckLocking{
private volatile static Instance instance;
public static Instance getInstance(){
if(instance == null){
synchronized(NewDoubleCheckLocking.class){
instance = new Instance();
}
}
return instance;
}
}

前面讲过volatile关键字可以保证可见性和单个操作的原子性,所以可以避免创建对象过程被重排序。

第二个方面可以通过类初始化的解决

1
2
3
4
5
6
7
8
public class InstanceFactory{
public static class InstanceHolder{
public static Instance instance = new Instance();
}
public static Instance getInstance(){
return InstanceHolder.instance;
}
}

假设线程A初次调用getInstance,线程B也初次调用,下面是执行的示意图。

这里的语义和获取互斥锁一样,虽然允许初始化时的重排序,但是不会被其他线程所观测到。Java语言规范规定,对于每一个类或接口C,都有一个唯一的初始化锁LC与之对应。从C到LC的映射,由JVM的具体实现去自由实现。JVM在类初始化期间会获取这个初始化锁,并且每个线程至少获取一次锁来确保这个类已经被初始化过了。

第一个阶段:
线程A和线程B同时去获取Class的锁,线程A抢占成功并设置Class状态,随后就释放了锁;而线程B因此进入阻塞状态,示意图如下。

第二个阶段:
线程A在释放了锁后就要开始初始化,而线程B获取到了锁,看到Class状态还是initializing,就放弃锁并进入等待状态。

第三个阶段
线程A执行完初始化,获取Class锁,将state设置为initialized,然后唤醒其他等待中的线程。

第四个阶段
线程B被唤醒,线程B尝试获取Class锁,读取到state为initialized,释放锁并访问这个类。

第五个阶段
当初始化完毕后,线程C来访问该Class,线程C获取初始化锁,读取状态,如果为initialized就释放锁,直接访问类

《Java并发编程的艺术》之happens-before

happens-before 是JMM的核心概念

JMM的设计

  • 程序员对内存模型的使用。程序员希望内存模型简单易用、易于理解,程序员需要一个强内存模型(尽量偏向顺序一致性)编写程序
  • 编译器和处理器对内存模型的实现。编译器和处理器希望内存模型对它们的束缚越小越好,编译器和处理器需要一个弱内存模型(尽量远离顺序一致性)

所以JSR-133专家组在设计JMM时候就需要找到一个平衡点:一方面要保证程序员的简单易用性,一方面要保证对处理器和编译器的限制尽可能放松。

1
2
3
int a = 1; // ①
int b = 2; // ②
int result = a * b; //③

上述代码的happens-before关系如下所示:

  1. ① happens-before ③
  2. ② happens-before ③
  3. ① happens-before ②

其中1、2是必须遵守的,3是不必要的。因此JMM将重排序分为以下两类:

  1. 会改变程序执行结果的
  2. 不会改变程序结果的

而针对这两类,JMM分别进行以下的处理:

  1. 会改变程序执行结果的,JMM坚决禁止
  2. 不会改变程序结果的,JMM尽可能放松限制

happens-before其实就是一剂安慰剂。程序员遵守happens-before的规则,JMM为其提供内存可见性,happens-before对程序员来说看似是禁止了除规定以外的内容,实际上允许编译器和处理器对不改变程序结果的内容都进行了重排序。

happens-before是一个中间产物,从上往下看,它是基础,是规则;从下往上看,它是限制,是约束;所以讨论happens-before的时候一定要讲清楚是哪个视角!(这里以前纠结了很久)

happens-before的定义

  1. 如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前。
  2. 两个操作之间存在happens-before关系,并不意味着Java平台的具体实现必须要按照happens-before关系指定的顺序来执行。如果重排序之后的结果,与按照happens-before关系来执行的结果一致,那么这种重排序并不非法(也就是说JMM允许这种重排序)

as-if-serial语义保证单线程内执行结果不被改变,happens-before保证正确同步下多线程的程序执行结果不被改变。

happens-before有哪些规则

  1. 程序顺序性规则:一个线程中的每个操作,happens-before于同个线程中任意后续的操作。
  2. 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁
  3. volatile变量规则:对一个volatile的写,happens-bfore于任意后续对volatile的读
  4. 传递性:如果A happens-before B,且B happens-bfore C,那么 A happens-before C
  5. start()规则: 如果A线程执行ThreadB.start(),那么A线程的ThreadB.start()操作happens-before于线程B的任意操作
  6. join()规则:如果A线程执行ThreadB.join()并成功返回,那么线程B中的任意操作happens-before A线程从ThreadB.join()操作成功返回。

规则举例一

按照程序顺序规则,操作1 happens-before 2、3 happens-before 4,其中因为start规则,2 happens-bfore 3。最后根据传递性规则,1 happens-before 4。这意味着操作1 保证对 操作2 可见。

规则举例二

按照程序顺序规则,1 happens-before3、 4 happens-before 5,而join规则要求3 happens-before 4,所以最后根据传递性规则,1 happens-before 5。这意味着操作 1 保证对操作5 可见

《Java并发编程的艺术》之final

final的重排序规则

以下面的代码为例,讲解final写和final读的重排序规则

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public FinalExample{
int a;
final int b;
private static FinalExample self;
private FinalExample(){
this.a = 1;
this.b = 5;
}

public static FinalExample init(){ //线程A执行
self = new FinalExample();
}

public static void read(){ //线程B执行
FinalExample temp = self; // 获取引用
int resultA = temp.a; // 读普通域
int resultB = temp.b; // 读final域
}
}

final写的重排序规则

  • JMM保证写final变量时不被编译器重排序到构造函数外
  • 编译器会在写final域后,构造函数返回前插入StoreStore屏障

假设现在线程A执行init(),线程B执行read()时,可能的执行顺序如下所示:

当A线程初始化构造函数的时候,有可能会将普通域的初始化重排序到构造函数外,所以当B线程读取普通域的时候很可能获取到0,但是在后续的执行中,该值又会变成给定的数值1。
而编译器在遇到final域时,会在final写后面加入StoreStore内存屏障,保证在结束函数构造前执行final写。

final域可以保证任何线程在读取该final变量时,已经正确初始化过。

final读的重排序规则

在讲final写的重排序规则时,着重点放在了写上。而final域在读方面也做了一些特别处理。通常情况下,一个获取对象的引用和读取该对象的普通域是可能发生重排序。所以可能会发生下面这样的执行顺序:

read()方法乍一看好像不会发生重排序,因为resultA 的写好像依赖temp变量。而resultA实际上是依赖temp引用里的a变量,间接依赖temp对象。虽然有的处理器不会对间接依赖进行重排序,但是不乏万一,比如alpha处理器,JMM就是为了避免这种会重排序间接依赖的处理器,所以给final读加上了下面的重排序规则:

  • JMM保证 初次读对象引用与初次读该对象的final域不会被重排序
  • 编译器会在读final域的前面插入LoadLoad屏障

final域为引用类型

在构造函数内对一个final引用的对象的成员的写入,与随后在构造函数外把这个被构造对象的引用赋值给一个引用变量,这两个操作不能重排序。

final引用不能从构造函数内“溢出”

写final的重排序规则虽然保证了,final域的写会在构造函数执行之前完成,并对其他线程可见。但是如果在构造函数内,引用就发生了溢出,那么就无法保证了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public EscapeFinalExample{
int a;
final int b;
private static EscapeFinalExample self;
private EscapeFinalExample(){
this.a = 1;
this.b = 5;
self = this; // this引用溢出
}

public static EscapeFinalExample init(){ //线程A执行
new EscapeFinalExample();
}

public static void read(){ //线程B执行
EscapeFinalExample temp = self; // 获取引用
int resultA = temp.a; // 读普通域
int resultB = temp.b; // 读final域
}
}

假设线程A执行init(),线程B执行read(),这里的A线程还未完成完整的初始化方法,对象引用就被B可见了。即使代码上 this溢出操作放在最后,仍然有可能被重排序。它们的执行时序如下所示:

上图可以看出,构造函数还没有完成时,final域对其他线程不可见。只有在完成了构造函数后,final域才对其他线程可见。

final语义的特殊例子

在X86处理器上,由于不会发生写写、读写、读读的重排序,所以没有StoreStore内存指令,故在使用final时,编译器会忽略StoreStore内存屏障,同样LoadLoad内存屏障也会被忽略。也就是说,在x86处理器上,final是不做任何处理的。

为什么要增强final语义呢

一方面是final本身是不可修改的,其他线程不该看到final的变化。比如一开始线程读取final值为默认值0,过一段时间再读这个final变量,final值变为值1(被初始化后)。
所以新的模型就保证了,只要正确的完成构造函数(不发生this溢出),即使不用同步,也可以保证其他线程见到final初始化后的值。

《Java并发编程的艺术》之synchronized及JUC

synchrnoized的happens-before

1
2
3
4
5
6
7
8
9
10
11
int a;
boolean flag;
public synchronized void init(){// ①
a = 100; // ②
flag = true; // ③
}// ④
public synchronized void doTask(){ // ⑤
if(flag){ // ⑥
int result = a; // ⑦
}
} // ⑧

假设线程A执行init(),线程B执行doTask(),有如下的happens-before关系:

  • 根据程序次序规则:
    • ① hb ②
    • ② hb ③
    • ③ hb ④
    • ⑤ hb ⑥
    • ⑥ hb ⑦
    • ⑦ hb ⑧
  • 根据监视器规则:
    • ① hb ④
    • ④ hb ⑤
    • ⑤ hb ⑧

根据传递规则,保证init()方法所有的修改对doTask()方法可见,happens-before关系如下所示

1、2间的表示程序次序性规则,4、5间的表示监视器规则,由于3、4有happens-before关系,4、5有happens-before关系,所以根据传递性规则,2、6间有happens-before关系。

线程A释放锁之前所有可见的共享变量,在线程B获取同个锁之后就变得可见了。

synchrnoized的内存语义

synchrnoized获取锁内存语义

当释放锁时,JMM会把线程对应的本地内存中的共享变量刷新到主内存。以上面的例子为例,共享数据的状态示意图如下所示

synchrnoized释放锁内存语义

当A线程获取到锁时,JMM会把该线程对应的本地内存置为无效。从而使得被监视器保护的临界区代码必须从主内存中读取共享变量。

语义总结

对比锁释放-获取的内存语义和volatile的写-读内存语义可以看出:锁释放与volatile写有相同的内存语义;锁获取和volatile读有相同的内存语义。

  • 线程A释放一个锁,实质上是告诉其他 要获取该共享变量 线程 一个消息(线程A所做的修改)
  • 线程B获取一个锁,实质上是接受到其他线程发出的消息(释放这个锁的线程对共享变量所做的修改)
  • 线程A释放,随后线程B获取,实质上是线程A通过主内存给线程B发送消息。

这里判断语义是否相同是通过两个操作的流程是否相同,比如线程A的锁释放完时,刷新至主内存;volatile写完后,刷新至主内存,并通知其他线程本地内存的共享变量失效(在锁释放环节里是交给锁获取执行);

CAS和JUC

synchronized是通过控制对象头来控制锁的升级,但是具体获取锁和释放锁的流程藏在JVM里,这里将通过ReentrantLock类比synchronized过程。

ReentrantLock的实现流程

这里要学习的是CAS,JDK文档对该方法的说明如下:如果当前状态值等于预期值,则以原子方式将同步设置为给定的更新值。此操作具有volatile读和写的语义。
前面讲到volatile写保证volatile写不会和前面的操作发生重排序,volatile读保证volatile读不会和后面的操作发生重排序。组合这两个条件就意味着同时实现了 禁止某一操作和操作前、操作后的重排序。CAS操作就是如此,它在是通过加上lock前缀来实现以下的功能:

  • 使用缓存锁定保证原子性
  • 禁止之前和之后的重排序
  • 把写缓冲区中的所有数据刷新到内存

正是因为CAS同时具有volatile读和写的内存语义,因此Java线程之间的通信有下面四种方式。

  1. A线程写volatile变量,B线程读这个volatile变量
  2. A线程写volatile变量,B线程用CAS修改volatile变量
  3. A线程用CAS修改volatile变量,B线程用CAS修改这个变量
  4. A线程用CAS修改volatile变量,B线程用volatile读取该变量

JUC包的通用化的实现模式:

  • 声明共享变量为volatile
  • 使用CAS的原子条件来实现线程间的同步
  • 配合volatile读/写和CAS 来实习线程间的通信

AQS,非阻塞数据结构和原子变量类,这些JUC包中的基础都是使用上面的模式来实现的,而JUC包的高层类又是依赖这些基础类来实现的。从整体看,JUC包的实现示意图如下所示。

《Java并发编程的艺术》之volatile

本章是在学习内存模型后,对Volatile关键字 有了更加全面得理解,对知识点进行一个分析总结。

volatile的特性

volatile在单个操作上和synchronized一样

  • 可见性:volatile字段的写操作保证对所有线程可见
  • 原子性:volatile字段的单个读写操作是原子性的(比如在32位机上,读取64位的long、double类型),但是volatile++就不具有原子性

volatile的happens-before

本节采用happens-before关系阐述volatile的作用

1
2
3
4
5
6
7
8
9
10
11
12
int a;
volatile boolean flag;
public void init(){
a = 1; //①
flag = true; // ②
}
public void doTask(){
if(flag){ // ③
result = a; // ④
}
......
}

假设线程A执行init(), 线程B执行doTask();这个过程里,happens-before关系共分为三类:

  1. ① happens-before ②
  2. ③ happens-before ④
  3. ② happens-before ③

这个过程的happens-before关系图如下所示:

分别遵守程序次序规则、volatile变量规则和传递规则:

  • 程序次序规则:一个线程内,按照代码顺序,书写在前面的操作先行发生于书写在后面的操作
  • volatile变量规则:对一个变量的写操作先行发生于后面对这个变量的读操作
  • 传递规则:如果操作A先行发生于操作B,而操作B又先行发生于操作C,则可以得出操作A先行发生于操作C

volatile的内存语义

JMM针对编译器制定的volatile的重排序规则

  • 第二个操作是volatile写时,不管第一个操作是什么,都不能重排序。这个规则确保volatile写之前的操作不会被编译器重排序到volatile写之后。
  • 当第一个操作时volatile读时,不管第二个是什么,都不能重排序。这个规则确保volatile读之后的操作不会被编译器重排序到volatile读之前。
  • 当第一个操作是volatile写,第二个操作是volatile读时,不能重排序。

volatile的写内存语义

当volatile写发生时,本地内存将刷新主内存。就拿上面happens-before的例子来说,当A线程执行init()写入volatile变量后,B线程执行doTask()读取volatile变量。内存状态变化图如下所示

线程A写入flag变量后,本地内存将更新的共享变量(更新了几个就刷新几个)刷新至主内存,此时A线程的本地内存和主内存的共享变量相同。

volatile的读内存语义

当B线程要读取flag变量时,本地内存B 中包含的共享变量已经被置为无效,B线程不得不去主内存读取共享变量。线程B的读取将导致本地内存B与主内存的共享变量的值变成一致。

将两张图综合起来看,在读线程B读取一个volatile变量后,写线程A在写这个volatile变量之前所有可见的共享变量都将立即变得对读线程B可见。

语义总结

  • 当写线程写了一个volatile变量,实质是写线程向接下来要读取这个变量的线程发出了消息(对其共享变量所做的修改)
  • 读线程读对应的volatile变量,实质是收到写线程发来的消息(在volatile写之前的共享变量的修改)
  • 随后写线程写入volatile变量,读线程去读取volatile变量,这个过程实质是A线程通过主内存给B线程发送消息

volatile内存语义的实现

volatile关键字实现原理主要还是通过内存屏障进行控制的。编译器在生成字节码时,会在指令序列里插入内存屏障来禁止特定的重排序。对于编译器来说,自行判断最小化插入屏障总数不太可能。为此,JMM采取保守策略:

  • 在每个volatile写操作的前面加入StoreStore
  • 在每个volatile写操作的后面加入StoreLoad
  • 在每个volatile读操作的后面加入LoadLoad
  • 在每个volatile读操作的后面加入LoadStore

上面的插入策略十分保守,但它可以保证在任意处理器平台上(在X86里,写/写,读/读,读/写 是不会发生重排序的,而且只有StoreLoad一个内存屏障),任意的程序中都能实现正确的语义。

volatile写的内存语义实现

下面是保守策略下,volatile写插入内存屏障的指令序列示意图。

StoreStore保证在执行volatile写前,所有写操作的处理已经刷新至内存,保证对其他线程可见了。而StoreLoad的作用是避免后面还有其他的volatile读/写操作发生重排序。由于JMM无法准确判断StoreLoad所处的环境(比如结尾是return),所以有两种选择:

  1. 在volatile读前加上StoreLoad
  2. 在volatile写后加上StoreLoad

但是因为StoreLoad相比其他内存屏障更加消耗性能,考虑更多场景下是少写多读,所以将StoreLoad加在volatile写后。

讲到StoreLoad的性能问题,不得不提一下Unsafe里面的putOrderedObject()。 这个方法很有意思,乍一看命名是放一个有序的对象,但它是通过避免加上StoreLoad内存屏障来弥补volatile写的性能问题。这时可能会有朋友问,不加上volatile不会影响可见性吗?会影响可见性,但不会永远影响下去,最多就两三秒的延迟,就会将共享变量刷新至主内存。所以当延迟要求不高,性能要求高时,就可以采用这个方法(Unsafe不安全类,这个方法的实现在Atmoic***类里面)。

volatile读的内存语义实现

下面是保守策略下,volatile读插入内存屏障的指令序列示意图。

LoadLoad保证先执行volatile读再执行后续的读操作(禁止volatile读和后续的读发生重排序),而后的LoadStore保证先执行volatile读再执行写操作(禁止volatile读和后续的写发生重排序)。两者联合起来就是无论如何volatile读必须和程序顺序保持一致。

volatile执行时的优化

上面的volatile读/写的内存屏障插入策略都十分保守,但是在实际过程中,只要不改变volatile写/读的内存语义,编译器可以根据实际情况省略不必要的屏障。

1
2
3
4
5
6
7
8
9
10
int a;
volatile int v1 = 1;
volatile int v2 = 2;
void readAndWrite(){
int i = v1;
int j = v2;
a = i + j;
v1 = i + 1;
v2 = j + 2;
}

针对readAndWrite()方法,编译器在生成字节码时会做如下优化。

按顺序下来,第一个volatile读先于第二个volatile,第二个volatile先于所有后续的写,故第一个volatile读一定不会被重排序;StoreStore保证普通写先于第一个volatile写,StoreStore又保证第一个volatile写先于第二个volatile写,最后安全起见插入StoreLoad。

上面的优化针对任意处理器平台,由于不同的处理器有不同“松紧度”的处理器内存模型,内存屏障的插入还可以根据具体的处理器内存模型继续优化。比如X86处理器,由于X86不会对读/读,读/写,写/写做重排序,所以面对X86处理器时,JMM会省略掉三种类型对应的内存屏障,保留StoreLoad内存屏障。

JSR-133为什么增强volatile的内存语义

在之前的版本,虽然不允许volatile变量间 的重排序,但是允许volatile和普通变量间的重排序。为了提供一种比锁更轻量级的线程间通信机制,专家组决定增强volatile的内存语义,严格限制volatile变量与普通变量的重排序,确保volatile的写-读和锁的释放-获取具有相同的内存语义。

由于volatile仅仅保证对单个volatile变量的读/写具有原子性,而锁的互斥执行的特性可以确保对整个临界区代码的执行具有原子性。在功能上,锁比volatile更强大;在可伸缩性和性能上,volatile更有优势。具体看《Java理论与实践:正确使用volatile变量》

关于final插入StoreStore的疑惑

在学习volatile和final时,注意到写时的区别:

  1. volatile写时往前面插入StoreStore内存屏障,保证第一个操作先发生于volatile写
  2. final在构造函数时在final写后插入StoreStore内存屏障,保证final写先发生于后面的操作

想必大家经常会听到volatile会保证 volatile写前的所有操作先执行完再执行volatile写。

而在对final域和普通域初始化的构造函数里,即使final写前面有StoreStore,前面的普通写/读仍然会发生重排序??