《Java并发编程的艺术》之Java内存模型

整体层次思路:Java采用的是内存共享模型,该模型会遇到内存可见性的问题,而内存可见性通常都是由 重排序写缓冲区 引发的,重排序又分为 处理器重排序编译器重排序。面对 写缓冲区的问题,像Java这样的高级语言一般无能为力,所以从重排序 入手,在重排序里,JVM通过内存屏障提供了一层最低限度的保障(比如初始化保证默认值,静态类第一次加载等等)。但是需要更高的保障(比如顺序一致性)还是需要更高的性能就由程序员自行定夺,我将面向程序员的那部分划分为了三块,一块是happens-before规则,一块是同步原语(volatile、synchrnoized等等),最后则是Doug Lea 大大写的JUC包。我想这些规则及工具在多线程开发中会起到至关重要的作用。也是我们学习的重中之重。


相信许多在多线程前线作战的伙伴们经常会遇到一种叫做内存可见性的大麻烦。这个麻烦可以简单粗暴的用synchronized解决,也可以很巧妙的用一些轻量级的同步原语解决。虽然能解决问题的都是好方案,但是在程序人生的旅途上,后者才是更为远见的选择。

在面对内存可见性这个问题时,我们不得不先去学习一个叫做内存模型的东西。这个模型解释了 如何解决多线程间的通讯如何实现多线程间的同步 两个问题。
传统上有两种模型,一种叫做内存共享模型,另外一种则时消息传递模型。而Java采用了内存共享模型,后文简称JMM。这个抽象概念主要描述了:

线程间的共享变量存储在主内存中,每个线程拥有自己的本地内存,每个本地内存上都有该线程读/写变量的副本。

JMM概念图

然而了解这个模型还不够,我们还要知道在Java编译、运行阶段都会有一种优化手段——重排序。重排序分为以下三种类型:

  • 编译器优化的重排序。在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序
  • 指令集并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism)来将多条指令重叠执行,如果不存在数据以来,处理器可以改变语句对应机器指令的执行顺序。
  • 内存系统的重排序。由于处理器适用缓存和读/写缓冲区,这使得加载和存储操作看上去是乱序执行的。

第1类属于编译器重排序,第2、3类型属于处理器重排序。而JMM作为语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保障。

另外Java尽可能地满足处理器级别的重排序优化,又让程序员自己把握优化程度。正如我们前面所讲的,遇到内存可见性问题,可以简单的用synchronized 大范围的禁止重排序,也可以根据实际情况,选用volatile、final等轻量级同步原语 仅仅禁止关键部分的重排序。

总而言之,发生内存可见性问题的原因不外乎两点:

  1. 本地内存(在CPU那个层面理解为缓冲区)在作怪
  2. 重排序

第一点属于硬件架构问题,基本上是无法从语言层面进行解决。故通过解决重排序问题来解决内存可见性问题

深入底层,了解本地内存

现代处理器都会配备一个写缓冲区,该缓冲区用来暂存写入的变量,保证指令流水线持续运行,避免CPU停下来等待向内存写入数据而产生的延迟;同时也能合并对同一地址的多次写。虽然写缓冲区好处多多,但是只对自己的CPU可见。这个特性在重排序的加持下,容易发生CPU对内存的读/写顺序 和实际内存发生的读/写顺序 不一致的情况。

假设现在有这样一个情况,线程A和线程B并发执行:

  • 处理器A:
    • int a = 1;
    • int a = b;
  • 处理器B:
    • int b = 2;
    • int b = a;

这样的程序偶尔会出现预料之外的结果,比如a和b均为0,或者a=1,b=1等等。具体原因如下图所示

这里CPU0 和 CPU1 均往自己的缓冲区写入数据,然后从内存中读取共享变量,最后才把写缓冲区中的数据刷新至主内存。当以这种时许执行时,就会出现a = y = 0的情况。

从内存理想执行角度看,下面的图例可能更符合直观感受:先写入缓冲区,缓冲区刷新到主内存,最后CPU从主内存中读取。对处理器来说,它认为执行顺序是①、②、③,但是实际操作情况却是①、③、②。此时CPU0的内存操作顺序被重排序了。

为了解决一些重要操作被重排序导致的问题,处理器提供了一种被称作内存屏障(Memory Fence) 的CPU指令,该指令可以处理重排序和可见性问题。Java编译器在生成指令序列时,会在适当位置中插入内存屏障来禁止特定类型的处理器重排序(所以那些同步原语本质都是内存屏障在其作用)。JVM把内存屏障分为4类,如下所示:

屏障类型 指令示例 说明
LoadLoad Load1;LoadLoad;Load2 在Load2及所有后续读取指令之前,Load1读取数据完毕
StoreStore Store1;StoreStore;Store2 在Store2及所有后续写入指令执行前,Store1写入的数据对其他处理器可见(将修改的变量都刷新到主内存中,毕竟刷新不可能只刷新Store1这一个数据,而是Store1及前面所有的修改后的共享变量)
LoadStore Load1;LoadStore;Store2 在Store2及所有后续写入指令刷新到主内存前,Load1读取数据完毕
StoreLoad Store1;StoreLoad;Load2 在Load2及后续所有读取操作执行前,保证Store1的写入对所有处理器可见(将修改的变量都刷新到主内存中)

最后一个StoreLoad指令是万能指令(有些处理器不支持前三种指令),兼具前三种指令的功能,且开销最大。
内存屏障在禁止重排序、保证内存可见性方面作用极大,为后续JMM的规则打下了基础。

解决内存可见性问题

为了能让JVM在一定程度上(因为重排序就是优化)保持重排序,又能解决内存可见性问题。Java 在 JSR-133 里推出了Happens-Before规则,修改了volatile、final等同步原语

程序顺序规则(as-if-serial)

上面我们都了解到了处理器重排序 对 CPU的影响,现在我们看看编译器重排序对单线程的影响

1
2
3
int a = 1; // ①
int b = 3; // ②
int c = a * b; // ③

在单线程的环境下运行这段代码,会和直观感受一样,它的结果是3,但是它的执行顺序是否和想象中一样就不得而知了。这就引出了一个这么概念——if as serial,这个概念主要描述了 不管如何重排序,单线程程序的执行结果不能被改变,编译器、runtime和处理器都必须遵守这个as-if-serial语义(处理器本身是不会遵守的,但是有JMM的控制)。

因为③依赖①、②的数据,所以为了保证结果一致,无论如何③都不会被排到①或②之前。

as-if-serial语义使单线程程序员无需担心重排序会干扰他们,也无需担心内存可见性问题。这是JMM对程序员作的第一个保证:在单线程下,你不用管重排不重排,结果肯定给你保证一样。

但是面对多线程的情况下,这个语义显得有点单薄,无法保证 多线程的重排序不会对程序有影响。为了能让多线程也无需担心重排序和内存可见性问题,我们需要同步

顺序一致性模型

