0%

Java并发包的基石--抽象同步队列(AQS)

AbstractQueuedSynchronizer 抽象同步队列简称AQS ,它是实现同步器的基础组件,并发包中锁的底层就是使用AQS 实现的。另外,大多数开发者可能永远不会直接使用AQS ,但是知道其原理对于架构设计还是很有帮助的。

LockSupport 工具类

在剖析AQS之前,我们先来介绍LockSupport工具类,LockSupport 是使用Unsafe 类实现的,它的主要作用是挂起和唤醒线程,该工具类是创建锁和其他同步类的基础。

LockSupport 类与每个使用它的线程都会关联一个许可证,在默认情况下调用LockSupport 类的方法的线程是不持有许可证的。下面介绍LockSupport 中的几个主要函数:

  • void park()方法:用于挂起进程。如果调用park 方法的线程没有拿到了与LockSupport 关联的许可证,调用线程会被禁止参与线程的调度, 也就是会被阻塞挂起。

  • void unpark(Thread thread)方法:用于唤醒进程。当一个线程调用unpark 时,如果参数thread 线程没有持有thread 与LockSupport 类关联的许可证, 则让thread 线程持有。如果thread 之前因调用park()而被挂起,则调用unpark 后,thread获得许可证,该线程会被唤醒。

    可以把许可证不恰当地比成一次性的“免死金牌”,park()的线程消耗”免死金牌”,如果没有”免死金牌“,那么只能”上法场”(线程阻塞挂起)了。unpark(thread)授予thread线程”免死金牌”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Thread thread=new Thread(()->{
System.out.println("child thread start park");
//调用park ,由于当前线程不持有许可证,挂起自己
LockSupport.park();
System.out.println("child thread begin unpark");
});

thread.start();

Thread.sleep(1000);
System.out.println ("main thread begin unpark");
//调用unpark 使得 thread 持有许可证,park方法返回
LockSupport.unpark(thread);

//调用thread的interrupt()方法会得到同样的输出

image-20211024211304771

抽象同步队列AQS

简单理解AQS ( 抽象的队列同步器)_迟到的help的博客-CSDN博客_抽象同步队列器

AQS 是一个FIFO 的双向队列,其内部通过节点headtail 记录队首和队尾元素。

节点元素

我们首先讨论的是队列元素Node 。线程就保存在Nodethread 变量中。Node 的类型分为两种:

  • SHARED :线程是获取共享资源时被阻塞挂起后放入AQS 队列
  • EXCLUSIVE: 线程是获取独占资源时被挂起后放入AQS 队列的。

