起因

蚂蚁二面,问到看过什么源码,一时有点紧张,就说自己看过JUC下的AQS源码,结果问起来的时候只能说出来state和CLH队列,再深究就不知道了。因此吸取教训,尝试手撕一下AQS的代码,下次再问到JUC或者源码相关的问题就不怕了。

整篇文章分为以下部分:

  1. What is AQS 首先对AQS进行了一个大概的介绍,说明了为什么会存在这样的类。
  2. Basic 以独占锁版本的普通拿锁释放方法,说明了AQS最核心的逻辑。
  3. Interruptibly 介绍独占锁版本可打断拿锁的实现。
  4. Timeout 接受独占锁版本带超时拿锁的实现。
  5. Shared 介绍另一套共享锁获取的实现。
  6. Cancel 介绍节点取消机制的原因和作用。

What is AQS

为什么要有

AQS是AbstactQueuedSynchronizer类的简称,是一个抽象类,采用模板方法的设计模式,将实现锁常用的公共部分抽取出来,形成了这样一个抽象类。很多JUC中其他的锁和类似锁的实现,都是集成AQS,复用代码完成的。

既然说AQS是模板方法模式中的抽象类,先要知道锁有哪些公共的实现,可以被抽取出来,以及不同的锁的实现区别体现在什么地方。不管是什么样的锁,都需要一个阻塞等待队列(对应monitor锁的entry-list),这样多个线程在竞争锁的时候,竞争失败的线程可以进入队列中等待,直到持有锁的线程将锁release。因此AQS就将这样一件事抽象出来,形成公共的操作模板:在获取锁失败时,线程进入等待队列并阻塞;持有锁的线程释放时,会唤醒等待队列中的下一个阻塞的线程,开始重新争抢锁。不同的锁实现上的区别无非在于,什么情况下获取锁可以成功,什么时候获取锁会失败,线程需要阻塞。

当然上面说的只是AQS核心基础功能,本身还有一些其他丰富功能:可支持可打断的获取(获取不到锁时进入阻塞状态,但是可以被打断,结束阻塞);支持带有超时时间的获取(阻塞状态最多维持一个超时时间)。而且,这样一套功能还分为两个版本:独占版和共享版。另外,独占版中还支持在条件变量上的等待。

怎么使用

抽象类,无法直接实例化使用。可以使用一些常用的基于AQS的锁实现:ReentrantLock、CountDownLatch、Semaphore等,也可以自定义子类进行实现,创建自己的锁。

自己定义新的锁类型时,只需要继承AQS类,然后重写任意一套方法:

  • tryAcquire & tryRelease 对应独占锁版本。
  • tryAcquireShared & tryReleaseShared 对应共享锁版本。

自己的锁就可以使用了,就拥有了一个拥有完整功能的锁(阻塞获取、可打断获取、可超时获取、等待条件变量)。当然也可以两套都重写,就拥有了一个既可以共享也可以独占使用的锁。有时候为了方便使用,还需要自己实现isHeldExclusively方法。

源码结构

AQS的源代码文件分为以下几个部分:

  • CLH队列节点类定义(内部类)
  • 3个成员变量:state状态、head链表头结点引用、tail链表尾节点引用
  • 核心方法:包含public的,供客户使用的方法;protected,供子类使用的方法;以及一些辅助的private方法。
  • 队列观察方法:获得CLH队列的各种信息
  • Conditions支持方法
  • ConditionObject定义
  • 底层CAS操作的支持静态变量、魔法类unsafe定义

Basic of AQS

AQS最基本的是独占版本的普通阻塞获取方法的实现,先把这个方法的实现思路理清,其他也就比较好办了。因此,下面先关注AQS中和基础功能相关的内容。

Node

CLH队列的节点。CLH队列时一种先进先出,可以理解为双向链表实现的队列。每个节点保存的最关键的信息:state和thread。state表示当前节点的状态,而thread表示当前节点中等待的线程是哪个。另外,还有前后项指针,将Node串成一个双向链表。

AQS中CLH队列创建时是Lazy模式的,也就是在第一次用到的时候才创建。创建时,会采用一个虚拟head节点,但是在后续使用中,head.next指向的就是第一个等待、准备马上下一个持有锁的节点(也不绝对,因为acquire可能会cancel,往后看就知道了)。