在讲同步之前,需要了解下顺序一致性模型,这个模型是一个理论参考模型,它为程序提供了极强的内存可见性,该模型有以下两个特点:

  1. 一个线程中的所有操作必须按照程序的顺序来执行
  2. (不管程序是否同步)所有线程都只能看到一个单一的操作执行顺序。在顺序一致内存模型中,每个操作都必须原子执行且立刻对所有线程可见

在顺序一致性内存模型中,假设现有A、B两个线程,A线程先获取锁再执行A1、A2、A3,B线程同上。它的执行顺序图如下所示
SequenceModelInSynchronized
当线程A获取锁时,按顺序依次执行A1到A3;在A释放锁,B获取时,按顺序依次执行B1到B3。

假设现在不用锁这些同步工具,它的执行顺序如下所示
SequenceModelInNormal
虽然整体执行顺序发生了改变,但是再每个线程依然能看到一个一致的执行顺序,之所以能保证它一致就是因为上述的两个特点:必须按程序顺序执行、每个操作立即对所有线程可见。
但是在JMM上却没有这个保证,首先JMM不是顺序一致性模型,而且未同步程序的执行顺序是无需的,而且所有线程看到的执行顺序也可能不一样。就比如A线程先写入了一个数值,缓存在本地内存,A线程以为写进去了,但是本地内存只对A线程可见。其他线程仍然是原来的数值,只有等本地内存将数据刷新到主内存,其他线程才可见。这种情况下,当前线程看到的执行顺序和其他线程顺序将不一致。

JMM里正确同步地顺序

讲完了理想的顺序一致性模型,我们回归到现实,看看JMM里的同步程序执行顺序。

1
2
3
4
5
6
7
8
9
10
11
12
int a;
int result;
boolean flag;
public synchronized void init(){
a = 1;
flag = true;
}
public synchronized void doTask(){
if(flag){
result = a;
}
}

在这个代码里,假设A线程执行init()方法,B线程执行doTask()方法。根据JMM规范,只要正确同步的程序,结果都会和顺序一致性内存模型的结果一致。下面是该程序在两个内存模型的时序图。
对于JMM来说,在临界区内的代码可任意重排序(但不允许临界区内的代码溢出到临界区外,那样会破坏监视器语言),JMM会在进入临界区和退出临界区时做一些特别的处理,使程序在这两个时间点具有和顺序一致性相同的内存视图。虽然线程A在临界区内进行了重排序,但是在监视器互斥执行的特性下,线程B根本不知道过程,只能观测到结果。这种重排序既提高了效率,又没改变程序的执行结果。

从这里我们可以看到JMM在不改变(正确同步)程序执行结果的前提下,尽可能地为编译器和处理器地优化打开了方便之门。

JMM里没有同步地执行顺序

在JMM里,对没有正确同步地程序只提供了最小安全性:线程执行时读取到的值,要么时默认值(0,null,false),JMM保证线程读取到的值不会无中生有。为了实现最小安全性,JVM在堆上分配对象时,首先会对内存清零,然后才会在上面分配对象(JVM内部会同步该操作)。因此,在已清零的内存空间中分配对象时,域的默认初始化已经完成了。

在JMM里,未同步的程序不能保证执行结果和顺序一致性模型的结果一样。因为JVM要这么做的话需要大量禁止重排序,极其影响性能。未同步程序在JMM中的执行顺序整体上是无序的,其执行结果也是不可预测的,在两个模型中的执行特性有如下几个差异。

  • 顺序一致性模型保证单线程内的操作会按程序的顺序执行,而JMM不保证单线程的操作会按程序顺序执行
  • 顺序一致性模型保证所有线程只能看到一致的操作顺序,而JMM不保证所有线程能看到一致的操作执行顺序
  • JMM不保证对64位的long、double型变量的写操作具有原子性,而顺序一致性保证所有对内存的读写操作都是原子性

第三点与处理器总线工作机制密切相关。这里放出书上的原话

JVM在这点上也只是鼓励去做但不强求,因为在一些32位处理器上,要64位的写操作保持原子性是会需要大开销的。所以当JVM在这种处理器上运行时,会把64位long/double型变量拆做两个32位的写操作执行。此时,这种64位变量的写操作将不具有原子性。当单个内存操作不存在原子性时,可能会产生意想不到的后果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class Target{
public long a;
}
public static void main(String[] args){
Target target = new Target();
new Thread(() -> while(true) target.a = 0l).start();
new Thread(() -> while(true) target.a = Long.MAX_VALUE).start();

while(true){
String binary = toBinary(target.a);
// 出现不是0,又不是MAX_VALUE的二进制字串是就输出
if(!binary.equals("00000000000000000000000000000000") &&
!binary.equals("01111111111111111111111111111111")){
System.out.println("long不是原子性操作..");
break;
}
}
}
private static String toBinary(long l) {
StringBuilder sb = new StringBuilder(Long.toBinaryString(l));
while (sb.length() < 64) {
sb.insert(0, "0");
}
return sb.toString();
}

代码如上所示,假设处理器A写一个了long型变量,同时B处理器要读取这个long型变量。处理器A中的64位操作被拆分为两个32位的写操作,且这两个32位的写操作被分配到不同的事务中执行。同时处理器B 64位的读操作被分配到单个读事务中处理,执行顺序如下图所示时就会发生处理器A写到一半的数据被处理器B看见。

《Java并发编程的艺术》之synchronized的底层实现原理

在学习锁优化时,对象头(Mark Word) 是必不可缺的一环,因为synchronized 用的锁是存在对象头里的。32位的虚拟机上对象头占64位(8字节),64位的虚拟机上对象头占128位(16字节)^objectHead;而不同的类型,对象头的布局不太一样:

  • 数组类型:Mark Word、Class Metadata Address、Array Length
  • 普通类型:Mark Word、Class Metadata Address

Mark Word 表示对象的HashCode锁信息
Class Metadata Address 表示对象的数据类型在方法区对应的地址
Array Length 表示数组的长度(只在对象是数组的情况下才会存在)

对象头的默认表示应该如下所示

锁状态 25bit 4bit 1bit是否是偏向锁 2bit 锁标志位
无状态锁 对象的hashcode 对象分代年龄 0 01

具体的对象内存布局看这篇文章

而根据JVM的设置^1,具体分配时又会有不同的情况,如下所示

偏向锁到重量锁的过程

当关闭了偏向锁的设置,那么就会走左边的流程;反之则走右边的流程。

偏向锁

由于大多数情况下,锁大多都不处于多线程竞争状态,而且总是由同一个线程获取,所以JVM在1.6之后加入了偏向锁轻量锁 ,如今总共由4种锁状态:无状态锁偏向锁轻量锁重量锁。随着线程竞争的提升,锁会逐渐升级(无法降级)。
偏向锁在没有竞争的情况下可以提高同步的性能,这方面主要体现在偏向锁只需要进行一次CAS而轻量锁需要两次。它是一个需要权衡利弊的选择,它不是在任何情况下都对程序有利的。如果竞争很多,那么撤销偏向锁的过程就会成为性能瓶颈。

