AQS队列同步器(二)

AQS 队列同步器(二)


概述:

​ 在《AQS队列同步器(一)》中对AQS的概念和基本属性了解了一下,对独占式和共享模式竞争释放锁的过程进行了了解。通过继承AQS类,JDK实现了一些很好的共享或者独占式同步工具类,包括 ReentrantLock,CountDownLatch,Semaphore,CyclicBarrier等,接下来通过它们各自的同步器实现过程,了解AQS的实际运用。

辅助类:

  • 描述: 在了解AQS的方法时,有两个类出现的机率很频繁—LockSupport 和 UnsafeLockSupport 类的parkunpark方法用来在竞争锁失败时阻塞线程和释放锁时唤醒线程;Unsafe类提供了一系列基于底层实现的CAS操作,用来对同步器的队列头尾节点,节点状态,锁状态进行设置。在了解具体同步工具类实现之前,先了解一下这两个类。

  • LockSupport :

    • 类说明:摘自 JDK1.8.0_191版本

      • 用来创建锁和其他同步工具类的阻塞原语—操作系统中原始的不可分割的操作单元

      • 该类与使用它的每一个线程都关联一个许可,当这个许可可用时,park方法会立刻返回,在线程中消费这个许可,否则会阻塞线程;相对的,unpark方法会释放许可,让许可变成可用状态。

      • 很容易联想到 Semaphore 同步工具类,初始化一组许可,每次线程执行之前先请求许可,许可用完就阻塞后来线程,直到前面的线程释放许可。

      • park和unpark方法提供了高效的阻塞和释放线程的方法,由于许可的存在,同时进行park和unpark并不会导致死锁。park方法响应中断而且也提供了超时的重载方法,不过也有可能在任意时刻“毫无原因”地返回,所有在返回之前必须在循环中重复检查条件状态是否满足。在这个场景下,park方法表现得像优化版本“忙等待”,没有太多的自旋时间,但是必须要和unpark方法一起使用才会生效。

      • park和unpark的系列方法,是为构建高性能并发模块服务的,例如 park方法在这种结构下使用:

        while(!process()){ LockSupport.park(this) };

      • 只允许一个线程获取一个许可,其他任何非标准用法可能回导致非预期的结果。

    • 方法说明

      • unpark(): 释放许可,唤醒线程

        • 代码:

          1
          2
          3
          4
          5
          6
          7
          //取消线程的阻塞状态,使许可变成可用
          //但是当线程没有启动时,可能不会生效
          public static void unpark(Thread thread) {
          if (thread != null)
          //unsafe类的unpark方法
          UNSAFE.unpark(thread);
          }
          • 逻辑很少,判断线程非空,调用 UNSAFE.unpark(thread) 方法

          • //TODO

同步工具类

