AQS队列同步器(一)

AQS 队列同步器(一)


概述

​ 接触并发编程的同步机制时,会了解到一个实现的比较好的同步器,例如 原子类(AtomicInteger),信号量(Semaphore),CountDownLatch等,他们都是基于AQS构建的同步器,用来协调多线程对于共享资源的访问策略,再不同的硬件水准的情况下对并发吞吐量性能做了极大的提升,还保证的线程安全性。因此了解AQS对于更好地理解这些优秀同步器的设计有很大帮助。

AQS抽象类简介:

  • 类简介(摘自 JDK 1.8 源代码中对此类的介绍):
  • 基于FIFO的等待队列提供了一个用来实现阻塞锁和相关同步器的框架,这个类设计为实现各种同步器的基础,通过使用一个原子的 AtomicInteger 变量 state 来代表当前锁的状态。子类必须重写 protected 类型的方法,用来改变这个状态,还需要定义当调用请求锁和释放锁的方法时,这个state字段的含义。

  • 通过提供这些保证,AQS的其他方法才能具备队列性和同步性结构。子类也可以增加其他的代表状态的字段,但只有在通过 getState,SetState,CompareAndSetState这些方法原子更新状态值,才是维持了同步的概念。

  • 子类需要通过自己的内部类继承 AQS 类的属性来构建同步器,AQS 本身没有实现任何同步接口,相应地它提供了一些将锁和同步器关联地公共方法用以被实现。

  • AQS 类支持独占(exclusive) 和 共享(shared) 模式地锁获取。独占锁模式下,当锁被占有时,其他调用请求锁的线程会失败;共享锁模式下,多个线程请求获取锁可能都会成功。在共享锁模式中,当一个线程获取锁成功,下一个线程是否获取锁取决于它本身。在不同锁模式下处于等待状态的线程节点,位于同一个FIFO队列,因此共享模式的锁获取并不意味着一定成功。AQS 的子类可以自行决定是否需要支持独占或者共享模式,或者两者都支持,例如读写锁(ReadWriteLock)和可重入锁(ReentrantLock)。当子类只支持一种锁模式时,不需要实现另一种锁模式的相关方法。

  • AQS 定义了一个嵌套类 ConditionObject 可以作为 Condition 对象被支持独占锁模式的子类实现,Condition 对象是条件对象,用来关联锁以决定是否能够去请求获取锁。ConditionObject 也是独占模式下方法 isHeldExclusively,release,getState,acquire等方法正确语义的实现一部分。如果这些方法的约束条件不存在,就不要使用ConditionObject 对象。ConditionObject 对象的语义也取决的于AQS的子类实现。

  • AQS 为内部的队列提供了检查和监视队列状态的代码,类似于ConditionObject 方法。这些方法可以根据同步器的子类同步器结构实现需要导出到类中。

  • AQS 类的序列化只会保存底层的原子状态变量值,所以反序列化的AQS会包含一个空的队列。通常需要序列化的子类需要定义或者实现一个readObject 方法保存把这些信息作为初始状态进行反序列化。

  • 以上这些比较简洁地说明了AQS的正确继承和实现方式以及一些使用时的注意点,接下来是一些方法的使用方式。

  • 将AQS 作为同步器实现基础使用时,需要自定义这些方法的实现 :tryAcquire(独占式获取锁),tryRelease(独占式释放锁),tryAcquireShared(共享式获取锁),tryReleaseShared(共享式释放锁),isHeldExclusively(是否独占式持有锁),通过 getState() 方法查询锁状态,setState() 和 conpareAndSatState()等方法来设置锁状态。

  • 上述需要自定义实现的方法都抛出了 UnsupportedOperationException ,上述方法的实现大体上需要简短且非阻塞,实现这些方法是使用AQS的唯一途径。

  • AQS还继承了AbstractOwnableSynchronizer 抽象类,用来设置当前拥有锁的线程对象,通过它的方法实现可以帮助用户监控和判断哪个线程持有锁。

  • 虽然AQS是基于内部的FIFO队列实现的,它并不会自动地执行队列地先进先出规则,独占式的同步发生在获取锁的时候。因为在非公平的同步器中,检查能否获取锁发生在等待线程进入队列之前,所以当一个新的线程请求锁时,它可能会请求成功而不会进入队列,因此队列中的其他线程需要继续阻塞或等待。

  • 如果实现了一个公平锁,所有的线程都会先进入等待队列,然后按照某种排序规则,依次去请求锁。在公平锁的实现中,如果 hshQueuedPredecessors方法返回 true ,可以让 tryAcquire方法立刻返回 false 表示获取锁失败。

  • 默认的线程调度算法虽然具备相当的吞吐量和扩展性,但却不保证线程公平性和避免线程饥饿,排在队列前面的线程总是排在后面的线程更早地竞争锁,而且每次都是公平地和进入线程竞争锁。一般,获取锁的方法没有自旋;通常情况下,在多个线程处于竞争状态时, tryAcquire方法会被多次调用,直到其他线程阻塞。当锁被短暂持有时,这会带来比自旋更好的收益,不会白白耗费CPU资源。

  • AQS 提供了高效且有扩展性的同步实现方式,当它不能满足的你要求时,你可以i通过一种相对底层一点的方式,使用原子类,自定义队列和阻塞方法等构建自己的同步器。

  • 下面是一些例子

  • Mutex:独占式的不可重入同步器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