当偏向锁可用时,初始化的对象头分配如下所示

锁状态 23bit 2bit 4bit 1bit 是否是偏向锁 2bit 锁标志位
偏向锁 线程ID epoch 对象分代年龄 1 01

加锁过程

  1. 当对象头的isBiased 为1时且锁状态为01时,偏向锁可用,继续后面的流程
  2. 判断目标对象头是否包含本线程ID,如果没有,则直接CAS往对象头里写入本线程ID。到这一步加锁就结束了

锁撤销

由于偏向锁使用了一种直到竞争发生时才会释放的机制,所以当其他线程竞争偏向锁时,持有偏向锁的线程才会去释放锁。

  1. 等待原持有偏向锁的线程(后文简称原线程)运行至全局安全点(safe point)
  2. 暂停原线程
  3. 检查原线程 的线程状态,如果退出了同步代码块,则重偏向;反之升级为轻量锁
  4. 恢复原线程

轻量锁

加锁过程

注意:轻量锁会一直保持,唤醒总是发生在轻量锁解锁的时候,因为加锁的时候已经成功CAS操作;而CAS失败的线程,会立即锁膨胀,并阻塞等待唤醒。
引用Java并发编程艺术的图片

  1. 第一次进入同步块,开辟一个叫做Lock Record 的空间用于存储锁记录
  2. 将对象头中的Mark Word 复制到 当前线程栈中
  3. 尝试用CAS将Mark Word 替换指向Lock Record的指针
  4. 第三步操作成功,则将Mark Word 设置为00状态,标识轻量锁
  5. 然后执行同步体
  6. 第三部操作失败,进入自旋获取锁
  7. 自旋获取锁的失败次数到达阈值,膨胀锁,修改为重量级锁(状态改为10
  8. 线程阻塞

锁释放过程

  1. 尝试CAS将Lock Record的Owner 复制回 Mark Word
  2. 如果CAS操作成功,则表示没有竞争发生;否则看步骤3
  3. 释放锁并唤醒等待的线程

总结

本章是对synchronized 在JVM里的各种等级及升级的流程进行了讲解,其中主要是通过控制对象头的一些状态来控制锁的等级。偏向锁通过标记Thread ID 来表示,当前对象已经被对应线程占用;轻量锁则替换Mark WordLock Record 地址 来表示当前对象被对应线程占用。无论是哪种锁,在不同的场景下有不同的需求,可以参考以下表格做出选择

偏向锁:

  • 优点:加锁和解锁不需要额外小号,和执行非同步方法相比,仅存在纳秒级的差距
  • 缺点:如果线程间存在竞争,会带来额外开销(偏向锁的撤销)
  • 适用场景: 适用于只有一个线程访问同步块的场景

轻量锁:

  • 优点: 竞争的线程不会造成阻塞,提高了程序的响应速度
  • 缺点: 如果始终得不到锁,使用自旋会消耗CPU
  • 适用场景: 追求相应实践,同步块执行速度非常快

重量锁:

  • 优点: 线程竞争不使用自选,不会消耗CPU
  • 缺点: 线程阻塞,响应时间缓慢
  • 适用场景: 追求吞吐量,同步块执行速度较慢

这个是网上找到的关于锁撤销、膨胀等操作的总流程

ForkJoin并不是银弹

这是一段难受的排Bug的经历,ForkJoin框架的粗浅理解让我在单核服务器下发生了栈溢出,因为”ForkJoinPool.invokeAll()”的底层原理不仅会调用空闲的线程,也会调用当前的线程。如果处理不好,在单核服务器下就会发生死循环,最终产生栈溢出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.St
ackOverflowError
at java.util.concurrent.FutureTask.report(Unknown Source)
at java.util.concurrent.FutureTask.get(Unknown Source)
at cn.codeleven.App.main(App.java:20)
Caused by: java.lang.StackOverflowError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Sou
rce)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Sourc
e)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.join(Unknown Source)
at cn.codeleven.handler.XiCiHandler.handleDocument2IPEntity(XiCiHandler.
java:40)
at cn.codeleven.Crawler.start(Crawler.java:34)
at cn.codeleven.Crawler.run(Crawler.java:20)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.StackOverflowError
at java.util.stream.ReferencePipeline$3$1.<init>(Unknown Source)
at java.util.stream.ReferencePipeline$3.opWrapSink(Unknown Source)
at java.util.stream.AbstractPipeline.wrapSink(Unknown Source)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source
)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at cn.codeleven.handler.MyTask.compute(MyTask.java:34)
at cn.codeleven.handler.MyTask.compute(MyTask.java:13)
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at cn.codeleven.handler.MyTask.compute(MyTask.java:36)
at cn.codeleven.handler.MyTask.compute(MyTask.java:13)
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at cn.codeleven.handler.MyTask.compute(MyTask.java:36)
at cn.codeleven.handler.MyTask.compute(MyTask.java:13)
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at cn.codeleven.handler.MyTask.compute(MyTask.java:36)
at cn.codeleven.handler.MyTask.compute(MyTask.java:13)
at java.util.concurrent.RecursiveTask.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.util.concurrent.ForkJoinTask.invokeAll(Unknown Source)
at cn.codeleven.handler.MyTask.compute(MyTask.java:36)
at cn.codeleven.handler.MyTask.compute(MyTask.java:13)

标题没想好,不过一定是写List集合的

以后有空完整了解下ArrayList和相关集合
这次先解决一个这样的问题,看下面代码:

1
2
3
4
5
6
7
8
List<Integer> foo = new ArrayList<>();
for(int i = 0; i < 10; ++i){
foo.add(i);
}
List<Integer> foo1 = foo.subList(0, 3);
List<Integer> foo2 = foo.subList(3, 5);
foo2.add(1);
System.out.println(foo1.size());

读者感觉是否挺正常的,没有迭代,没有在多线程下,单纯的给foo1列表添加一个值好像没什么问题。但是在运行后会抛出“ConcurrentModificationException”,这就很气了。博主因为这个问题思来想去,从迭代多线程,每个角度都思考过,后来进入源码一看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// SubList是ArrayList的内部类
/* parent = ArrayList.this
* offset = 0
* 剩下两个就不用解释了
*/
SubList(AbstractList<E> parent,
int offset, int fromIndex, int toIndex) {
this.parent = parent;
this.parentOffset = fromIndex;
this.offset = offset + fromIndex;
this.size = toIndex - fromIndex;
// 注意,这里从ArrayList.this获取了modCount
this.modCount = ArrayList.this.modCount;
}

在进入add等修改方法后,可以看到最终递增modCount的方法

1
2
3
4
5
6
7
private void ensureExplicitCapacity(int minCapacity) {
// 可以看到,这里递增的只是ArrayList.this的modCount
modCount++;
// overflow-conscious code
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}