CountDownLatch : 共享锁

  • 类描述 :摘自 JDK1.8_191

    • 可以让一个或一组线程等待一系列操作完成之后再执行的同步器
    • CountDownLatch 初始化带有一个计数值,每次指定的操作完成都会调用countDown()方法去递减计数器;再计数值为0之前,所有线程都会阻塞,直到所有指定的操作完成
    • 计数值不能再次被重置,所以 CountDownLatch 更像是一道打开后不会关闭的门,条件未满足时,处于关闭状态,所有通过者都要等待;条件满足门打开,且一直保持开启状态。如果需要实现能够重置门状态的工具,考虑使用CyclicBarrier(栅栏)
    • CountDownLatch 可以用来实现多种功能,例如初始化计数为1,所有线程都等待直到某个线程调用CountDown方法递减计数器,那个线程拥有打开大门的钥匙;或者初始化计数为N,直到N个线程完成了执行或者指定的动作执行了N次,放行所有线程
    • 另一个典型的用法是:当一个大型任务可以被分割成独立的任务,使用多线程执行,当每一部分都执行完成之后再汇总,进行下一步
  • 类属性和方法

    • 先看一下CountDownLatch的同步器实现:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      //Sync
      private static final class Sync extends AbstractQueuedSynchronizer {
      private static final long serialVersionUID = 4982264981922014374L;

      //count表示初始化时的计数量,可以注意到这和ReentrantLock 有很大的区别,
      //State 不再表示各种状态了,只有0和非0
      Sync(int count) {
      setState(count);
      }
      //返回剩余的计数量
      int getCount() {
      return getState();
      }
      //共享式竞争锁,当计数完毕,返回1表示成功,否则返回负数表示失败
      protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
      }
      //共享式释放锁
      protected boolean tryReleaseShared(int releases) {
      //死循环
      for (;;) {
      int c = getState();
      if (c == 0)
      return false;
      //每一次释放锁,计数递减1
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
      return nextc == 0;
      }
      }
      }
      • 和AQS最明显的区别是状态表示法,state不再代表各种状态,只有0和非0代表"门"的开闭
      • 初始化指定计数量,每次countDown都会释放一次锁
      • 竞争锁时如果当前计数为0才能竞争成功
    • CountDownLatch(int count) : 构造函数,参数为指定的计数量

      1
      2
      3
      4
      public CountDownLatch(int count) {
      if (count < 0) throw new IllegalArgumentException("count < 0");
      this.sync = new Sync(count);
      }
      • 用参数的计数值初始化了内部的同步器,代表锁的数量
    • await() :当计数不为0时让线程处于响应中断的等待状态

      • 代码:await(){sync.acquireSharedInterruptibly(1);}
      • 线程会一直尝试获取锁,直到计数为0或者线程被中断
    • await(long timeout,TimeUnit unit) : 限时等待,响应中断

      • 代码: await(long timeout,TimeUnit unit)

        {return sync.tryAcquireSharedNanos(1,unit.toNanos(timeout));}

      • 在限时内且计数不为0的状态下,持续地尝试获取锁,直到成功或者被中断,抛出中断异常

    • countDown() : 递减计数器的方法

      • 代码: countDown(){sync.releaseShared(1);}
      • countDown 方法递减计数器,原理是每次都调用共享式释放锁的方法,将计数递减,代表指定动作已经完成一次
  • 小结:

    • CountDownLatch用来实现"条件门"这样的场景十分合适,条件满足之前所有线程都在门前等待;条件满足之后门保持常开状态
    • 由于是共享式获取锁,所以当计数条件为0,所有尝试获取锁的线程都会成功