注意:有地方说head本身表示当前持有锁的线程节点,这样理解没大问题,但是也要注意,head节点中记录的thread并不一定是当前持有锁的thread。只是说,当前持有锁的thread在release的时候,一定会触发一次unparkSuccesser(head),后面等待的node就像是被当前thread唤醒一样。

一旦一个在队列中的节点获得锁,就会把head指向自己。tail用来指向队列尾部的节点,方便在队尾进行插入节点。可以看到head和tail都是共享并且会读写的变量,所以要用volatile修饰。

获得锁的逻辑

线程尝试获得锁的时候,会调用acquire()方法。在acquire()方法中,处理的非常简单:

graph LR
A(acquire开始) --> B[tryAcquire]
B --成功--> Z(返回成功)
B --失败--> C[线程包装成节点入队]
C --> D[acquireQueued在队列中申请]
D -.最终成功.-> Z

可以看到,一旦不能马上拿到锁,节点就会入队,后续在队列中等待拿锁。acquireQueued中做了什么呢?

graph LR
A((开始)) --> B[检查自己是不是head.next]
B --是--> C[tryAcquire]
B --否--> D
C --失败--> D[检查能否park]
D --能--> P[park]
D --不能--> B
P -..-> B
C --成功--> Z((返回))

检查是否能park由静态方法shouldParkAfterFailedAcquire完成。该方法的处理逻辑是:

graph LR
A((开始)) --> B[前一节点状态]
B--SIGNAL-->Z(返回True)
B--CANCELED-->C[找到前面第一个没取消的节点]
C-->D[设置为该节点的前驱]
D-->Y(返回False)
B--0\-3-->E[前一节点设置为SIGNAL]
E-->Y

这里涉及到节点状态,要知道几种状态都是什么意思:

  • 0。初始状态,表示该节点中的线程在等待,完成后不需要做别的动作。
  • SIGNAL。被head指向时,表示线程(这里的线程不一定时节点中的thread)结束时会unpark唤醒下一个等待节点中的线程。
  • CANCELED。表示该节点中的线程已经取消获取了,比如超时或者被打断了。节点无效,会被删掉。
  • -3。待定。

虽然状态在Node类中有描述,但是其作用最关键体现在release锁时调用的unparkSuccesser()方法中。

释放锁的逻辑

释放锁非常简单:调用tryRelease,然后根据当前head节点的状态,如果不为0则调用unparkSuccesser()方法。不为零的话只可能是SIGNAL状态。在unparkSuccesser()方法中,会找到下一个没有取消的节点,然后unpark其中的thread。

总结

获取锁时,线程尝试一次,如果失败就进入队列。在队列中,如果自己是队列中等待的第一个节点,就尝试获取锁。获取锁失败后,会将自己挂在前面最近的有效(没有取消的)节点后面,设置前面节点为SIGNAL状态,这样前面节点中线程release的时候就会唤醒自己,相当于找个人吩咐完事了。之后如果再拿不到锁,就会park长眠。。。

可以看出,凡是进入了队列的线程,都会按照顺序被唤醒。那么,为什么ReentrantLock中还可以实现非公平锁呢?分析发现,这个顺序还有一个漏洞,也就是说线程在第一次尝试获得锁的时候还不在队列,因此可以抢占。队列中好不容易排到队的线程可能会被刚来的线程直接插到最前面抢走锁,因此还是非公平的。

ReentrantLock中公平锁的实现也非常好改:tryAcquire的时候,没有人排队的时候才能直接拿锁,有人排队的时候就要排队,创建了公平的秩序。

情境分析

Q:t1持有锁时,t2到达,乖乖入队并把head节点(可能是t1的节点,也可能不是)标记为SIGNAL,自己等在后面。刚刚标好,还没来得及park,t1就release并触发unparkSuccesser了,t2会通知不到吗?

A:unpark确实叫不醒没睡的t2,但是别忘了t2在acquireQueued中进入新循环时会尝试tryAcquire,因此还是能正常拿到锁。

Q:假如t2再tryAcquire之前又来了个t3,直接把锁给抢了,t2会怎么样?