所以通过size()add()等方法对SubList内部类的修改会引起外部类的modCount变化,但是其他内部类的modCount (作修改的内部类的modCount会改变)不改变就会产生不一致,所以抛出这个异常

了解AQS之Condition

在实现生产队列和消费队列时,第一个想到的大多数都是Object.wait()Object.notify() ,而今天要介绍一个ReentrantLock 自带的一个工具Condition。它相比Object 在唤醒方面多了一些可控性,阻塞方面多了一个限时功能。两者的等待唤醒机制几乎一致,宏观流程图如下所示:

宏观的Condition和Object的等待唤醒机制

  1. 在当前线程A获取到锁
  2. 调用对象的等待方法(AQS是Condition.await(),Object是Object.wait()),并进入阻塞
  3. 其他线程B获取锁(是不是发现了问题,为什么B线程可以获取锁?)
  4. B线程唤醒阻塞的A线程(AQS是Condition.signal(),Object是Object.notify());B线程释放锁(如果A线程解除阻塞会做什么?)
  5. A线程尝试获取锁(为什么要尝试获取锁?)
  6. A线程执行下文
  7. A线程释放锁

由于Object的实现机制涉及到native方法,所以这里趁机讲解一波AQS的实现过程,我猜测两者的实现原理不会相差太大。这里根据宏观流程图的①、②、③等序号分别进行解释

流程详解

  1. Condition对象调用了await() 方法时,
    *await()*方法的执行细节
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程封装成结点,加入Condition队列尾部,类似enq()方法,但是在锁内进行await(),不需要CAS
Node node = addConditionWaiter();
// 释放当前线程持有的所有锁,在这个操作之后很有可能出现竞态条件
int savedState = fullyRelease(node);
int interruptMode = 0;
// 结点在同步队列里时就返回true
// 结点在CONDITION队列里时就返回false
while (!isOnSyncQueue(node)) {
// 当唤醒线程调用unlock()时就会解除该线程的等待
LockSupport.park(this);
/* 第一种情况,该线程被中断,就会将interruptMode设置为THROW_IE
* 第二种情况,该线程被中断且在检查过程中状态改变(比如中断时,被唤醒),就会将mode设置为REINTERRUPT
* 第三种情况,该线程被正常signal(notify),此时结点状态值为0
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 加入获取锁的 Sync队列中,获取成功时判断一下mode类型
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果当前结点有后续结点,那么就清理一下链表
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 这里判断事抛出异常还是简单的中断
reportInterruptAfterWait(interruptMode);
}

在看到

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21


```java
final boolean isOnSyncQueue(Node node) {
// node是CONDITION状态 或者 是CONDITION队列里的第一个
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 该状态一般发生在两个以上Condition处于wait且被唤醒一个
if (node.next != null)
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 当前结点是尾结点返回true;反之返回false
return findNodeFromTail(node);
}

  1. 当其他线程唤醒等待线程,因为这部分比较简单就省略了前面的内容,讲一下重要的地方。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    final boolean transferForSignal(Node node) {
    /*
    * 更改node结点的状态;如果更改失败就是已经取消了
    */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;
    /*
    * 将当前结点接入Sync队列,让这个结点加入等待;如果结点处于取消状态或者CAS
    * 设置状态失败,都会重新唤醒结点所在的线程,重新来过(在这一步会有一些不影
    * 响整体的小错误)
    */
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果这边修改状态失败了,那么就直接唤醒线程;否则交给release()方法来唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    LockSupport.unpark(node.thread);
    return true;
    }
  2. 阻塞线程被唤醒后,检查结点状态,然后设置interruptMode,最后进入争夺锁的过程里

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
     private int checkInterruptWhileWaiting(Node node) {
    // 判断是否有中断
    return Thread.interrupted() ?
    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
    0;
    }
    final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    // 加入Sync队列,这一步做了signal()做的事
    enq(node);
    return true;
    }

    /*
    * 该线程先被signal()后又被中断,我们就等该signal()流程结束。
    * 因为transfer过程不会太长,就选择自选
    */
    while (!isOnSyncQueue(node))
    Thread.yield();
    return false;
    }
1
2
3
4
5
6
7
8
9
10
11
// 这里是await()的后半部分

// 加入锁的获取,因为后半部分还有lock.unlock(),所以必须要重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 如果该线程是被中断的,那么就重新抛出异常
// 如果该线程是被唤醒且中断的,那么就重新设置中断标志,交给程序员自己处理
reportInterruptAfterWait(interruptMode);

总结

AQS的锁机制主要依靠双向的链表,而Condition的等待唤醒机制只需要普通链表即可实现。
这里对整体流程又进行了个总结。

了解AQS之ExclusiveLock

1.1 AQS简介

相比重新造个轮子来管理内部线程状态,AQS提供了一个稳定的、多面(排斥锁,Condition及共享锁)的线程状态管理框架。AQS是一个由FIFO队列构成的同步框架,主要用于构建自定义的同步器,比如ReentrantLock等常见的同步器。它的整体方法流程如下所示:

排斥锁方法流程图

1.2 AQS要点

AQS的结构和CLH(Craig, Landin, anHagersten)自旋锁队列 很像,但AQS是阻塞的队列而CLH是自旋的队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static class Node{
/**
* 等待状态,用来判断当前的线程是否需要阻塞
*/
volatile int waitStatus;
/**
* 前一个结点
*/
volatile Node prev;
/**
* 后一个结点,当该结点释放了锁后,通知后一个结点
*/
volatile Node next;
/**
* 当前结点所代表的线程
*/
volatile Thread thread;
/**
* 后一个节点,用于condition的情况下
*/
Node nextWaiter;
/**
* 获取前一个结点
*/
predecessor():boolean;
}

1.3 排斥锁

为了方便后续讲解,这边给出场景:

现在有A、B、C、D四个线程同时抢占同一个锁实例。

为了方便讲解, 假设A线程会先抢到锁,我们对A线程获取锁的过程进行解析;B、C、D线程作为结点依次加入等待队列(队列里顺序按B、C、D来,方便点)

