AbstractQueuedSynchronizer
抽象同步队列简称AQS ,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS 实现的。另外,大多数开发者可能永远不会直接使用AQS ,但是知道其原理对于架构设计还是很有帮助的。
LockSupport 工具类
在剖析AQS之前,我们先来介绍LockSupport
工具类,LockSupport
是使用Unsafe
类实现的,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。
LockSupport
类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport
类的方法的线程是不持有许可证的。下面介绍LockSupport
中的几个主要函数:
void park()
方法:用于挂起进程。如果调用park
方法的线程没有拿到了与LockSupport
关联的许可证,调用线程会被禁止参与线程的调度, 也就是会被阻塞挂起。void unpark(Thread thread)
方法:用于唤醒进程。当一个线程调用unpark
时,如果参数thread 线程没有持有thread 与LockSupport
类关联的许可证, 则让thread 线程持有。如果thread 之前因调用park()
而被挂起,则调用unpark
后,thread获得许可证,该线程会被唤醒。可以把许可证不恰当地比成一次性的“免死金牌”,
park()
的线程消耗”免死金牌”,如果没有”免死金牌“,那么只能”上法场”(线程阻塞挂起)了。unpark(thread)
授予thread线程”免死金牌”
1 | Thread thread=new Thread(()->{ |
抽象同步队列AQS
AQS 是一个FIFO 的双向队列,其内部通过节点head
和tail
记录队首和队尾元素。
节点元素
我们首先讨论的是队列元素Node
。线程就保存在Node
的thread
变量中。Node
的类型分为两种:
SHARED
:线程是获取共享资源时被阻塞挂起后放入AQS 队列EXCLUSIVE
: 线程是获取独占资源时被挂起后放入AQS 队列的。
Node
还拥有waitStatus
变量,用于记录当前线程等待状态,可以为CANCELLED
(线程被取消了)、SIGNAL
( 线程需要被唤醒)、CONDITION
(线程在条件队列里面等待〉和PROPAGATE
(释放共享资源时需要通知其他节点)。
状态信息的获取
AQS 中维持了一个单一的状态信息state
,可以通过getState
、setState
、compareAndSetState
函数修改其值。对于不同的锁阻塞和同步器,state
拥有不同的含义。
对于AQS 来说,线程同步的关键是对状态值state
进行操作。根据state
是否属于一个线程,操作state
的方式分为独占方式和共享方式。
独占方式
使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state
获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。
在独占方式下,获取与释放资源的流程如下:
当一个线程调用
acquire(int arg)
方法获取独占资源时,会首先使用tryAcquire
方法尝试获取资源, 具体是设置状态变量state
的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVE
的Node
节点后插入到AQS 阻塞队列的尾部,并调用LockSupport.park(this)
方法挂起自己。1
2
3
4
5
6
7
8public final void acquire(int arg) {
//尝试获取state资源
if (!tryAcquire(arg) &&
//获取资源失败则调用addWaiter方法创建节点,加入队列,阻塞挂起
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}当一个线程调用
release(int arg)
方法时会尝试使用tryRelease
操作释放资源,这里是设置状态变量state
的值,然后调用LockSupport.unpark(thread)
方法激活AQS 队列里面被阻塞的一个线程 。1
2
3
4
5
6
7
8
9
10
11public final boolean release(int arg) {
//设置state值
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒队列中的进程
unparkSuccessor(h);
return true;
}
return false;
}
需要注意的是, AQS 类并没有提供可用的tryAcquire
和tryRelease
方法,正如AQS是锁阻塞和同步器的基础框架一样, tryAcquire
和tryRelease
需要由具体的子类来实现。子类在实现tryAcquire
和tryRelease
时要根据具体场景使用CAS 算法尝试修改state
状态值,成功则返回true
,否则返回false
。子类还需要明确,在调用acquire
和release
方法时state
状态值的增减代表什么含义。
共享方式
对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS 方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS 方式进行获取即可。
在共享方式下,获取与释放资源的流程如下:
当线程调用
acquireShared(int arg)
获取共享资源时,会首先使用tryAcquireShared
操作尝试获取资源,具体是设置状态变量state 的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED
的Node 节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this)
方法挂起自己。1
2
3
4public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}当一个线程调用
releaseShared(int arg)
时会尝试使用tryReleaseShared
操作释放资源,这里是设置状态变量state
的值,然后使用LockSupport.unpark(thread)
激活AQS 队列里面被阻塞的线程。
1 | public final boolean releaseShared(int arg) { |
AQS 类也没有提供可用的tryAcquireShared
和tryReleaseShared
方法, 需要由具体的子类实现。
中断的忽略
独占方式下获取资源有两种函数:void acquire(int arg)
和void acquireInterruptibly(int arg)
,共享方式也是如此。不带Interruptibly
关键字的方法的意思是不对中断进行响应,也就是线程在调用方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程, 那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。
队列的维护
队列的维护主要看入队操作,当一个线程获取锁失败后该线程会被转换为Node 节点,然后就会使用enq(final Node node)
方法将该节点插入到AQS 的阻塞队列。
1 | private Node enq(final Node node) { |
条件变量
使用
和notify
和wait
是配合synchronized
内置锁实现线程间同步的基础设施类似,条件变量的signal
和await
方法是用来配合锁(使用AQS 实现的锁〉实现线程间同步的基础设施。
它们的不同在于, synchronized
同时只能与一个共享变量的notify
或wait
方法实现同步,
而AQS 的一个锁可以对应多个条件变量。
1 | static final ReentrantLock reentrantLock=new ReentrantLock(); |
其实这里的reentrantLock
对象等价于synchronized
加上共享变量,调用reentrantLock.lock()
方法就相当于进入了synchronized
块(获取了共享变量的内置锁),调用reentrantLock.iunlock()
方法就相当于退出synchronized
块。调用条件变量的await()
方法就相当于调用共享变量的wait()
方法,调用条件变量的signal
方法就相当于调用共享变量的notify()
方法。调用条件变量的signalAll()
方法就相当于调用共享变量的notifyAll()
方法。
原理剖析
lock.newCondition()
的作用其实是new 了一个在AQS 内部声明的ConditionObject
对象, ConditionObject
是AQS 的内部类,可以访问AQS 内部的变量(例如状态变量state
)和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()
方法时被阻塞的线程。注意这个条件队列是单向队列,和AQS 队列不是一回事。
当线程调用条件变量的await()
方法时(必须先调用锁的lock()
方法获取锁),在内部会构造一个类型为Node.CONDITION
的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁( 也就是会操作锁对应的state
变量的值),并被阻塞挂起。这时候如果有其他线程调用lock.lock()
尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()
方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()
方法处阻塞。
1 | public final void await() throws InterruptedException { |
1 | private Node addConditionWaiter() { |
当另外一个线程调用条件变量的signaL
方法时( 必须先调用锁的lock()
方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS 的阻塞队列里面, 然后激活这个线程。
1 | public final void signal() { |