Semaphore : 信号量

  • 类描述:摘自 JDK1.8_191

    • 一个计数信号量,从概念上来说,它维持着一组许可,每一个请求锁的acquire方法都会阻塞直到有可用的许可,然后获得许可运行
    • 每一次调用release释放锁,都会增加一个可用许可,可能会直接释放一个阻塞中的请求锁的方法。
    • 然而, 并不真正存在许可这样的对象,信号量Sqmaphore只是维护一个可用计数
    • 信号量通常用来限制可访问指定资源的线程的数量,例如线程池:初始化指定大小的线程池,当有新任务到来,请求一个线程去执行;当没有可用线程时,任务加入工作队列或者被拒绝。
    • 当想要使用信号量保护的资源时,需要先从信号量获取到一个使用许可,确保有资源可用;在使用完资源之后,需要归还许可,保证其他线程可以获得许可去申请和使用资该源
    • 当调用acquire()方法请求锁时,并没有持有任何同步锁,防止因此导致资源无法归还到资源池中;信号量封装了同步机制以限制对资源池的访问。
    • 当信号量的许可初始化为1时,表示只有一个线程可以获得资源的使用许可,等于是实现了排他锁的功能,这也被称为二值信号量 :表示它只有两个状态:许可可用,无许可。当信号量作为二值信号量使用时,它会具备这样的属性:被持有的锁会被其他线程锁释放而不是持有锁的线程本身,因为信号量没有所有权这样的概念。这个特性在某些场景下十分有用,例如死锁恢复。
    • 信号量的构造方法也接受一个fair参数,表示使用公平锁还是非公平锁模式;公平锁模式下对于线程获得许可的顺序提供了保证,它保证会按照调用acquire()方法执行的先进先出顺序发放许可。同时也需要注意,无时限的tryAcquire()方法不会遵循公平锁模式的规则。
    • 通常用来控制线程对资源的访问应该将信号量构造参数初始化为true,使用公平锁模式,防止线程饥饿的问题;
  • 属性和方法

    • sync : 内部同步器,具有公平模式和非公平模式两种实现

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      //sync
      abstract static class Sync extends AbstractQueuedSynchronizer {
      private static final long serialVersionUID = 1192457210091910933L;
      //初始化的许可数量,和 CountDownLatch 类似,作为状态值存在
      Sync(int permits) {
      setState(permits);
      }
      //获取许可数量,调用getState()方法,一目了然
      final int getPermits() {
      return getState();
      }
      //非公平模式共享式竞争锁,如果存在许可,CAS设置剩余许可数量,并返回剩余许可
      final int nonfairTryAcquireShared(int acquires) {
      for (;;) {
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
      compareAndSetState(available, remaining))
      return remaining;
      }
      }
      //共享式释放锁
      protected final boolean tryReleaseShared(int releases) {
      for (;;) {
      int current = getState();
      int next = current + releases;
      if (next < current) // overflow
      throw new Error("Maximum permit count exceeded");
      if (compareAndSetState(current, next))
      return true;
      }
      }
      //减少许可
      final void reducePermits(int reductions) {
      for (;;) {
      int current = getState();
      int next = current - reductions;
      if (next > current) // underflow
      throw new Error("Permit count underflow");
      if (compareAndSetState(current, next))
      return;
      }
      }
      //返回所有许可
      final int drainPermits() {
      for (;;) {
      int current = getState();
      if (current == 0 || compareAndSetState(current, 0))
      return current;
      }
      }
      }
      • 可用许可初始化为state,每次请求锁成功许可减1,CAS设置 state值;每次成功释放锁,许可加1.
      • 还提供了一个reducePermits(int)方法,可以批量减少许可的数量
    • FairSync : 公平锁模式同步器实现:

    • 代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      //FairSync
      static final class FairSync extends Sync {
      private static final long serialVersionUID = 2014338818796000944L;

      FairSync(int permits) {
      super(permits);
      }

      protected int tryAcquireShared(int acquires) {
      for (;;) {
      //保障公平模式方法,查找是否存在等待许可更久的线程
      if (hasQueuedPredecessors())
      return -1;
      int available = getState();
      int remaining = available - acquires;
      if (remaining < 0 ||
      compareAndSetState(available, remaining))
      return remaining;
      }
      }
      }
      • 通过当前队列不为空或者当前节点是否是头节点来查找等待队列中是否存在等待获取许可更久的节点
    • NonFairSync : 非公平模式的锁实现

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      //NonFairSync
      static final class NonfairSync extends Sync {
      private static final long serialVersionUID = -2694183684443567898L;

      NonfairSync(int permits) {
      super(permits);
      }
      //共享式竞争锁
      protected int tryAcquireShared(int acquires) {
      return nonfairTryAcquireShared(acquires);
      }
      }
      • 非公平锁的实现看起来比较简单,只声明了一个共享式竞争锁的方法。
    • acquire() : 请求锁

      1
      2
      3
      public void acquire() throws InterruptedException {
      sync.acquireSharedInterruptibly(1);
      }
      • 调用了AQS类的可中断共享请求锁方法,存在可用许可会立即返回,否则一直阻塞到获得许可或者被中断
    • acquireUninterruptibly() : 不响应中断的请求锁方法

      1
      2
      3
      public void acquireUninterruptibly{
      sync.acquireShared(1);
      }
      • 调用了当前同步器的共享竞争所锁方法,由于不响应中断,当没有许可可用时,当前线程会由于线程调度的原因而进入休眠状态,直到某个线程调用了release方法释放一个许可,当前线程就会马上获得这个许可。
      • 如果当前线程在等待许可期间被中断它会继续处于等待状态,**但是相对于线程没有中断发生的情况下,线程获得许可的时间可能会发生一些变化。**当这个方法返回时会设置线程的中断状态。
    • tryAcquire(int permits,long timeout,TimeUnit unit) : 限时请求锁的方法

      1
      2
      3
      4
      5
      6
      public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
      throws InterruptedException {
      //添加了参数合理性判断
      if (permits < 0) throw new IllegalArgumentException();
      return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
      }
      • 实现还是调用了AQS类的tryAcquireSharedNanos(permits,unit.toNanos(timeout)),这个方法也会响应中断,其实从抛出的异常就可以看出来了;发生中断时停止竞争锁,超时则表示竞争锁失败
  • 小结

    • AQS类的共享模式实现,和之后的ReentrantLock类似,提供了公平和非公平的两种同步器实现
    • 因为实际上维持一个许可的概念,获得一个许可之后可以继续执行,执行完成释放许可。很适合做资源访问控制,避免不合理大量竞争
    • 借助AQS的模板方法,提供了响应中断和超时获取锁的方法,这也是和内置锁相比较的便利之处