1.3.1 上锁

  1. 当调用ReentrantLock.lock() 时,首先会调用AQS的acquire() 方法并传入数值1。

    1
    2
    3
    4
    5
    6
    7
    public final void acquire(int arg) {
    // tryAcquire()是留给子类实现的,先让当前线程尝试获取锁,如果获取锁失败就会将当前线程封装到等待队列里
    // 调用子类的tryAcquire()
    if (!tryAcquire(arg) &&
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
    }
  2. ReentrantLock.tryAcquire() 会调用其内部的Sync.nonfairTryAcquire(),并对加锁的个数进行计算。
    非公平锁下的tryAcquire()方法流程图

  • ① 传入的acquires 参数是想请求的锁个数,这个参数的值是由子类决定的,在ReentrantLock 里面这个值均为1
  • ② 在ReentrantLock 里面(不同的实现state 意义不同),state是否为0代表锁是否被持有
  • ③ 当state == 0 时,调用compareAndSetState() 尝试设置state 为1。成功则设置当前线程为锁拥有者,并返回true;否则返回false
  • ④ 当state != 0 时,判断是否是当前线程持有锁
  • ⑤ 当前线程已经持有该锁了,那么重入该锁,state 值递增1,最多重入Integer.MAX_VALUE次。注意,当state > 0 && state != 1 的情况下,释放一次锁是无法完全释放的,只有释放state 次锁,让state 为0,才能完全释放锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 该锁尚未被持有
if (c == 0) {
/* 非公平锁:
* 将state设置为acquires的值,谁能抢到就是谁的
* 想不通的这样想:头结点唤醒了后继结点
* 此时另外一个节点插了进来,发生竞争,可能新插进来的结点获取到锁
* 也可能是后继结点获取到锁
* 公平锁:
* 将锁让给等待时间最长的结点
* 一般来说是让给头结点的后继结点(因为FIFO)
*/
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
  1. 到这一步,A线程顺利拿到锁去执行它的任务了,而B、C、D三线程就会因为tryAcquire() 返回false而执行后续的内容:addWaiter()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private Node addWaiter(Node mode) {
// 创建结点,mode用来控制是共享锁、还是排斥锁
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 是否存在尾结点
if (pred != null) {
// 新结点放在尾结点后端
node.prev = pred;
// 重新设置尾结点(原子操作)
if (compareAndSetTail(pred, node)) {
/* 尾结点的next域指向新结点
* 这里会发生竟竞态条件?
* 竞态条件典型的一个例子就是“条件判断失效”
* 因为CAS的原子性,条件不会失效,保证一次只有一个线程执行成功
* 注意CAS无法保证ABA问题
*/
pred.next = node;
return node;
}
}
// 第一次发生竞争(或者设置尾结点竞争失败)进入enq()
enq(node);
return node;
}
  1. 因为B、C、D线程均会进入未初始化的等待队列,所以至少有一个结点会进入enq() 方法。enq() 方法很简单,就是创建一个空结点作为头结点来初始化等待队列(有人可能会问那A线程呢?不用封装成结点吗?别急,往下看),其余CAS 竞争失败的结点插入尾结点后
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// Must initialize
/* 为什么要在这里初始化,而不是一开始就初始化队列?
* 因为如果没有发生竞争,就永远不会产生等待队列
* 所以将初始化放到产生竞争的时候
*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 要入队的结点插入尾结点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

在addWaiter结束后:
addwaiter结束

  1. addWaiter() 方法结束后,准备通过acquireQueued() 开始尝试获取锁。B结点获取新建的空结点(即上文新建的头结点),然后B线程tryAcquire() ,众所周知,A线程正持有锁,所以B线程tryAcquire() 失败,然后进入shouldParkAfterFailedAcquire()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前结点的前驱结点
final Node p = node.predecessor();
// 判断p是否是头结点,如果p是头结点,尝试获取
// 否者根据p判断变量node是否需要阻塞
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
/*
* 设置结点状态为CANCEL
* 当获取锁时抛出了异常或者超出时限
* 那么就为当前结点设置CANCELLED
*/
cancelAcquire(node);
}
}
  1. 该方法主要根据前驱结点判断当前结点是否需要阻塞,因为当前所有结点的状态都是初始化状态(ws为0)。每个结点在第一次失败后,都会进入else块(除非执行acquireQueued()里的tryAcquire() 成功)。else块 里会将前驱结点设置为SIGNAL 状态,暗示下一次你要是还是获取失败,你就乖乖阻塞吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){
int ws = pred.waitStatus;
// 前驱结点pred是SIGNAL状态,所以需要让当前结点node阻塞
if (ws == Node.SIGNAL)
return true;
// 前驱结点pred是CANCELLED状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 前驱结点pred状态是0或者PROPAGATE
} else {
// 更新前驱结点pred的状态为SIGNAL,在下一次尝试获取锁的时候对该结点进行阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

shouldParkAfterFailedAcquire后

7.此时除了尾结点外,其他结点的waitStatus域 均为SIGNAL。但除了头结点(Empty结点)外的所有结点包含的线程都处于阻塞状态,并等待它们各自的前驱结点来唤醒自己

1.3.2 释放锁

本节内容将会连接着1.3.1节的内容,前面讲到除了那个空的头结点外,其他结点包含的线程都发生了阻塞。那么A线程不在队列内该如何唤醒B呢?

  1. 当A线程调用了ReentrantLock.release() (其实是AQS的release)
1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
// 调用子类的实现
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected final boolean tryRelease(int releases) {
// 计算当前线程持有锁的个数
int c = getState() - releases;
// 防止未持有锁的线程瞎搞
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
  1. 细心的小伙伴可能已经注意到tryRelease() 传入的参数,如果state域 减去 releases参数 不为0,依然无法释放锁。这个特性和CountDownLatch很像,当持有的数字为0时才能释放。

  2. 假设现在tryRelease() 返回true,接下来就判断头结点是否为空且状态是否为0(如果为0,代表后面没有结点需要唤醒)。如果B结点运行到acquireQueued()里tryAcquire() 和 shouldParkAfterFailedAcquire() 间时,A线程调用了release() 会发生问题吗?
    不会,因为在这种情况下头结点的waitStatus域 如果是0,B结点还有机会可以再次tryAcquire() ,如果是SIGNAL,那就对头结点调用后续的unparkSuccessor()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 获取头结点后面的结点s
* 如果结点s是null或者s已经被取消了
* 就从后往前遍历,直到找到一个满足条件的结点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// 不用担心被唤醒的结点不是头结点,在acquireQueue方法里会重新设置
if (t.waitStatus <= 0)
s = t;
}
// 唤醒结点
if (s != null)
LockSupport.unpark(s.thread);
}
  1. 在A线程释放了锁后。如果没有新的线程要竞争,那么锁不出意外就是B线程的;否则,在非公平锁的实现里,鹿死谁手还不知道(公平锁里不出意外也一定是B线程的)

1.3.3 取消/中断

本节介绍方法cancelAcquire(),该方法都出现在最后的finally块中,而且需要failed域 为true(获取锁失败),才会进入cancelAcquire() ,所以最终需要取消的结点,要么是定时器到时间了,要么是线程被中断了。废话不多说,看初始图。

设置取消状态
图表解释:

  • A结点:采用lock()方法,并且获取锁成功
  • B结点:采用tryLock()方法,并设置好10秒的期限,一到10秒就取消获取锁,当前处于阻塞状态
  • C结点:操作同B线程
  • D结点:采用lock()方法,获取锁失败,处于阻塞状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 10s过后,假设A线程仍持有锁,而B线程的tryLock()到期,那么B线程因为