> // 0代表未锁定,1代表锁定;
> class Mutex implements Lock,java.io.Serializable{
> //继承AQS的内部同步器类,一个非公平实现
> private static class Sync extends AbstractQueuedSynchronizer{
>
> //判断锁定状态
> protected boolean isHeldExclusively(){
> return getState() == 1;
> }
>
> //未锁定状态时竞争锁
> public boolean tryAcquire(int acquires){
> assert acquires == 1;//代表请求获取锁
> //CAS操作设置锁状态
> if(compareAndSetState(0,1)){
> //设置成功,表示当前线程获得锁,设置当前线程为锁的持有者
> setExclusiveOwnerThread(Thread.currentThread());
> return true;
> }
> return false;
> }
>
> //释放锁
> protected boolean tryRelease(int release){
> assert release == 1;//锁定状态才需要释放锁
> if(getState() == 0)
> throws new IllegalMonitorStateException();
> //置空锁持有线程
> setExclusiveOwerThread(null);
> //重置锁状态
> setState(0);
> return true;
> }
>
> // 条件对象
> Condition newCondition() { return new ConditionObject(); }
>
> ```
> // 反序列化属性
> private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
> s.defaultReadObject();
> setState(0); // reset to unlocked state
> }
> }
>
> //借助已经实现的同步方法,实现加锁,解锁等
> private final Sync sync = new Sync();
> public void lock() { sync.acquire(1); }
> public boolean tryLock() { return sync.tryAcquire(1); }
> public void unlock() { sync.release(1); }
> public Condition newCondition() { return sync.newCondition(); }
> public boolean isLocked() { return sync.isHeldExclusively(); }
> public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
> public void lockInterruptibly() throws InterruptedException {
> sync.acquireInterruptibly(1);
> }
> //可超时中断的尝试加锁方法
> public boolean tryLock(long timeout, TimeUnit unit)
> throws InterruptedException {
> return sync.tryAcquireNanos(1, unit.toNanos(timeout));
> }
> }
>
  • 一个基于AQS构建的互斥同步器就这样构建好了,Condition 对象可以用来在一个队列上关联多个等待条件,防止出现虚假唤醒。
    • 在释放锁时还可以添加当前线程是否是持有锁的线程的判断
    • 因为是不可重入的实现,也没有提供对线程获取锁次数的计数
    • BooleanLatch: 共享式的同步器,只需要一个表示可进入状态的标志
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
> class BooleanLatch{
> //共享模式同步器
> private static class Sync extends AbstractQueuedSynchronizer{
>
> //判断是否不可再获取锁
> boolean isSignalled(){return getState() != 0}
>
> //共享式获取锁
> protected int tryAcquireShared(int ignore){
> return isSignalled() ? 1 : -1;
> }
>
> //共享式释放锁
> protected boolean tryReleaseShared(int ignore){
> setState(1);
> return true;
> }
> }
>
> //基于同步器实现的BooleanLatch
> private final Sync sync = new Sync();
> public boolean isSignalled(){return sync.isSignalled();}
> public void signal(){return sunc.releaseShared(1);}
> public void await() throws InterruptedException{
> return sync.acquireSharedInterruptibly(1);
> }
> }
>
  • 只有一个是否允许获得所或者是否允许进入的标志,只需要在指定条件后关闭即可
  • countdownLatch 类似,countdownLatch 关闭的条件是进入许可已经已使用完毕(计数递减至0

AQS 属性:

  • 内部类:Node

    • 基于 LCH队列锁(一种防止线程饥饿,保证公平性的自旋锁)构建的锁队列,使用类似的原理来构建阻塞同步器,通过在当前节点的前驱节点中保存一些关于锁的控制信息来构建阻塞同步器。每个节点中的status字段始终用来追踪确认一个线程是否应该尝试去持有锁。
    • 每当一个节点的前驱结点释放锁时,它都会得到通知。队列中的每一个节点都表现得像是等待具体通知的模式,因为各自只对应一个阻塞线程。status 字段并不能够控制一个线程是否会理所当然的获取锁。在队列中的首节点也许会尝试去获取锁,但不意味着首节店获取锁一定会成功,只是拥有了竞争锁的权利。所以竞争锁失败的节点需要再次进入等待状态。
    • 新入列的节点作为尾节点(tail)被引用,出列只需要重新设置头节点(head).
    • 入列只有一个对tail进行的简单的原子操作,所以入列与未入列之间有一条简单的界限。出列也类似,需要更新head,但是比入列多了一些操作:需要决定哪个竞争成功的线程出列,因为线程在竞争锁时可能由于取消,中断和超时等导致竞争失败。
    • prev引用需要捕获取消动作,如果一个节点取消的竞争的操作,这个节点的后继者需要重新链接到一个没有取消操作的节点,将其作为prev节点。
    • 我们也使用next引用继承阻塞机制,每个节点的线程ID都保存在各自的节点中,所以一个竞争者节点通过next引用决定下一个需要被唤醒的线程。设置后继节点 next必须避免和最新进入队列的节点
    • 取消操作表现出了一些基本算法的保守性特点,因为我们必须轮询其他节点的取消状态,所以会无论取消节点位于当前节点的前面或者后面都会导致当前节点错失通知信号
    • CLH队列需要一个虚假的头节点,但是我们并没有在构造队列时创建这个头节点,因为如果没有发生任何竞争的话这个动作就没有意义。取而代之的是在第一次发生竞争时基于这个节点创建头节点,尾节点。
    • 重点:处于条件等待的线程使用相同结构的节点,但是还持有一个额外的链接。条件(Condition)只需要把所有等待它的节点放置在一个普通的链表中,因为它们只有独占式获取锁时才能被访问。在等待状态时,一个节点会被加入条件队列;获得通知时,这个节点会被交换到主队列(等待顺序竞争),节点会持有一个特殊的状态字段来标志自己处于哪个队列中。

Node类属性和方法

  • 属性:
    • static final Node SHARED = new Node() : 表示当前节点处于共享同步器模式下
    • static final Node EXCLUSIVE = null : 表示节点处于互斥同步器模式下
    • static final Node CANCELLED = 1 : 等待状态字段值,表示线程取消操作
    • static final Node SIGNAL = -1 : 等待状态字段值,表示线程需要唤醒(unparking)
    • static final Node CONDITION = -2 : 等待状态字段值,表示线程处于条件等待状态
    • static final int PROPAGATE = -3 : 等待状态字段值,表示下一次共享式竞争锁需要无条件地传播下去
    • **volatile int waitStatus ** : 状态字段,表示当前线程的各种状态,状态值为上述的 CANCELLED,SIGNAL,CONDITION,PROPAGATE,它们的解释如下:
      • SIGANL : 当前节点的后继者处于/将要处于阻塞状态,当前线程节点取消或者释放锁之后需要通知唤醒后继节点。为了避免过度竞争,请求锁的方法(acquire)必须指明它们具备一个信号值,才能原子地进行锁竞争,或者失败,阻塞。
      • CANCELLED : 节点由于超时或者中断被取消竞争操作,而且将会一直处于这个取消状态。一般来说,处于取消状态的节点,永远不会再进入阻塞状态。
      • CONDITION : 当前节点处于条件队列中,直到它被交换到主队列时才会作为一个同步队列节点存在,节点被交换出去之后,CONDITION 状态被设置为 0 .
      • PROPAGATE : 共享式释放锁的操作需要无条件地传播给其他节点。只有在头节点的 doReleaseShared方法中才能进行设置是否继续传播。
      • 0 : 没有任何意义
      • 这些值各自都用一个数字代表,非负的数字表示一个节点不需要被通知,因此大部分方法都不要检查特定的状态值,只需要确定是否需要进行通知
      • 在普通同步节点中这个字段通常设置为 0 ,在条件等待节点中设置为 CONDITION 值,这些操作都是 CAS(CompareAndSet)。
      • waitStatus 字段本身使用 volatile 修饰,表示它具备了内存可见性
    • volatile Node prev : 当前节点通过检查等待状态,将自己链接到一个前驱节点之后,进入队列时分配,出列时置空。同时,当它的前驱节点发生取消,我们会寻找一个不是取消状态的节点来代替成为前驱节点,这样的节点始终是存在的,因为头节点意味着竞争锁成功的节点。而且,一个取消的节点永远都不会竞争锁成功,它只能取消它自己,而不是其他节点。
    • volatile Node next : 当一个节点释放锁时,它需要唤醒它的next引用指向的后继节点。入列的操作并不会马上为一个前驱节点设置 next 节点。所以查找到一个空节点并不意味着这个节点时尾节点,可以从尾节点的前驱节点开始进行双重检查。取消状态的节点的 next 节点会指向它自己而不是空值,易于生命周期管理。
    • volatile Thread thread : 入列线程。
    • Node nextWaiter : 指向在条件队列中等待的下一个节点,或者是 SHARED(共享同步器模式)。因为条件队列只有在互斥同器中才能使用,条件队列只需要一个普通的队列结构维持所有的条件等待节点。因此当它指向 SHARED 状态时,表示处于共享同步器模式。
    • final boolean isShared() : 确认一个节点是否处于共享等待状态 。return nextWaiter = SHARED
    • final Node predecessor() : 返回当前节点的前驱节点。
    • Node(){},Node(Thread thread,Node node){this.nextWaiter = node ; this.thread = thread;},Node(Thread thread,int waitStatus){this.waitStatus = waitStatus ; this.thread = thread;} : 构造函数

AQS 属性和方法 :

属性 :

  • private transient volatile Node head : 不可序列化的头节点,只有通过setHead 方法才能设置。如果头节点存在,那么保证它的 waitStatus 不是 CANCELLED
  • private transient volatile Node tail : 尾节点,入列时设置
  • private volatile int state : 同步状态字段,表示当前节点的等待状态,决定是否可以竞争锁等一系列操作
  • static final long spinForTimeoutThreshold = 1000L : 竞争锁时的自旋时间。
  • private static final Unsafe unsafe = Unsafe.getUnsafe() : //TODO
  • private static final long stateOffset
  • private static final long headOffset
  • private static final long tailOffset
  • private static final long waitStatusOffset
  • private static final long nextOffset

方法:

  • 从 public 方法开始,了解它的内部实现和关联的私有方法实现

  • acquire(int arg) : 独占式竞争锁,不响应中断。

    • 代码 :

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      public final void acquire(int arg) {
      /*调用了 tryAcquire 和 acquireQueued(),当自定义同步器实现的tryAcquire尝试竞争锁失败,并且同样独占式,
      非中断的方法 acquireQueued 方法成功,当前节点调用 selfInterrupt()*/
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }

      //tryAcquire,需要自定义同步器实现的独占式竞争锁的方法
      //可以参考说明部分的同步器中实现的 tryAcquire 方法
      protected boolean tryAcquire(int arg) {
      throw new UnsupportedOperationException();
      }

      //acquireQueued
      /*
      为队列中已存在的线程进行独占式非中断锁竞争的方法,在条件等待方法中作为 acquire 方法使用,
      */
      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      //死循环,直到头节点竞争节点成功或者竞争锁失败阻塞线程中断节点
      for (;;) {
      //获得当前节点的前驱节点
      final Node p = node.predecessor();
      //如果 node 的前驱节点是头节点且已经释放锁,且当前节点 node 竞争锁成功
      if (p == head && tryAcquire(arg)) {
      //设置 node 为头节点,置空旧的头节点
      setHead(node);
      p.next = null; // help GC
      //参数节点竞争锁成功
      failed = false;
      //未发生中断
      return interrupted;
      }
      //调用 `shouldParkAfterFailedAcquire` 方法检查并更新竞争锁失败的节点的状态
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      //取消竞争锁
      cancelAcquire(node);
      }
      }


      //shouldParkAfterFailedAcquire
      //如果节点需要阻塞时返回 true ,在所有的 acquire 方法循环中,这是主要的信号控制方法
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      int ws = pred.waitStatus;
      if (ws == Node.SIGNAL)
      //等待通知状态,可以安全阻塞
      return true;
      if (ws > 0) {
      //跳过所有的取消节点
      do {
      node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
      } else {
      //前驱节点的状态为 0 或者 PROPAGATE,说明不需要立刻阻塞,而是尝试去获取一个信号,变成等待通知的状态
      //CAS 方法,设置前驱节点为等待通知的状态
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
      }
      return false;
      }

      //compareAndSetWaitStatus
      //委托还是代理实现?Usafe 类的CAS操作
      private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
      return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
      }

      //阻塞并中断当前线程
      private final boolean parkAndCheckInterrupt() {
      //LockSupport.park(this)的实现是 UNSAFE.putObject(t, parkBlockerOffset, arg);
      //将当前线程阻塞
      LockSupport.park(this);
      //中断线程
      return Thread.interrupted();
      }

      //共享式竞争锁失败,取消当前节点的竞争动作
      private void cancelAcquire(Node node) {
      // Ignore if node doesn't exist
      if (node == null)
      return;

      node.thread = null;

      // 跳过处于取消状态的节点,因为它们不可能再去竞争锁
      Node pred = node.prev;
      while (pred.waitStatus > 0)
      node.prev = pred = pred.prev;


      Node predNext = pred.next;
      //将当前节点置为取消状态
      node.waitStatus = Node.CANCELLED;

      // 如果被取消的当前节点时尾节点,移除此节点,并且重新设置尾节点
      if (node == tail && compareAndSetTail(node, pred)) {
      compareAndSetNext(pred, predNext, null);
      } else {
      //如果当前节点的前驱节点竞争锁成功,且当前节点又被取消,需要重新设置头节点的next节点
      int ws;
      if (pred != head &&
      ((ws = pred.waitStatus) == Node.SIGNAL ||
      (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
      pred.thread != null) {
      Node next = node.next;
      if (next != null && next.waitStatus <= 0)
      compareAndSetNext(pred, predNext, next);
      } else {
      unparkSuccessor(node);
      }
      node.next = node; // help GC
      }
      }

      //唤醒竞争锁成功的线程
      private void unparkSuccessor(Node node) {
      //如果节点状态为负值(SIGNAL,CONDITION,PROPAGATE),清除预期状态,0表示无意义
      int ws = node.waitStatus;
      if (ws < 0)
      compareAndSetWaitStatus(node, ws, 0);

      //唤醒竞争锁成功的节点中的线程,通常是next节点。
      //如果next节点取消或者明显是个Null,从尾节点往前开始遍历,查找竞争锁成功的节点
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
      s = null;
      for (Node t = tail; t != null && t != node; t = t.prev)
      if (t.waitStatus <= 0)
      s = t;
      }
      if (s != null)
      //唤醒线程
      LockSupport.unpark(s.thread);
      }

      //添加等待节点
      private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      Node pred = tail;
      if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
      }
      }
      enq(node);
      return node;
      }
      • 小结:至此 acquire()方法尝试获取锁的整个过程已经大致看了一遍,大致的步骤是:-
        • 先调用 tryAcquire() 尝试竞争锁,同时调用acquireQueued()为队列中已存在的节点竞争锁,如果tryAcquire失败且acquireQueued成功,说明队列中的节点竞争到了锁,中断当前节点的线程。-
        • 在调用 acquireQueued()的过程中,需要阻塞竞争锁失败的节点,调用 shouldParkAfterFailedAcquire方法判断参数节点是否需要阻塞,是的话调用parkAndCheckInterrupt()方法阻塞当前线程并设置中断状态。-
        • acquireQueued方法最后还会判断头节点是否竞争锁失败,如果参数节点是第二个节点,但是竞争锁失败,就会取消竞争锁的动作。
  • acquireInterruptibly(int) : void : 独占式获取锁,响应中断

    • 代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      //独占式获取锁,响应中断: 会先检查中断状态,然后再至少调用一次tryAcquire方法,成功时返回。否则节点进入等待队列,重复阻塞和唤醒的动作
      //直到竞争锁成功或者发生中断
      public final void acquireInterruptibly(int arg)
      throws InterruptedException {
      if (Thread.interrupted())
      throw new InterruptedException();
      //第一次竞争锁失败
      if (!tryAcquire(arg))
      doAcquireInterruptibly(arg);
      }

      //独占式中断获取锁
      private void doAcquireInterruptibly(int arg)
      throws InterruptedException {
      //将节点加入等待队列
      final Node node = addWaiter(Node.EXCLUSIVE);
      boolean failed = true;
      try {
      for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      throw new InterruptedException();
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }
      • 小结:
        • 会先检查线程的中断状态,线程中断则抛出中断异常;否则会调用一次竞争锁的方法
        • 竞争锁失败后,调用 acquireInterruptibly,可以看到这个方法和 acquireQueued很类似,区别在于当竞争失败时需要阻塞并中断线程时,会抛出中断异常而不是返回中断状态。
  • tryAcquireNanos: 独占式限时获取锁,响应中断

    • 代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      //限时竞争锁
      public final boolean tryAcquireNanos(int arg, long nanosTimeout)
      throws InterruptedException {
      //同样先检查中断状态
      if (Thread.interrupted())
      throw new InterruptedException();
      return tryAcquire(arg) ||
      doAcquireNanos(arg, nanosTimeout);
      }

      //doAcquireNanos
      private boolean doAcquireNanos(int arg, long nanosTimeout)
      throws InterruptedException {
      //先检查设置的时间合理性
      if (nanosTimeout <= 0L)
      return false;
      //设置竞争锁的deadline
      final long deadline = System.nanoTime() + nanosTimeout;
      final Node node = addWaiter(Node.EXCLUSIVE);
      boolean failed = true;
      try {
      for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return true;
      }
      //每次循环检查是否到了放弃竞争锁的时间,超时返回失败
      nanosTimeout = deadline - System.nanoTime();
      if (nanosTimeout <= 0L)
      return false;
      //如果没有超时,且剩余时间大于自旋时间
      if (shouldParkAfterFailedAcquire(p, node) &&
      nanosTimeout > spinForTimeoutThreshold)
      //调用限时阻塞方法
      LockSupport.parkNanos(this, nanosTimeout);
      //响应中断
      if (Thread.interrupted())
      throw new InterruptedException();
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }
    • 小结:

      • 也是先检查中断装填,然后是只要 teyAcquire()和doAcquireNanos()方法任意一个成功,都表示竞争到了锁
      • doAcquireNanos()方法会先检查设置超时时间的合理性,然后根据超时时间获得deadline,之后每次竞争锁失败的循环都判断是否超过deadline
      • 超时返回竞争失败;否则判断剩余时间是否大于一次自旋的时间,是的话阻塞线程一定时间。
  • release: 独占式释放锁,返回true的话会唤醒一个或多个线程

    • 代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      //release
      public final boolean release(int arg) {
      if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
      unparkSuccessor(h);
      return true;
      }
      return false;
      }

      //tryRelease和tryAcquire一样是需要继承者自行实现释放锁的逻辑
      protected boolean tryRelease(int arg) {
      throw new UnsupportedOperationException();
      }

      //贴一段ReentrantLock公平锁的tryRelease实现
      protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      //判断当前线程是否是持有锁的线程
      if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
      free = true;
      //将持有锁的线程引用置空
      setExclusiveOwnerThread(null);
      }
      //如果正确释放,会设置为无锁状态
      setState(c);
      return free;
      }
      • 小结:

        • ReentrantLock实现的*tryRelease()*方法来看,独占式的实现中,需要先判断释放锁的线程和持有锁的线程是否一致

        • 再判断释放锁的状态是否和当前状态一致

        • 最后会将持有锁的线程引用置空,并设置无锁状态让其他线程可以竞争

    • acquireShared(int arg): 共享式竞争锁,不响应中断

      • 代码:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        //acquireShared
        public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
        }

        //tryAcquireShared,需要自行实现的方法,返回值负数表示竞争失败;0表示当前节点竞争成功,但后续节点会共享竞争锁失败;
        //正数表示后续节点会共享竞争锁成功,这就是共享模式的特点
        protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
        }

        //doAcquireShared
        private void doAcquireShared(int arg) {
        //往等待队列添加共享模式节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
        boolean interrupted = false;
        for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
        //竞争锁成功且返回正数,后续的共享节点也可以竞争获取锁,设置头节点
        setHeadAndPropagate(node, r);
        p.next = null; // help GC
        if (interrupted)
        selfInterrupt();
        failed = false;
        return;
        }
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
        }
        } finally {
        if (failed)
        cancelAcquire(node);
        }
        }

        //setHeadAndPropagate,设置队列的头节点,并且检查竞争锁成功的方法是否处于共享模式的等待状态
        private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        //重新设置头节点
        setHead(node);
        //判断后续节点是否可以共享竞争锁成功
        if (propagate > 0
        //判断是否空队列
        || h == null
        //判断头节点状态是否等待唤醒,或者条件等待,或者传播共享锁竞争状态
        || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //如果第二个节点是空节点或者是共享状态节点
        if (s == null || s.isShared())
        //共享式释放锁
        doReleaseShared();
        }
        }

        //doReleaseShared,共享式释放锁,通知竞争锁成功的节点并且传播到后续节点
        private void doReleaseShared() {
        //死循环
        for (;;) {
        Node h = head;
        if (h != null && h != tail) {
        int ws = h.waitStatus;
        //当节点处于等待通知的状态
        if (ws == Node.SIGNAL) {
        //CAS设置节点为无状态,失败就一直循环
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
        continue; // loop to recheck cases
        //唤醒竞争锁成功节点
        unparkSuccessor(h);
        }
        else if (ws == 0 &&
        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue; // loop on failed CAS
        }
        //如果这个判断为true,表示此时头节点发生了变化,需要再次循环
        if (h == head) // loop if head changed
        break;
        }
        }
        • 小结:

          • 当共享式竞争锁成功且返回非负数,会去重新设置头节点

          • 后续的流程和acquireQueued类似,检查是否需要阻塞中断线程或者取消线程

    • acquireSharedInterruptibly(int arg) : 共享式请求锁,响应中断

      • 代码:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        //acquireSharedInterruptibly
        public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //检查线程的中断状态
        if (Thread.interrupted())
        throw new InterruptedException();
        //第一次竞争失败的话,调用 doAcquireSharedInterruptibly
        if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
        }

        //doAcquireSharedInterruptibly,
        private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
        for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
        setHeadAndPropagate(node, r);
        p.next = null; // help GC
        failed = false;
        return;
        }
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        //抛出中断异常代替返回中断状态
        throw new InterruptedException();
        }
        } finally {
        if (failed)
        cancelAcquire(node);
        }
        }
        • 小结:几乎是一样的逻辑,只是多了对中断状态的检查
    • tryAcquireSharedNanos(int arg,long nanosTimeout):限时共享式竞争锁,响应中断

      • 代码:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        //tryAcquireSharedNanos
        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        if (Thread.interrupted())
        throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
        }

        //doAcquireSharedNanos方法和tryAcquireNanos很类似,不再赘述
    • acquireShared(int arg): 共享式竞争锁,不响应中断

      • 代码:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        63
        64
        65
        66
        67
        68
        69
        70
        71
        72
        73
        74
        75
        76
        77
        78
        79
        80
        81
        82
        83
        84
        85
        86
        87
        //acquireShared
        public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
        }

        //tryAcquireShared,需要自行实现的方法,返回值负数表示竞争失败;0表示当前节点竞争成功,但后续节点会共享竞争锁失败;
        //正数表示后续节点会共享竞争锁成功,这就是共享模式的特点
        protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
        }

        //doAcquireShared
        private void doAcquireShared(int arg) {
        //往等待队列添加共享模式节点
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
        boolean interrupted = false;
        for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
        //竞争锁成功且返回正数,后续的共享节点也可以竞争获取锁,设置头节点
        setHeadAndPropagate(node, r);
        p.next = null; // help GC
        if (interrupted)
        selfInterrupt();
        failed = false;
        return;
        }
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        interrupted = true;
        }
        } finally {
        if (failed)
        cancelAcquire(node);
        }
        }

        //setHeadAndPropagate,设置队列的头节点,并且检查竞争锁成功的方法是否处于共享模式的等待状态
        private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;
        //重新设置头节点
        setHead(node);
        //判断后续节点是否可以共享竞争锁成功
        if (propagate > 0
        //判断是否空队列
        || h == null
        //判断头节点状态是否等待唤醒,或者条件等待,或者传播共享锁竞争状态
        || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //如果第二个节点是空节点或者是共享状态节点
        if (s == null || s.isShared())
        //共享式释放锁
        doReleaseShared();
        }
        }

        //doReleaseShared,共享式释放锁,通知竞争锁成功的节点并且传播到后续节点
        private void doReleaseShared() {
        //死循环
        for (;;) {
        Node h = head;
        if (h != null && h != tail) {
        int ws = h.waitStatus;
        //当节点处于等待通知的状态
        if (ws == Node.SIGNAL) {
        //CAS设置节点为无状态,失败就一直循环
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
        continue; // loop to recheck cases
        //唤醒竞争锁成功节点
        unparkSuccessor(h);
        }
        else if (ws == 0 &&
        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue; // loop on failed CAS
        }
        //如果这个判断为true,表示此时头节点发生了变化,需要再次循环
        if (h == head) // loop if head changed
        break;
        }
        }
        • 小结:
          • 当共享式竞争锁成功且返回非负数,会去重新设置头节点
          • 后续的流程和acquireQueued类似,检查是否需要阻塞中断线程或者取消线程
    • acquireSharedInterruptibly(int arg) : 共享式请求锁,响应中断

      • 代码:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        //acquireSharedInterruptibly
        public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //检查线程的中断状态
        if (Thread.interrupted())
        throw new InterruptedException();
        //第一次竞争失败的话,调用 doAcquireSharedInterruptibly
        if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
        }

        //doAcquireSharedInterruptibly,
        private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
        for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
        int r = tryAcquireShared(arg);
        if (r >= 0) {
        setHeadAndPropagate(node, r);
        p.next = null; // help GC
        failed = false;
        return;
        }
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
        parkAndCheckInterrupt())
        //抛出中断异常代替返回中断状态
        throw new InterruptedException();
        }
        } finally {
        if (failed)
        cancelAcquire(node);
        }
        }
        • 小结:几乎是一样的逻辑,只是多了对中断状态的检查
    • tryAcquireSharedNanos(int arg,long nanosTimeout):限时共享式竞争锁,响应中断

      • 代码:

        1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        //tryAcquireSharedNanos
        public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
        if (Thread.interrupted())
        throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
        }

        //doAcquireSharedNanos方法和tryAcquireNanos很类似,不再赘述

总结:

  • 以上概括了AQS同步器类的基本结构和一些使用技巧,列举的两个独占式和共享式同步器实现可以看出继承AQS类大致的流程
  • 继承AQS实现的同步器需要做好对状态字段的管理,可以不依赖原本的状态值
  • 要自行实现 tryAcquire 和 tryRelease 等方法
  • 从独占式获取锁的三个类似方法来看,每次进行锁竞争,竞争失败时都会考虑阻塞线程;当竞争节点是第一个竞争的节点却竞争失败时将其置为取消状态
  • 共享式获取锁的方法竞争锁成功会传播给后续节点,其他流程和独占式方法类似