A:那t2本次tryAcquire会失败,并且开始park。但是t3结束时,依然会触发head的unparkSuccesser,之前这个head指向的节点被t2设置了SIGNAL状态,因此会unpark叫醒t2,t2还是可以有机会拿到锁。所以,AQS设计很精巧,还是很可靠的。

Interruptibly

如果想允许拿锁过程的阻塞是可被打断的,要使用AQS的public方法acquireInterruptibly()方法。

进入这个方法,首先看到的就是判断Thread.isInterrupted(),如果在调用方法的时候打断位已经被置位了,直接抛出InterruptedException异常,进入客户端代码的打断处理逻辑。这里是一个小细节,在很多其他地方也有出现。

然后进入到其中调用的doAcquireInterruptibly()方法中,发现这个方法和之前看到的acquireQueued方法不是如出一辙吗!两个方法几乎一致,为什么就可以实现被打断的效果呢。

其实,acquireQueued方法中也对线程park被唤醒之后的打断进行了判断,逻辑就在parkAndCheckInterrupt方法中(方法名就告诉你了会检查是否有打断),只不过,这里就算park是因为interrupt而唤醒,acquireQueued方法中只是进行了记录,并没有其他什么操作,直接忽视掉了,继续进行下一轮的循环,线程会重新park睡着。

这个地方需要一些基础知识:park的线程可以被interrupt打断,线程会从park的后面开始执行,并且线程的打断位不会重置,也就是isInterrupted()还是返回true。interrupt是unpark之外的另一种park线程唤醒方式。

在doAcquireInterruptibly()方法中,依然会检查park出来之后的线程打断位状态。只不过,这次不再是忽视打断状态了,而是如果有被打断直接抛出InterruptedException异常,一直抛到客户端代码,触发打断处理逻辑。

Timeout

带有超时的拿锁方法通过调用tryAcquireNanos方法来使用。方法名字里多了一个try,说明这个方法是可能拿锁失败(返回false)的。

acquireNanos是在acquireInterruptibly方法的基础上实现的,其中保留了可打断拿锁的所有功能(被interrupt的时候会抛出异常),另外还实现了超时等待。

具体来说,acquireNanos中调用了私有方法doAcquireNanos。进入该方法时,先计算了一个超时时刻的时间戳deadline,然后在尝试拿锁的循环中,计算本轮需要park的时间,在park的时候传入,实现定时park的效果。注意这里有一个小细节:计算了本轮需要park的时间之后,会和spinForTimeoutThreshold(默认1000毫秒)进行比较,如果比这个小,则不会park,而是重新循环,这里就是自旋的体现。

park过程中,线程重新唤醒的可能性有三种:

  1. 线程如果是超时唤醒,则再次循环时,发现等待时间<0,说明已经超时,就会返回false表示拿锁失败。
  2. 如果是被打断,则会抛出打断异常,一直到客户端代码,进入打断处理。
  3. 如果是前面持有锁的线程release时unpark唤醒,线程运行进入新的一轮拿锁循环,有机会成功拿锁(不保证,因为可能会被新来的线程抢走)。

这三种情况在源码中都进行了妥当的处理。

Condition

AQS定义了一套自己的ConditionObject实现。ConditionObject实现了Condition接口,实现了signal、signalAll、await等方法,以供客户代码使用。需要注意的是,只有当线程已经拿到了锁,才能进行await、signal操作,否则会报错。因此,在设计和读条件变量中的代码时,不需要考虑线程安全问题。

基本实现的思路是:当在一个condition上await时,会为该线程创建一个CLH链表的节点Node,但是不是将这个节点放在AQS的队列中,而是放在该condition的链表中(通过firstWaiter和lastWaiter维护)。当signal的时候,会将firstWaiter从该condition的队列中移除,放入AQS的队列中,表示该线程可以获取到锁了。signalAll的作用是,将该condition的所有Node转移到AQS的阻塞队列中。这样就实现了在某一条件变量上等待的效果。

Shared

普通拿锁和释放

共享的逻辑。之前所说的都是独占版本的,每次只允许有一个线程拿锁。共享版本的可以处理资源有多个的情况,这时可以同时有多个线程拿锁,不需要控制只有一个线程拿锁。