* throw new InterruptedException()而跳出for循环进入finally块,准备执行
* cancelAcquire()
* PS:此情况下C尚未到期,因为方便写:)
*/
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
// 获取当前结点的前驱结点
Node pred = node.prev;
/* 判断前驱结点是否为取消状态;
* 如果是取消状态的话,就一直向前查找,直到找到非CANCELLED状态的结点
*/
// 本示例里pred是A结点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

Node predNext = pred.next;
/* 不管三七二十一,来了这个方法,这个结点就注定要设置成CANCELLED
* 那会不会有其他线程同时对这个结点的状态进行操作?
* 有也没关系,它的顺序无论在其他CAS写操作之前还是之后最终都会设置为
*/
CANCELLED(注意是“CAS”写操作)
// ①到这一步,本示例里B结点的ws转为CANCELLED
node.waitStatus = Node.CANCELLED;
// 如果当前结点是尾结点,那么就将predNext(从后到前第一个非CANCELLED状态的结点)设置为尾结点
// 本示例里B结点不是尾结点,跳过if进入else
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 如果pred不是头结点,就设置状态为SIGNAL
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {
Node next = node.next;
/* 当前结点是否有后继结点
* 若有后继结点且不是CANCELLED状态,则将后继结点和pred相连
*/
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
/*
* 如果pred是头结点就唤醒后继的非CANCELLED结点
*/
} else {
// ② 唤醒后面的非CANCELLED状态的结点
unparkSuccessor(node);
}
/* 很骚的操作,一般都是设置null来help gc,这里采用循环引用当新一波的
* tryAcquire()之后,会将CANCELLED的引用清理掉,所以最后就变成不可达的
* 引用,自然就被gc了
*/
node.next = node;
}
}

流程图大致是这样的:
最后的状态

到这里,整个cancelAcquire() 的流程结束了,最后的unparkSuccessor() 方法最终也不是为了唤醒C,具体的可以参考前面的release()一节

1.4 lock的其他方式

ReentrantLock 除了lock() 之外还有tryLock()lockInterruptibly()

  • tryLock(long, TimeUnit) -> 在一定时间内没有获得锁就放弃获取锁
  • lockInterruptibly() -> 获取锁的过程中可以被中断

1.4.1 tryLock

  1. 调用tryLock()

    1
    2
    3
    4
    5
    // ReentrantLock
    public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
  2. 先检查中断状态,然后至少调用一次tryAcquire()。个人理解:如果当前锁没有发生竞争,不需要再额外创建等待队列。所以预先判断锁是否被持有,可以降低资源消耗

1
2
3
4
5
6
7
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
  1. 没有什么特别之处,再acquireQueued() 基础上加了时限。唯一注意的点就是这里用到了自旋,在时间没超过spinForTimeoutThreshold域 之前,一直处于for循环(其实也就1000纳秒的时间处于自旋)。当超过自旋时间,进入LockSupport.parkNanos(),故名思意,不多介绍,想了解的可以看我的LockSupport文章
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 设置的时间小于0,直接返回
if (nanosTimeout <= 0L)
return false;
// 期限,绝对时间
final long deadline = System.nanoTime() + nanosTimeout;
// 创建一个新的结点,注意EXCLUSIVE
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 如果当前时间超过了时限,就当获取锁失败,返回false
if (nanosTimeout <= 0L)
return false;
/* 前面和acquireQueued() 一样,后面是允许自旋的时间
* 再没达到spinForTimeoutThreshold前,一直处于循环
*/
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 限时阻塞的主要逻辑
LockSupport.parkNanos(this, nanosTimeout);
// 可以中断
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

1.4.2 lockInterruptibly

原本计划着要写,但是内容差不多,为了缩短篇幅,就算了。读者有兴趣可以去看看。

1.5 小栗子

占坑,以后想到了来写

1.6 总结

AQS总体给人的感觉就是提供了一个管理线程状态的框架,而这个框架是基于先进先出的链式队列,而这个队列主要是以阻塞为主,和CLH以自旋为主的队列不同,因为暂时没接触太多的并发项目,想写关于AQS的小栗子也没什么头绪,干脆在这里留个坑,以后有了回来填。

深入Unsafe

在JUC(并发包)里经常会使用到Unsafe这个类,那么了解这个类,就成为了下面学习的重中之重

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/**
* 注意如果是获取基础变量类型,那么所有方法都要调用对应基础变量类型的方法,不能使用Object代替全部,因为C语言底层是没有包装类型的
*/

/*------------------对象操作------------------*/

/**
* 返回一个字段的偏移量
* PS:注意区分objectFieldOffset和staticFieldOffset,一个操作的参数是对象实例,一个是Class对象
* 不要对该偏移量进行任何处理,该偏移量会被传给不安全的堆内存里
*
* @param f Class文件里面的字段
*/
long objectFieldOffset(Field f);

/**
* 获取变量时,具有volatile语义
* PS:如果是获取静态变量,那么Object就要用Class;获取非静态变量,那么Object就要用对应的Object
*
* @param o 待获取的对象,要区分Class和Object
* @param offset 对象里某个位置的偏移量,@see #objectFieldOffset(Field)
*/
Object getObjectVolatile(Object o, long offset)

/**
* 为Object实例设置一个值,具有volatile的语义
*
* @param o 目标实例
* @param offset 对应字段的偏移量
* @param x 要赋予的值
*/
void putObjectVolatile(Object o, long offset, Object x);

/**
* 获取静态字段的偏移量
* PS:注意区分objectFieldOffset和staticFieldOffset,一个操作的参数是对象实例,一个是Class对象
* 不要对该偏移量进行任何处理,该偏移量会被传给不安全的堆内存里
*
* @param f Class文件里的静态字段
*/
long staticFieldOffset(Field f);

/**
* 延迟设置属性
* 对于volatile属性来说是立即生效的。
* 详情见putOrderedObject小节
*
* @see #putObjectVolatile
*/
void putOrderedObject(Object o, long offset, Object x);

/**
* 非阻塞锁的属性交换
* 比较字段的值和原来的是否一样,如果一样就进行设置
*
* @param o 要操作的实例对象
* @param offset 要操作的字段
* @param expected 原来的值
* @param x 新的值
* @return 设置是否成功
*/
boolean compareAndSwapInt(Object o, long offset, int expected,
int x);

/*------------------数组操作------------------*/

/**
* 获取array第一个元素的偏移量
* 该数值配合arrayIndexScale使用
* 通过arrayBaseOffset + arrayIndexScale * index 获取对应下标的元素
* arrayBaseOffset在64位下,都是16(没有开启指针压缩)
* arrayIndexScale根据不同的类型,有不同的值,比如double是8,int是4
*
* @param arrayClass 数组的Class的类型,比如A[].class
*/
int arrayBaseOffset(Class arrayClass);

/**
* 获取数组元素的增量
*
* @arrayClass 数组的Class类型
*/
int arrayIndexScale(Class arrayClass);