ReentrantLock : 互斥可重入锁,具有AQS公平锁和非公平锁的实现

  • 类描述:摘自 JDK1.8_191

    • 一个可重入的独占锁,和java 内置锁具备相同的内存语义,但是可以被继承。

    • 可重入锁会被最后一个成功锁定但还未释放锁的线程持有。当锁未被其他线程持有时,请求锁的线程会成功获得锁并返回。如果请求锁的线程已经持有锁,再次请求锁时会立刻返回。可以通过 isHeldByCurrentThread 和 getHoldCount 来判断当前线程是否持有锁

    • 可重入锁的构造接受一个 fairness 的 boolean 参数,true 表示内部使用公平模式实现的同步器,在公平模式下,同步器会倾向于保证队列中等待时间最长的线程获得锁。相反,false 表示非公平模式的同步器,不会对锁的分配提供任何顺序的保证。在大量线程并发的情况下,公平模式可重入锁的吞吐量较低,但是获取锁的效率和非公平模式差不多,而且还能避免线程饥饿问题

    • 需要注意,公平模式下,可重入锁也不会对线程调度提供任何公平性保障(这是系统控制的)。因此公平模式下有可能某个线程会多次获得锁,当其他活跃线程没有在执行或者未持有锁的情况下。

    • 无参的 tayLock() 方法不会遵循公平锁模式的规则,当锁空闲时去竞争就会获得锁,即使存在等待更久的线程

    • 最佳实践:在调用 lock 方法获取锁之后,使用 try 保护同步的代码,并在finally 中释放锁。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      class X{
      private final ReentrantLock lock = new ReentrantLock();

      public void m(){
      //阻塞直到获取到锁
      lock.lock();
      try{

      }finally{
      lock.unlock();
      }
      }
      }
    • 序列化:对可重入锁序列化时存在和内置锁序列化一样的问题:无论序列化时锁的状态是如何的,反序列化之后都会处于未加锁状态。

    • 在同一个线程上,可重入锁最大支持2147483647次的递归锁定,超过这个限制将会在 lock 方法中抛出 error

  • 属性和方法

    • sync: 可重入锁内部继承 AQS 实现的基础同步器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      abstract static class Sync extends AbstractQueuedSynchronizer {
      private static final long serialVersionUID = -5179523762034025860L;

      /**
      * Performs {@link Lock#lock}. The main reason for subclassing
      * is to allow fast path for nonfair version.
      */
      abstract void lock();

      //非公平模式的请求锁
      final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      //无锁状态竞争锁
      if (c == 0) {
      //CAS设置锁状态
      if (compareAndSetState(0, acquires)) {
      //成功表示竞争到了锁,设置持有锁的线程引用
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      //如果请求锁的线程是持有锁的线程
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0) // overflow
      throw new Error("Maximum lock count exceeded");
      //state 代表 holdCount,因为请求锁的线程已经持有了锁,这里将 holdCount+1
      setState(nextc);
      return true;
      }
      return false;
      }

      //释放锁
      protected final boolean tryRelease(int releases) {
      int c = getState() - releases;
      //判断想要释放锁的线程是否是当前线程
      if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
      boolean free = false;
      if (c == 0) {
      free = true;
      //置空持有锁的线程引用
      setExclusiveOwnerThread(null);
      }
      setState(c);
      return free;
      }

      //判断是否是当前线程持有锁,而不是去判断状态,因为可重入锁的状态代表线程持有锁的次数,不足以作为锁状态的判断
      protected final boolean isHeldExclusively() {
      return getExclusiveOwnerThread() == Thread.currentThread();
      }

      final ConditionObject newCondition() {
      return new ConditionObject();
      }

      final Thread getOwner() {
      return getState() == 0 ? null : getExclusiveOwnerThread();
      }

      final int getHoldCount() {
      return isHeldExclusively() ? getState() : 0;
      }

      final boolean isLocked() {
      return getState() != 0;
      }

      private void readObject(java.io.ObjectInputStream s)
      throws java.io.IOException, ClassNotFoundException {
      s.defaultReadObject();
      setState(0); // reset to unlocked state
      }
      }
      • 和AQS锁状态字段定义最大的区别是:可重入锁的state 代表当前线程持有锁的次数,每次请求锁成功会加1。因此不能单纯地通过state状态字段判断锁的状态,取而代之可以判断是否是当前线程持有锁。
    • NonfairSync : 非公平模式同步器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      static final class NonfairSync extends Sync {
      private static final long serialVersionUID = 7316153563782823691L;
      //直接CAS去竞争锁,失败会调用acquire进而调用到 Sync 同步器中定义的 nonfairTryAcquire 方法
      final void lock() {
      if (compareAndSetState(0, 1))
      setExclusiveOwnerThread(Thread.currentThread());
      else
      acquire(1);
      }

      protected final boolean tryAcquire(int acquires) {
      return nonfairTryAcquire(acquires);
      }
      }
      • 基本没有自己的特殊实现,加解锁方法都来自父类 Sync
    • FairSync : 公平模式同步器

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      static final class FairSync extends Sync {
      private static final long serialVersionUID = -3000897897090466540L;

      final void lock() {
      acquire(1);
      }

      //公平模式竞争锁
      protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      //实现公平的关键在于 hasQueuedPredecessors() 方法,
      //查找队列中是否存在比当前线程等待更久的线程
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
      //这是从AQS的父接口AbstractOwnableSynchronizer的方法,设置当前独占锁的线程引用
      setExclusiveOwnerThread(current);
      return true;
      }
      }
      else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }
      }

      //
      public final boolean hasQueuedPredecessors() {
      //如果当前队列为空或者当前线程是head 节点的线程,表示当前线程等待的时间最久,返回false
      Node t = tail; // Read fields in reverse initialization order
      Node h = head;
      Node s;
      return h != t &&
      ((s = h.next) == null || s.thread != Thread.currentThread());
      }
      • 公平模式竞争锁的方式,比非公平模式多了一个当前线程等待时间的判断,如果队列中存在等待更久的线程,则不会去竞争锁,从而实现公平。
    • ReentrantLock()/ReentrantLock(boolean fair) : 重入锁的构造方

      • 无参构造默认创建非公平模式的重入锁
      • 传参构造方法,fire = true才表示创建公平模式的重入锁
    • lock() : 请求锁方法,调用实际实现的同步器的 lock 方法

      1
      2
      3
      public void lock(){
      sync.lock();
      }
    • tryLock : 请求锁方法,需要注意的是,即便使用了公平模式的同步器,tryLock方法也会直接去竞争锁而不考虑是否存在等待锁更久的线程,这某种形式上更加灵活,插队竞争锁。

      1
      2
      3
      public boolean tryLock() {
      return sync.nonfairTryAcquire(1);
      }
      • 如果想要遵守公平模式同步器的规则,可以使用tryLock(0,TimeUnit)
    • tryLock(long timeout,TimeUnit unit) : 超时请求锁的方法

      1
      2
      3
      4
      public boolean tryLock(long timeout, TimeUnit unit)
      throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
      }
      • 遵循公平锁模式去竞争锁,如果想要一个限时请求锁的方法在公平模式下去直接竞争锁,可以这样tryLock()||tryLock(timeout,unit) 去调用
    • isHeldByCurrentThread : 判断当前线程是否持有锁

      1
      2
      3
      4
      5
      6
      7
      public boolean isHeldByCurrentThread(){
      return sync.isHeldExclusively();
      }

      protected final boolean isHeldExclusively() {
      return getExclusiveOwnerThread() == Thread.currentThread();
      }
      • 通过AbstractOwnableSynchronizer类的getExclusiveOwnerThread() 获得持有锁的线程和当前线程进行比较
  • 小结

    • 可重入的互斥锁,可重入是因为当已经持有锁的线程再次请求锁时,会直接返回并将当前线程的heldCount+1,没有重新竞争锁的流程
    • 支持可中断和可超时的竞争锁方法,公平锁模式下也可以通过无参的tryLock()方法直接去竞争锁忽视其他等待线程
    • tryLock(0,TimeUnit)是一个比较好的实践,遵守了公平锁规则,又不会超时

CyclicBarrier : 栅栏,类似与可重置状态的 CountLatch

  • 概述://TODO