1.1 AQS简介
相比重新造个轮子来管理内部线程状态,AQS提供了一个稳定的、多面(排斥锁,Condition及共享锁)的线程状态管理框架。AQS是一个由FIFO队列构成的同步框架,主要用于构建自定义的同步器,比如ReentrantLock等常见的同步器。它的整体方法流程如下所示:
1.2 AQS要点
AQS的结构和CLH(Craig, Landin, anHagersten)自旋锁队列 很像,但AQS是阻塞的队列而CLH是自旋的队列。
1 | static class Node{ |
1.3 排斥锁
为了方便后续讲解,这边给出场景:
现在有A、B、C、D四个线程同时抢占同一个锁实例。
为了方便讲解, 假设A线程会先抢到锁,我们对A线程获取锁的过程进行解析;B、C、D线程作为结点依次加入等待队列(队列里顺序按B、C、D来,方便点)
1.3.1 上锁
当调用ReentrantLock.lock() 时,首先会调用AQS的acquire() 方法并传入数值1。
1
2
3
4
5
6
7public final void acquire(int arg) {
// tryAcquire()是留给子类实现的,先让当前线程尝试获取锁,如果获取锁失败就会将当前线程封装到等待队列里
// 调用子类的tryAcquire()
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}ReentrantLock.tryAcquire() 会调用其内部的Sync.nonfairTryAcquire(),并对加锁的个数进行计算。
- ① 传入的acquires 参数是想请求的锁个数,这个参数的值是由子类决定的,在ReentrantLock 里面这个值均为1
- ② 在ReentrantLock 里面(不同的实现state 意义不同),state是否为0代表锁是否被持有
- ③ 当state == 0 时,调用compareAndSetState() 尝试设置state 为1。成功则设置当前线程为锁拥有者,并返回true;否则返回false
- ④ 当state != 0 时,判断是否是当前线程持有锁
- ⑤ 当前线程已经持有该锁了,那么重入该锁,state 值递增1,最多重入Integer.MAX_VALUE次。注意,当state > 0 && state != 1 的情况下,释放一次锁是无法完全释放的,只有释放state 次锁,让state 为0,才能完全释放锁
1 | final boolean nonfairTryAcquire(int acquires) { |
- 到这一步,A线程顺利拿到锁去执行它的任务了,而B、C、D三线程就会因为tryAcquire() 返回false而执行后续的内容:addWaiter()
1 | private Node addWaiter(Node mode) { |
- 因为B、C、D线程均会进入未初始化的等待队列,所以至少有一个结点会进入enq() 方法。enq() 方法很简单,就是创建一个空结点作为头结点来初始化等待队列(有人可能会问那A线程呢?不用封装成结点吗?别急,往下看),其余CAS 竞争失败的结点插入尾结点后。
1 | private Node enq(final Node node) { |
在addWaiter结束后:
- 在addWaiter() 方法结束后,准备通过acquireQueued() 开始尝试获取锁。B结点获取新建的空结点(即上文新建的头结点),然后B线程tryAcquire() ,众所周知,A线程正持有锁,所以B线程tryAcquire() 失败,然后进入shouldParkAfterFailedAcquire()
1 | final boolean acquireQueued(final Node node, int arg) { |
- 该方法主要根据前驱结点判断当前结点是否需要阻塞,因为当前所有结点的状态都是初始化状态(ws为0)。每个结点在第一次失败后,都会进入else块(除非执行acquireQueued()里的tryAcquire() 成功)。else块 里会将前驱结点设置为SIGNAL 状态,暗示下一次你要是还是获取失败,你就乖乖阻塞吧
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){ |
7.此时除了尾结点外,其他结点的waitStatus域 均为SIGNAL。但除了头结点(Empty结点)外的所有结点包含的线程都处于阻塞状态,并等待它们各自的前驱结点来唤醒自己
1.3.2 释放锁
本节内容将会连接着1.3.1节的内容,前面讲到除了那个空的头结点外,其他结点包含的线程都发生了阻塞。那么A线程不在队列内该如何唤醒B呢?
- 当A线程调用了ReentrantLock.release() (其实是AQS的release)
1 | public final boolean release(int arg) { |
1 | protected final boolean tryRelease(int releases) { |
细心的小伙伴可能已经注意到tryRelease() 传入的参数,如果state域 减去 releases参数 不为0,依然无法释放锁。这个特性和CountDownLatch很像,当持有的数字为0时才能释放。
假设现在tryRelease() 返回true,接下来就判断头结点是否为空且状态是否为0(如果为0,代表后面没有结点需要唤醒)。如果B结点运行到acquireQueued()里tryAcquire() 和 shouldParkAfterFailedAcquire() 间时,A线程调用了release() 会发生问题吗?
不会,因为在这种情况下头结点的waitStatus域 如果是0,B结点还有机会可以再次tryAcquire() ,如果是SIGNAL,那就对头结点调用后续的unparkSuccessor()
1 | private void unparkSuccessor(Node node) { |
- 在A线程释放了锁后。如果没有新的线程要竞争,那么锁不出意外就是B线程的;否则,在非公平锁的实现里,鹿死谁手还不知道(公平锁里不出意外也一定是B线程的)
1.3.3 取消/中断
本节介绍方法cancelAcquire(),该方法都出现在最后的finally块中,而且需要failed域 为true(获取锁失败),才会进入cancelAcquire() ,所以最终需要取消的结点,要么是定时器到时间了,要么是线程被中断了。废话不多说,看初始图。
图表解释:
- A结点:采用lock()方法,并且获取锁成功
- B结点:采用tryLock()方法,并设置好10秒的期限,一到10秒就取消获取锁,当前处于阻塞状态
- C结点:操作同B线程
- D结点:采用lock()方法,获取锁失败,处于阻塞状态
1 | /** |
流程图大致是这样的:
到这里,整个cancelAcquire() 的流程结束了,最后的unparkSuccessor() 方法最终也不是为了唤醒C,具体的可以参考前面的release()一节
1.4 lock的其他方式
ReentrantLock 除了lock() 之外还有tryLock()和lockInterruptibly()。
- tryLock(long, TimeUnit) -> 在一定时间内没有获得锁就放弃获取锁
- lockInterruptibly() -> 获取锁的过程中可以被中断
1.4.1 tryLock
调用tryLock()
1
2
3
4
5// ReentrantLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}先检查中断状态,然后至少调用一次tryAcquire()。个人理解:如果当前锁没有发生竞争,不需要再额外创建等待队列。所以预先判断锁是否被持有,可以降低资源消耗
1 | public final boolean tryAcquireNanos(int arg, long nanosTimeout) |
- 没有什么特别之处,再acquireQueued() 基础上加了时限。唯一注意的点就是这里用到了自旋,在时间没超过spinForTimeoutThreshold域 之前,一直处于for循环(其实也就1000纳秒的时间处于自旋)。当超过自旋时间,进入LockSupport.parkNanos(),故名思意,不多介绍,想了解的可以看我的LockSupport文章
1 | private boolean doAcquireNanos(int arg, long nanosTimeout) |
1.4.2 lockInterruptibly
原本计划着要写,但是内容差不多,为了缩短篇幅,就算了。读者有兴趣可以去看看。
1.5 小栗子
占坑,以后想到了来写
1.6 总结
AQS总体给人的感觉就是提供了一个管理线程状态的框架,而这个框架是基于先进先出的链式队列,而这个队列主要是以阻塞为主,和CLH以自旋为主的队列不同,因为暂时没接触太多的并发项目,想写关于AQS的小栗子也没什么头绪,干脆在这里留个坑,以后有了回来填。