/*------------------锁相关操作------------------*/

/** 锁对象 */
void monitorEnter(Object o);

/** 解锁对象 */
void monitorExit(Object o);

/*------------------线程相关操作------------------*/

/**
* block当前线程
* 在以下几种情况下会结束阻塞并返回
* 1. 在park前调用unpark或者在park后调用unpark均可立即返回
* 2. 当前线程被中断(interrupte)
* 3. 阻塞时间超过给定值
* 4. ??没有任何原因的返回??
* @param isAbsolute true则使用Epoch来表示;false则使用相对时间
* @param time 根据isAbsolute来决定,若为true,表示绝对时间(epoch、unix时间戳)单位毫秒;若为false,表示相对时间(相对现在多少纳秒以后返回),单位纳秒;若time为0,表示永远阻塞
*/
void park(boolean isAbsolute, long time);

/**
* unblock指定的线程
* 当unblock不同状态的线程有不同的现象:
* 当指定线程已经阻塞,那么unblock指定的线程
* 当指定线程尚未阻塞,那么指定线程的下一次park将不会产生阻塞
*
* @param thread 请确保thread不为空
*/
void unpark(Object thread);

putOrderedObject整理

这方法纠结了我很久,因为doc上很简略,大体就是讲对非volatile变量的修改对其他线程不会立即生效

  1. 那么这货有什么用?
  2. 这货和普通写入有什么区别吗?(因为普通写入对其他线程也不会立即生效)

下面的Bug Report是为什么添加“lazySet”的初衷:

Bug_6275329 By Doug Lea

“As probably the last little JSR166 follow-up for Mustang,
we added a “lazySet” method to the Atomic classes
(AtomicInteger, AtomicReference, etc). This is a niche
method that is sometimes useful when fine-tuning code using
non-blocking data structures. The semantics are
that the write is guaranteed not to be re-ordered with any
previous write, but may be reordered with subsequent operations
(or equivalently, might not be visible to other threads) until
some other volatile write or synchronizing action occurs).

我们添加了“lazySet”一方法给AtomicX这些类(比如AtomicInteger、AtomicReference等)。这个方法在使用非阻塞数据结构调整代码的情况下很有用。它的作用主要表现在 保证写入前的操作的不会被重排序,写入后的操作可能会被重排序(换言之就是写入后对其他线程不会立即可见) 直到有其他的volatile写入或者同步操作发生。

The main use case is for nulling out fields of nodes in
non-blocking data structures solely for the sake of avoiding
long-term garbage retention; it applies when it is harmless
if other threads see non-null values for a while, but you’d
like to ensure that structures are eventually GCable. In such
cases, you can get better performance by avoiding
the costs of the null volatile-write. There are a few
other use cases along these lines for non-reference-based
atomics as well, so the method is supported across all of the
AtomicX classes.

这个方法的主要用途是 在非阻塞数据结构中单独空出结点,来降低长时间的垃圾滞留问题来的性能问题;这能运用在就算其他线程看到非空值也没关系的情况下。但是你要确保这个结构确实能够被GC。在这种情况下,你能通过避免null值的写入,来获得更好的性能。

For people who like to think of these operations in terms of
machine-level barriers on common multiprocessors, lazySet
provides a preceeding store-store barrier (which is either
a no-op or very cheap on current platforms), but no
store-load barrier (which is usually the expensive part
of a volatile-write).”

对于那些在通用多处理器下需要考虑底层机器屏障的人来说,“lazySet”提供了预Store-Store屏障(对于现在的平台来说,要么无操作,要么代价不大),但是没有Store-Load屏障(通常需要付出较高的代价)

PS:因为“lazySet”其实就是“volatile”的削弱版,所以叫“weak volatile”也挺符合的。


“weak volatile”的场景:

“weak volatile”适用于对实时性要求不高的场景,该方法可以较大的节省性能消耗

  1. 一个链表中的某个节点被修改了,在volatile的情况下,整个链表都会一定被强制更新;而在”weak volatile”的情况下,不被强制更新,节省部分性能。
  2. 在修改帖子状态的时,如果对性能要求很高,可以使用“lazySet”保证写入内存,但是对实时性的要求不高

具体的其他场景可以看下面的链接(表示第一个回答没看懂,功力深了后再来)

  1. AtomicXXX.lazySet(…) in terms of happens before edges —-StackOverflow

“weak volatile”的原理

volatile的工作原理是插入屏障,在写入的时候插入StoreStore,写完后插入StoreLoad,根据Doug Lea的说法,StoreLoad相比StoreStore更消耗资源
而“weak volatile”则是省去了最后的StoreLoad的步骤,相比插入两个屏障,“weak volatile”性能提升了不少

更加具体的底层原理可以看下面的链接

  1. JUC中Atomic class之lazySet的一点疑惑—-并发编程网

了解LockSupport的原理

在学习AQS的出现的LockSupport,秉着知根知底的想法,决定顺带着了解一下LockSupport。

LockSupport是什么