Node还拥有waitStatus变量,用于记录当前线程等待状态,可以为CANCELLED (线程被取消了)、SIGNAL ( 线程需要被唤醒)、CONDITION (线程在条件队列里面等待〉和PROPAGATE (释放共享资源时需要通知其他节点)。

状态信息的获取

AQS 中维持了一个单一的状态信息state,可以通过getStatesetState
compareAndSetState 函数修改其值。对于不同的锁阻塞和同步器,state拥有不同的含义。

对于AQS 来说,线程同步的关键是对状态值state 进行操作。根据state 是否属于一个线程,操作state 的方式分为独占方式共享方式

独占方式

使用独占方式获取的资源是与具体线程绑定的,就是说如果一个线程获取到了资源,就会标记是这个线程获取到了,其他线程再尝试操作state 获取资源时会发现当前该资源不是自己持有的,就会在获取失败后被阻塞。

在独占方式下,获取与释放资源的流程如下:

  1. 当一个线程调用acquire(int arg) 方法获取独占资源时,会首先使用tryAcquire 方法尝试获取资源, 具体是设置状态变量state 的值,成功则直接返回,失败则将当前线程封装为类型为Node.EXCLUSIVENode 节点后插入到AQS 阻塞队列的尾部,并调用LockSupport.park(this) 方法挂起自己。

    1
    2
    3
    4
    5
    6
    7
    8
    public final void acquire(int arg) {
    //尝试获取state资源
    if (!tryAcquire(arg) &&
    //获取资源失败则调用addWaiter方法创建节点,加入队列,阻塞挂起
    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    selfInterrupt();
    }
  2. 当一个线程调用release(int arg)方法时会尝试使用tryRelease 操作释放资源,这里是设置状态变量state 的值,然后调用LockSupport.unpark(thread)方法激活AQS 队列里面被阻塞的一个线程 。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    public final boolean release(int arg) {
    //设置state值
    if (tryRelease(arg)) {
    Node h = head;
    if (h != null && h.waitStatus != 0)
    //唤醒队列中的进程
    unparkSuccessor(h);
    return true;
    }
    return false;
    }

需要注意的是, AQS 类并没有提供可用的tryAcquiretryRelease 方法,正如AQS是锁阻塞和同步器的基础框架一样, tryAcquiretryRelease 需要由具体的子类来实现。子类在实现tryAcquiretryRelease 时要根据具体场景使用CAS 算法尝试修改state 状态值,成功则返回true,否则返回false。子类还需要明确,在调用acquirerelease 方法时state状态值的增减代表什么含义。

共享方式

对应共享方式的资源与具体线程是不相关的,当多个线程去请求资源时通过CAS 方式竞争获取资源,当一个线程获取到了资源后,另外一个线程再次去获取时如果当前资源还能满足它的需要,则当前线程只需要使用CAS 方式进行获取即可。

在共享方式下,获取与释放资源的流程如下:

  1. 当线程调用acquireShared(int arg) 获取共享资源时,会首先使用tryAcquireShared操作尝试获取资源,具体是设置状态变量state 的值,成功则直接返回,失败则将当前线程封装为类型为Node.SHARED 的Node 节点后插入到AQS阻塞队列的尾部,并使用LockSupport.park(this) 方法挂起自己。

    1
    2
    3
    4
    public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
    doAcquireShared(arg);
    }
  2. 当一个线程调用releaseShared(int arg)时会尝试使用tryReleaseShared 操作释放资源,这里是设置状态变量state 的值,然后使用LockSupport.unpark(thread)激活AQS 队列里面被阻塞的线程。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

AQS 类也没有提供可用的tryAcquireSharedtryReleaseShared方法, 需要由具体的子类实现。

中断的忽略

独占方式下获取资源有两种函数:void acquire(int arg)void acquireInterruptibly(int arg),共享方式也是如此。不带Interruptibly 关键字的方法的意思是不对中断进行响应,也就是线程在调用方法获取资源时或者获取资源失败被挂起时,其他线程中断了该线程, 那么该线程不会因为被中断而抛出异常,它还是继续获取资源或者被挂起,也就是说不对中断进行响应,忽略中断。

队列的维护

队列的维护主要看入队操作,当一个线程获取锁失败后该线程会被转换为Node 节点,然后就会使用enq(final Node node) 方法将该节点插入到AQS 的阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//如果队列为空,创建结点,同时被head和tail引用
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//cas设置尾结点,不成功就一直重试
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

image-20211025162355530

条件变量

使用

notifywait 是配合synchronized 内置锁实现线程间同步的基础设施类似,条件变量的signalawait 方法是用来配合锁(使用AQS 实现的锁〉实现线程间同步的基础设施。

它们的不同在于, synchronized 同时只能与一个共享变量的notifywait 方法实现同步,
而AQS 的一个锁可以对应多个条件变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static final ReentrantLock reentrantLock=new ReentrantLock();
//newCondition()方法返回reentrantLock对应的一个条件变量
static Condition condition=reentrantLock.newCondition();
public static void main(String[] args) {
new Thread(()->{
try {
//获取独占锁
reentrantLock.lock();
System.out.println("begin wait ");
//调用条件变量的await()方法阻塞挂起
condition.await();
System.out.println("end wait");
}
catch (Exception e){
e.printStackTrace();
}
finally {
reentrantLock.unlock();
}

}).start();

new Thread(()->{
try {
//获取独占锁
reentrantLock.lock();
Thread.sleep(1000);
System.out.println("begin signal ");
condition.signal();
System.out.println("end signal");
}
catch (Exception e){
e.printStackTrace();
}
finally {
reentrantLock.unlock();
}

}).start();
}

image-20211027192845636

其实这里的reentrantLock对象等价于synchronized 加上共享变量,调用reentrantLock.lock()方法就相当于进入了synchronized 块(获取了共享变量的内置锁),调用reentrantLock.iunlock()方法就相当于退出synchronized 块。调用条件变量的await()方法就相当于调用共享变量的wait()方法,调用条件变量的signal 方法就相当于调用共享变量的notify()方法。调用条件变量的signalAll()方法就相当于调用共享变量的notifyAll()方法。

原理剖析

lock.newCondition()的作用其实是new 了一个在AQS 内部声明的ConditionObject 对象, ConditionObject 是AQS 的内部类,可以访问AQS 内部的变量(例如状态变量state )和方法。在每个条件变量内部都维护了一个条件队列,用来存放调用条件变量的await()方法时被阻塞的线程。注意这个条件队列是单向队列,和AQS 队列不是一回事。

浅谈Java中的Condition条件队列,手摸手带你实现一个阻塞队列! - Java填坑笔记- 博客园

当线程调用条件变量的await()方法时(必须先调用锁的lock()方法获取锁),在内部会构造一个类型为Node.CONDITION 的node节点,然后将该节点插入条件队列末尾,之后当前线程会释放获取的锁( 也就是会操作锁对应的state 变量的值),并被阻塞挂起。这时候如果有其他线程调用lock.lock() 尝试获取锁,就会有一个线程获取到锁,如果获取到锁的线程调用了条件变量的await()方法,则该线程也会被放入条件变量的阻塞队列,然后释放获取到的锁,在await()方法处阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//调用addConditionWaiter方法创建新的node节点,并插入到条件队列末尾
Node node = addConditionWaiter();
//释放当前锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//调用park方法阻塞挂起当前线程
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
...
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addConditionWaiter() {
Node t = lastWaiter;
...
//创建类型为Node.CONDITON的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);

//向单向队列尾部插入一个元素
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

当另外一个线程调用条件变量的signaL 方法时( 必须先调用锁的lock() 方法获取锁),在内部会把条件队列里面队头的一个线程节点从条件队列里面移除并放入AQS 的阻塞队列里面, 然后激活这个线程。

1
2
3
4
5
6
7
8
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
//将队列头元素移动到AQS队列
doSignal(first);
}