共享的节点,会使用Node.nextWaiter来标识。注意这里并不是因为需要拉出一个链表,源码注释中写道:因为Condition条件变量上的等待只有独占节点才用的到,所以这里直接用来标识共享,节省一个字段。

获取共享锁的时候,也是tryAcquireShared()尝试拿锁,如果拿锁成功就直接返回,如果尝试一次失败,就进入doAcquireShared的逻辑,入队列然后再队列中尝试拿锁。

doAcquireShared和之前独占版本的acquireQueued的方法大同小异,但是这个关键性的区别就在于:当共享节点拿到锁的时候,不仅会setHead到自己,还会进行propagate传播。传播的逻辑其实就是判断当前节点是否是共享节点,如果是的话就会调用doReleaseShared私有方法,模拟将锁放出。而doReleaseShared方法中,会对循环当前head进行检查,如果是signal状态,就唤醒下一个节点抢锁;如果是0(初始状态),就标记为Propagate状态。循环会在head不在变化时退出(head变化只可能是由于并发导致的,doReleaseShared方法本身不改动head)。这个Propagate是一个不稳定的中间状态,因为一旦后面接上了其他拿锁失败,等待的节点,当前节点状态就变成signal(表示有责任唤醒后续节点)。

关于Propagate状态,我一开始还不是很理解,为什么要设置这样的状态。但是看了源码中的注释后想清楚了。注释中写:Propagate状态只能设置在head节点上,当setHeadAndPropagate时,如果看到head是propagate状态,则应该无条件向后扩散。这里说的无条件,其实是说,无论tryAcquireShared返回什么(这里返回值一般是剩余资源数量,aqs的doAcquireShared方法会根据剩余资源数量决定是否要扩散唤醒后续节点),都要向后扩散。也就是说,有可能后续线程在获取锁的时候已经判断了剩余资源为0,还是要像后扩散。这么做的原因只有一个,即可能出现一种情况,当后续线程成功获取锁的时候返回剩余资源数为0后,但是前面节点刚刚release导致又有资源了,因此不应该偏信刚才返回的剩余资源数,而是一定要再扩散一下。说起来比较抽象,考虑下面的具体情况:

  1. t0和t1持有共享锁,其中t1对应有节点在CLH队列中,t0没有入队。资源总数为2。
  2. 此时t2来了,尝试拿共享锁会失败,于是入队,刚把自己挂在t1后面。队列状态:head(share, 0)->t2(share, 0)
  3. 此时t0释放锁,导致head状态被改变。head(share, propagate)->t2(share, 0),可用资源为1.
  4. t2在队列中重新拿锁,这次成功了,但是因为是最后一个资源,所以tryAcquireShared返回0,表示没有剩余资源了。
  5. 此时t1释放锁,因为head状态并不是signal,不会调用unparkSuccesser,不唤醒后续节点,就直接结束了
  6. t2尝试setHeadAndPropagate,发现虽然propagate参数为0,但是head的状态为propagate,于是可以扩散,可以唤醒后续的park的共享模式节点。这样做是合理的,因为此时确实有资源,应该让后续共享节点拿锁。

如果没有propagate状态,在第6步中,看到propagate为0就不扩散了,导致虽然剩余一个资源,但是没有后续共享节点拿锁。

拓展的拿锁和释放

拓展功能包括:可打断拿共享锁、带超时拿共享锁。实现逻辑上都是基于基本功能实现的,比较好理解,这里就不再赘述。

Cancel

源码中还有一个细节,就是cancelAcquire方法。为什么可以取消获取呢?

仔细看会发现这是个private的方法,客户端代码并不能调用,六处usage分别在共享和独占版本获取的一共六个acquire方法中。其实这里就是为了防止在获取锁的时候出现异常导致线程跳出,导致节点“烂”在队列里,影响后续的使用。因此,在每一个acquire方法中,finally块都会用一个cancelAcquire方法来取消掉该节点。例如,带超时拿锁和可打断拿锁,当被打断时会抛出异常,这个时候就会将这个节点取消掉。

在新节点入队之后的park检查,以及锁释放的unparkSuccesser方法中,都将cancel掉的节点进行了清理,因此不必担心这些垃圾节点会对后续的使用造成影响。