应付面试可直接跳转到文章结尾处的总结
AQS(AbstractQueueSynchronizer)
在锁框架中,AbstractQueueSynchronizer抽象类可以毫不夸张的说,占据着核心地位,它提供了一个基于FIFO队列,可以用于构建锁或者其他相关装置的基础框架。
AbstractQueueSynchronizer会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock)时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态。
注:大多数JUC包下的并发工具类都是利用AbstractQueuedSynchronizer同步器来委托实现的,它是用来构建锁或者其他同步组件的基础框架。想要弄明白并发原理,搞明白AQS的实现机制很重要。
AQS主要实现了独占式和共享式
独占式:同步状态在0与1之间切换,同一时刻只有一个线程获取锁进行操作,其他线程挂起,ReentrantLock就是一个经典的独占式锁。
共享式:PROPAGATE的数值大于0,可以使同一时刻有多个线程获取锁,如果PROPAGATE小于0,则就变成了共享式模式,可以参照Semphore。
数据结构
AbstractQueuedSynchronizer类底层的数据结构使用的是 双向链表 + state(锁状态) + 底层CAS操作
。
Sync queue(同步队列):是一个双向链表,包括head节点和tail节点,head节点主要做后续的调度。
Condition queue(条件队列):这不是必须的,它是一个单向链表,只有当使用Condition时,才会存在此单向链表。并且可能会有多个。
概念:
- AQS使用了一个int类型的state成员变量表示同步状态,通过Node类型的head和tail头尾节点所确定的FIFO队列来完成资源获取线程的排队工作。
- 当一个线程成功的获取了同步状态,其他线程将无法获取到同步状态,没有成功获取同步状态的线程会成为节点加入该队列的尾部,为保证线程安全,使用
compareAndSetTail(Node expect, Node update)
方法原子更新在尾部插入等待节点。 - 头节点是获取同步状态成功的节点,头节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
- 获取同步状态成功的线程才能设置头节点,且只有一个线程能获取到同步状态,不存在线程安全问题,所以不用CAS来设置节点。
同步容器AQS的基本用法
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的三个方法getState()
、setState(int newState)
、compareAndSetState(int expect, int update)
来进行操作,因为它们能够保证状态的改变是安全的。
子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式的获取同步状态,这样就可以方便实现不同类型的同步组件。
使用技巧
同步器的设计是基于模板方法的,也就是说,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。而重写同步器指定的方法时,需要使用同步器提供的如下三个方法来访问或修改同步状态。
getState()
: 获取当前同步状态。setState(int newState)
: 设置当前同步状态。compareAndSetState(int expect, int update)
: 使用CAS设置当前状态,该方法能保证状态设置的原子性。
AQS可重写的方法
- boolean tryAcquire(int):独占式获取同步状态,实现此方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态。
- boolean tryRelease(int):独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态。
- boolean isHeldExclusively():当前同步器是否在独占模式下被线程占用,一般此方法表示是否被当前线程所独占。
- int tryAcquireShared(int):共享式获取同步状态。
- boolean tryReleaseShared(int):共享式释放同步状态。
AQS中的模板方法
以下的模板方法的内部都会调用上面说的5个可被重写的方法,而自定义同步组件在实现一些同步功能时一般会使用委托的方法调用AQS的这些模板方法,而不是直接调用上面AQS的那5个可重写的方法,当然也有例外,比如:ReentrantLock的tryLock()方法就是直接调用 boolean tryAcquire(int)
- void acquire(int):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int)方法。
- void acquireInterruptibly(int):与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回。
- boolean tryAcquireNanos(int ,long):在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true。
- boolean release(int):独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
- void acquireShared(int):共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以由多个线程获取到同步状态。
- void acquireSharedInterruptibly(int):与acquireShared(int)相同,该方法响应中断。
- boolean tryAcquireSharedNanos(int, long):在acquireSharedInterruptibly(int)基础上增加了超时限制。
- boolean releaseShared(int):共享式的释放同步状态。
源码示例
AQS类中的排他锁示例
1 | class Mutex implements Lock, java.io.Serializable { |
AQS类中的共享锁示例
1 | class BooleanLatch { |
AQS源码分析
类继承关系
1 | public abstract class AbstractQueuedSynchronizer |
AbstractOwnableSynchronizer抽象类源码
1 | public abstract class AbstractOwnableSynchronizer |
这个类中只有一个Thread类型的变量exclusiveOwnerThread及getter/setter方法,用来获取/设置独占线程,正如其名Ownable
一个线程可能专有的同步器。
这对getter/setter方法对于独占锁类型的并发工具特别有用。如ReentrantLock是一个可重入排他锁(排他锁又称独占锁),因此ReentrantLock类中经常用到这两个方法。
类属性
1 | public abstract class AbstractQueuedSynchronizer |
内部类:Node
1 | static final class Node { |
内部类:ConditionObject
1 | public class ConditionObject implements Condition, java.io.Serializable { |
独占式:acquire
逐行分析看着比较吃力,可以先看看最后的总结:AQS独占式获取锁逻辑:详细版
1 | // 以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的 |
说明:通过调用acquire获取同步状态,该方法忽略中断,线程获取同步状态失败后,进入同步队列,在对其进行中断操作后,线程不会从同步队列移除。
流程:
- 首先调用
tryAcquire
方法获取同步状态,AQS并没有实现这个方法,具体的实现由它的继承类进行重写,如:ReentrantLock的Sync类。 - 如果获取同步状态成功直接返回true,如果获取失败执行下面的步骤。
- 调用addWaiter方法把线程封装成一个独占式的Node节点添加到同步队列的尾部,然后调用acquireQueued方法使节点以自旋的方式获取同步状态,如果获取同步状态失败,要挂起线程(被挂起的线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现),最后,线程如果在获取同步状态中和同步队列中被中断过,要进行自我中断。
添加新的等待节点:addwaiter
主要通过调用compareAndSetTail(Node node)
来确保正确安全的设置尾节点,避免了非线程安全状态下添加新节点时节点先后顺序混乱的情况,如果一次CAS设置尾节点失败,则直接进入enq(final Node)
方法通过自旋的CAS操作,直到将节点设置为尾节点成功。
1 | // 把Node节点添加到同步队列的尾部 |
总结:简单来说就是把当前线程按照mode模式(独占/共享)封装成一个node节点,然后把node节点添加到队列尾部,但是当入队失败或者尾节点为空,就需要调用enq方法自旋了。
这里的操作非常经典,将入队操作提前尝试快速入队,如果失败才去执行自旋。
自旋添加尾节点:enq
1 | // 采用自旋方式把node插入到队列中 t <-> node : t.next = node; node.prev = t; |
enq
方法采用了非常经典的自旋操作,只有通过CAS把node设为尾节点后,当前线程才能退出该方法,否则的话,当前线程不断尝试,直到能把节点添加到队列中为止,这样就把并行添加变成了串行添加。
获取同步状态:acquireQueued
1 | // 通过自旋方式获取同步状态 |
首先获取当前节点的前驱节点,如果前驱节点是头节点,有两种情况
- 默认空的头节点,说明此时是同步队列中的第一个线程去尝试获取同步状态。
- 获取到同步状态的节点,然后再一次调用子类重写的tryAcquire方法去获取同步状态,如果成功获取同步状态,则把当前节点设为头节点。
如果当前节点的前驱节点不是头节点或者没有获取到同步状态的话,就要调用shouldParkAfterFailedAcquire方法挂起当前线程。
注:为什么只有前驱节点为头节点才能尝试获取同步状态?
- 首先队列这种数据结构要保证FIFO的基本原则。
- 其次头节点是当前获取到同步状态的节点,只有在头节点释放同步状态,才能通知后继节点可以进入同步状态。
检查同步状态失败是否需要阻塞线程:shouldParkAfterFailedAcquire
1 | // 如果线程获取同步状态失败就要检查它的节点status,要保证prev = node.prev |
总结:这段代码用来检测是否挂起当前线程,分三种情况
- 前驱节点的
ws = SINGAL
:表示前驱节点释放同步状态的时候会唤醒当前节点,可以安全挂起当前线程。 - 前驱节点被取消:那就从前驱节点继续往前遍历,直到找到第一个ws <= 0的节点。
- 前驱节点的ws = 0:表示前驱节点获取到同步状态,当前线程不能挂起,因该尝试去获取同步状态,前驱节点的同步状态的释放正好可以让当前节点进行获取,所以使用CAS把前驱节点的ws设为SINGAL,另外如果ws = PROPAGATE,说明以共享模式进行传播,也需要使用CAS把ws设置为SINGAL。
挂起当前线程并检查当前线程是否被中断:parkAndCheckInterrupt
1 | // 挂起当前线程并判断线程是否被中断过 |
从同步队列中移除无效的节点:cancelAcquire
1 | private void cancelAcquire(Node node) { |
总结:分为三种情况进行考虑
- node本身就是尾节点,直接把node的prev设置为尾节点.
- node的prev不是头节点,直接把pred和node的next进行连接.
- node的prev是头节点,使用unparkSuccessor唤醒后继节点.
unparkSuccessor
1 | // 如果node存在唤醒它的后继节点 |
selfInterrupt
如果执行完acquireQueued方法返回线程被中断过,那线程最后要进行自我中断一下.
1 | static void selfInterrupt() { |
其他获取同步状态的方法
响应式中断获取同步状态acquireInterruptibly
1 | // 当前线程被中断后,直接抛出异常,否则的话,再次调用tryAcquire方法获取同步状态 |
doAcquireInterruptibly
1 | // 以独占模式获取同步状态,线程被中断直接抛出异常 |
指定时间内获取同步状态: tryAcquireNanos
1 | // 以独占模式获取同步状态,线程被中断,直接抛出异常,如果再指定时间内没有获取到同步状态, |
doAcquireNanos
1 | private boolean doAcquireNanos(int arg, long nanosTimeout) |
释放同步状态的方法:release
1 | public final boolean release(int arg) { |
共享式
独占式和共享式的最大不同就是在同一时刻能否有多个线程获取同步状态,通过调用acquireShared方法获取同步状态
1 | public final void acquireShared(int arg) { |
doAcquireShared
1 | // 以共享非中断获取同步状态 |
setHeadAndPropagate
1 | private void setHeadAndPropagate(Node node, int propagate) { |
doReleaseShared
1 | private void doReleaseShared() { |
总结:
如果只是为了应付面试可以只看下面的内容。
AQS类的核心概念
- AQS主要实现了独占式和共享式两种,独占式的同步状态在0与1之间切换,同时只能有一个线程获取锁进行操作,而共享式同一时刻可以有多个线程获取锁。
- AQS会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock)时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态。
- AQS采用Node类型的双向链表来确定资源获取线程的排队工作,其中头节点是获取同步状态成功的节点,头节点的线程在释放同步状态时,将会唤醒它的后继节点,而后继节点将会在获取同步状态成功时将自己设置为头节点。而没有成功获取同步状态的线程会成为节点加入该队列的尾部,为了线程安全采用了CAS原子方法更新尾部插入节点。
AQS类最关键的几个变量:
AQS.head
: 同步队列头节点,表示获取同步状态成功节点。AQS.tail
: 阻塞的尾节点,每个新进来的节点,都插入到最后,形成一个链表。AQS.state
: 表示当前锁的状态,0表示没有被占用,大于0表示有线程持有当前锁,当大于1的时候表示锁重入。Node.waitStatus
: 表示节点状态,分为CANCELLED(被取消的节点)、SIGNAL(前驱节点正在占用锁)、CONDITION()、PROPAGATE()Node.nextWaiter
: nextWaiter在描述同步队列的head和tail两个成员变量中只是表示共享型或独占型节点。其主要是在描述Condition条件队列的firstWaiter和lastWaiter两个成员变量中起比较重要的作用,表示下一个等待条件的节点。ConditionObject.firstWaiter
: condition队列的头节点。ConditionObject.lastWaiter
: condition队列的尾节点。
AQS独占式获取锁逻辑:简单版
1、首先调用 tryAcquire
方法获取同步状态,AQS并没有实现这个方法,具体的实现由它的子类实现,如:ReentrantLock的Sync类。如果获取同步状态失败执行下面的步骤。
2、调用addWaiter方法把线程封装成一个独占式的Node节点添加到同步队列的尾部。
3、然后调用acquireQueued方法使节点以自旋的方式获取同步状态,如果获取同步状态失败,要挂起线程(被挂起线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现)。
4、最后,线程如果在获取同步状态中和同步队列中被中断过,还要进行一次自我中断。
AQS独占式获取锁逻辑:详细版
1 | acquire(int arg) |
AQS独占式释放锁逻辑
调用tryRelease尝试释放同步状态,然后再唤醒头节点的后继节点线程(主要是将node节点的waitStatus设置为初始值0,并唤醒可用的后继节点)。