一个可以创建锁、同步器的基础线程阻塞的底层工具。其工作方式和Semaphore十分相似:

  1. 调用 LockSupport.park() ,将在调用线程中消耗permit。如果permit可用,那么就会立即返回;否则阻塞调用线程
  2. 调用 LockSupport.unpark() ,将会使得permit可用(注意,这方面和Semaphore不同,Semaphore.release() 可以叠加permit,而LockSupport.unpark() 至多只有一个permit

LockSpport的优势

针对线程的阻塞和解除阻塞,我们很容易想到Thread.suspend()Thread.resume() 两个方法,然而这两个方法因为天生容易发生死锁(suspend()方法原理是暂停JVM,而暂停JVM是不稳定不安全的,所以容易发生问题)从而被抛弃使用。

具体可以看我的《为什么Thread.suspend和resume被弃用》一文

Object.wait()Object.notify() 使用起来过于麻烦,因为要放在synchronized 块里

所以 LockSupport 就成为了很好的替代品

使用参数注意事项

  • blocker
    • blocker在这个类里面主要起到一个线索作用,告诉我们是哪个对象阻塞了当前的线程,方便dump日志进行分析
  • isAbsolute
    • 如果isAbsolute 为true,就是使用Epoch(Unix时间戳),在LockSupport 里面对应的方法是parkUntil();
    • 如果isAbsolute 为false,就是使用相对时间(单位纳秒),在LockSupport里面对应的方法是parkNanos()

了解HashMap原理

对于写了这么久的HashMap,对其知之甚少,趁大三有点闲时,打算深入了解下HashMap的原理。

  1. HashMap、HashTable之间有什么关系
  2. HashMap在put、remove的时候做了什么
  3. resize(再哈希)是什么
  4. 为什么每次扩容,容量都是原来的二次方

HashMap和HashTable间的关系

HashMap和HashTable间的差异:

  1. HashMap是线程不安全的,HashTable几乎都加上了方法级别的Synchronize,所以同一时间最多只有一个线程可以对同个方法进行调用
  2. HashMap允许null键和null值,HashTable遇到null值会抛出NPE

    HashTable的源码分析戳这里

    HashMap再put、remove的时候做了什么

JDK6

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* 第一次注意到put有返回值
* 如果put之前key已经存在和值X的对应关系,则返回值X
* 如果put之前key没有对应关系,则返回null
*/
public V put(K key, V value) {
// 当key为null时,特殊处理,这是和HashTable不一样的地方
// HashMap允许key、value为null
if (key == null)
return putForNullKey(value);
// hash一下,这里用到的是扰动函数
int hash = hash(key.hashCode());
// 获取该hash值的bucketIndex
int i = indexFor(hash, table.length);
// 遍历对应bucktIndex位置的Entry
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
// 比较该bucktIndex下的所有Entry
// 比较hash值、比较key是否相等(从值和地址两方面进行比较)
// 为什么要再比较次hash值
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
// fail-fast的计数器
modCount++;
// 添加新值
addEntry(hash, key, value, i);
return null;
}
private V putForNullKey(V value) {
// 从Entry数组下标为0的位置开始一一遍历
for (Entry<K,V> e = table[0]; e != null; e = e.next) {
// 是否存在这么一个entry,其key为null
// 如果存在则替换新值
if (e.key == null) {
V oldValue = e.value;
e.value = value;
// 钩子函数
// 可以看一下LinkedHashMap
e.recordAccess(this);
return oldValue;
}
}
// 计数器加一
modCount++;
// 新添加一个键值对
addEntry(0, null, value, 0);
return null;
}
void addEntry(int hash, K key, V value, int bucketIndex) {
// 获取下该bucketIndex下的Entry
Entry<K,V> e = table[bucketIndex];
// 产生了hash冲突。将Old Entry连接到New Entry后面
table[bucketIndex] = new Entry<K,V>(hash, key, value, e);
// 判断是否要再hash
if (size++ >= threshold)
// 因为capacity本身就是2的n次方,见HashMap的初始方法
// 所以每次扩大,只需要*2
resize(2 * table.length);
}
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity];
// 移动数组
transfer(newTable);
table = newTable;
threshold = (int)(newCapacity * loadFactor);
}
void transfer(Entry[] newTable) {
Entry[] src = table;
int newCapacity = newTable.length;
// 遍历全部的Entry
for (int j = 0; j < src.length; j++) {
Entry<K,V> e = src[j];
// 如果该BucketIndex存在数值
if (e != null) {
src[j] = null;
do {
// 保存下一个Entry
Entry<K,V> next = e.next;
// 获取该e对象在新entry数组的index
int i = indexFor(e.hash, newCapacity);
// 是否发生hash冲突,如果有,就接到e对象的后面,没有就是null
e.next = newTable[i];
// 将e设置到对应的位置
newTable[i] = e;
e = next;
} while (e != null);
}
}
}

JDK7

在JDK6的基础上,没有做特别的改变,倒是去除了modCount的volatile关键字(1.7取消了modCount的volatile)。

移除volatile的原因:
对于单线程集合,没必要承担volatile的额外消耗;多线程情况下,也不应该使用单线程集合,所以在hashmap里面移除volatile

移除Volatile的原文

JDK8

需要红黑树的知识,待学习

再哈希

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 扩容,根据key的hash和数组总长度重新hash获取新的BucketIndex,然后存放
* 因为不是线程安全的,再哈希可能会发生死循环
*/
void resize(int newCapacity) {
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity == MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return;
}

Entry[] newTable = new Entry[newCapacity];
// 移动数组
transfer(newTable);
table = newTable;
threshold = (int)(newCapacity * loadFactor);
}

每次扩容,容量都是原来的二次方

容量都是2的N次,主要是为了下面的

% length```,先保证不超过最大长度,其次保证hash坐落的位置。但是取模运算是所有运算符里较为繁杂的运算指令。大师们为了更高效,就不得不采用位运算符并让```hash```的每位都参与到运算中。这里放出式子```hash & (length - 1)```
1
2
3
4
5
6
7
8

```java
/**
* Returns index for hash code h.
*/
static int indexFor(int h, int length) {
return h & (length-1);
}

可以看一下,如果容量的长度不是2的幂次方

1
2
3
length: 1000 0000
&
hash: 0110 1111

上述例子实际可存放的位置就只有一个: 1000 0000 ,会有大量的空间被浪费,

1
2


length: 1111 1111
&
hash: 0110 1111

1
但是这样的容量很难通过一次移位获取,所以实际情况应该是2的N次方 - 1,尽可能的将1布满。如下所示

length: 0111 1111
&
hash: 0110 1111
`
通过移位获取最大的2的N次方,减去1,获得最可能的理想情况,允许存在128个位置。一个是256,一个是255,前者只能存放1个位置,后者可以存放128个位置,其高效可想而知。

所以容量是2的幂次方是为了减少空间的浪费,降低hash冲突的几率

为什么Thread.suspend和resume被弃用

笔者在学习JUC包的时候,了解到同样用于线程阻塞和恢复功能的类—— LockSupport,那么这个类和Thread.suspend()Thread.resume() 有什么区别,为什么后者被弃用了?

阻塞小例子

该例子(来源于Bug Database)很好的验证了Thread.suspend()/resume()这对兄弟天生容易犯错,在输出几个i之后就会发生永久性阻塞(通过jstack看,只能判断其是阻塞,无法判断是死锁)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* ---------Thread.suspend 和 resume发生“死锁”的示例--------- */
public static void main(String[] args) throws InterruptedException {
TestTask mainThread = new TestTask();
mainThread.start();
while(true){
if(!mainThread.isTalking){
// LockSupport.unpark(mainThread);
mainThread.resume();
mainThread.isTalking = true;
}
}
}

static class TestTask extends Thread {
volatile boolean isTalking = true;
int i = 0;

public void run() {
while (true) {
if (isTalking) {
i++;
isTalking = false;
// LockSupport.park();
suspend();
System.out.println(i);
}
}
}
}

弃用原因

在这个方面,笔者上网了解过很多,许多人都是泛泛而谈,甚至是随手复制粘贴,质量低的可怕,完全无法了解到为什么它被弃用了。笔者最后在Bug Database上了解到了原因:

  1. Thread.suspend()和Thread.resume() 发生阻塞的原因主要在于底层JVM在暂停时候的不安全
  2. 就算花大力气修复了Thread.suspend()和Thread.resume(),问题也会转移到应用层级别(这一点暂时无法理解) 。

Thread.suspend()Thread.resume() 被弃用的原因

有能力的朋友们可以直接看原文,可以一起交流一下看法。

替代方案

  1. Object.wait()Object.notify()/notifyAll(),该方法需要在synchronized()中使用,极其麻烦
  2. LockSupport 粒度小,使用方便

小栗子

暂时没想法,后续补上