无锁队列

无锁队列


概念:

  1. 通常队列要实现安全的并发访问,需要对入队和出队方法加锁,例如在基于链表实现的队列中,入队和出队都涉及到头/尾节点指针的操作,不安全的并发控制很容易导致读取到错误数据甚至破坏链表结构

  2. 例如线程A插入数据x , 获取到当前的尾节点 tail ,此时线程 B插入数据y ,将尾节点更新为为值为y的节点 ,而后线程 A 又将旧的尾节点更新,这就导致丢失了线程B插入的节点 y

  3. 在多线程并发访问的情况下这是完全有可能发生的,解决这个问题可以直接在入队和出队方法加锁,例如使用synchronized 关键字控制对出入队方法的访问,这样可以实现并发控制,但是在软件工程里面,效率也是一个很重要的问题,锁的粒度越大,效率越低;反之亦然。

  4. 无锁队列并不是真正的没有锁,而是使用CAS模拟出了细粒度的锁,控制线程间的共享变量的访问 – 我们在实现并发控制的时候,是没有必要锁住整个对象的 – 这样线程之间就可以安全地通信


实现

  1. 实现思路是在入队和出对方法,对head和tail 节点进行修改的时候,多次通过CAS操作确认当前获取到的是否是正确的 head 和 tail 节点以及它们的next节点,确保插入和更新操作的正确性

  2. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108

    public class LockFreeQueue<E extends Comparable<E>> {

    //头尾节点
    public Node<E> head;
    public Node<E> tail;

    LockFreeQueue() {
    }

    //enQueue
    public boolean enqueue(LockFreeQueue<E> queue, Integer value) {
    System.out.print(value+",");
    //队列为空时
    if (head == null) {
    Node<E> inNode = new Node(value);
    inNode.next = null;
    head = inNode;
    tail = inNode;
    return true;
    }
    //新的尾节点
    Node newNode = new Node(value);
    newNode.next = null;
    Node oldP;
    Node next;

    while (true) {
    //获取尾指针快照
    oldP = tail;
    next = oldP.next;

    //判断尾指针是否已经变更
    if (!oldP.equals(tail)) {
    continue;
    }
    //判断next指针是否已经变更,已经变更就重置尾节点
    if (next != null) {
    casNode(tail, oldP, next);
    continue;
    }
    //cas设置尾节点next
    if (oldP.next == next) {
    oldP.next = newNode;
    break;
    }
    }
    //cas设置尾节点
    if (tail.equals(oldP)) {
    tail = newNode;
    }
    return true;
    }

    public boolean isNull() {
    return head == null;
    }

    //CAS 操作更新节点值
    boolean casNode(Node<E> node, Node<E> oldVal, Node<E> newVal) {
    if (node == oldVal || node.equals(oldVal)) {
    node = newVal;
    return true;
    }
    return false;
    }

    //并发的deDueue
    public E deQueue() {

    E out;
    //并发获取头节点
    Node<E> oldHead = head;
    Node<E> oldTail = tail;
    Node<E> next = oldHead.next;

    while (true) {
    //什么样的情况下认为head指针已经被移动了?
    if (oldHead != head) {
    continue;
    }
    //判断队列已空
    if (oldHead == tail && next == null) {
    out = head.value;
    head = null;
    return out;
    }
    //判断head指针是否先于tail指针移动过
    if (oldHead == tail && next != null) {
    //尾指针没有移动到正确的位置
    if(tail == oldTail){
    tail = next;
    }
    continue;
    }

    //正确获得了队列的快照
    if(head == oldHead){
    head = next;
    out = oldHead.value;
    break;
    }
    }
    //释放内存
    oldHead = null;
    return out;
    }
    }
  3. 实现得很难看,尤其是deQueue 的方法和casNode,所以有的地方改成了简单的 if 判断实现CAS操作

  4. 先来回忆一下插入节点的操作,声明新的尾节点,获得当前尾节点,更新当前尾节点的next,更新队列尾节点为新插入的节点,所以并发访问队列入队的时候有这样几种情况:

    1. 线程A获得尾节点之后,进行插入数据之前,尾节点已经被B线程更新了
    2. 线程A获得正确的尾节点之后,插入数据之前,next节点被B线程更新了
  5. 对于上述的两种情况:

    1. 线程A获取尾节点之后,判断当前的尾节点快照是否是队列的尾节点,否的话使用CAS操作进行更新并重新循环
    2. 线程A获取正确的尾节点之后,next节点并不指向null,因为线程B刚好更新了尾节点,CAS操作更新尾节点重新循环
  6. 综上所述:并发访问入队方法,要保证每次插入新节点时,都是在正确的尾节点的基础上进行更新的,出队方法也类似

  7. 队列中出队的过程是:获取当前head节点,保存head节点的值,将 队列的head指针更新为 head.next ,head节点置空,同样并发访问出队方法会有下面这几种情况

    1. 线程A获取head节点之后,出队之前head节点被更新(后移)了
    2. 线程A获取正确的 head 节点之后,出队之前,head节点先于tail节点移动了(会出现在只有一个节点的场景中,因为此时 head == tail ),需要更新tail节点
  8. 其实相比处理入队的情况,出队似乎要简单一点;

    1. head节点被更新的情况下,需要重新循环更新head节点
    2. head节点如果先于tail节点移动,只需要将tail节点更新到正确的位置
  9. 最后就是更新队列的 head 节点为 head.next,并将旧的head节点置空


优化

  1. 在队列使用中,声明一个dummy节点,可以省去一些边界条件的判断

  2. 对代码进行优化:

  3. 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106

    public class LockFreeQueue<E extends Comparable<E>> {


    public Node<E> head;
    public Node<E> tail;

    LockFreeQueue() {

    Node<E> dummy = new Node<>();
    dummy.next = null;
    head = tail = dummy;

    }

    //enQueue
    public boolean enqueue(LockFreeQueue<E> queue, Integer value) {
    System.out.print(value + ",");
    //新的尾节点
    Node newNode = new Node(value);
    newNode.next = null;
    Node oldP;
    Node next;

    while (true) {
    //获取尾指针快照
    oldP = tail;
    next = oldP.next;

    //判断尾指针是否已经变更
    if (!oldP.equals(tail)) {
    continue;
    }

    //判断next指针是否已经变更,已经变更就重置尾节点
    if (next != null) {
    casNode(tail, oldP, next);
    continue;
    }
    //cas设置尾节点next
    if (oldP.next == next) {
    oldP.next = newNode;
    break;
    }
    }
    //cas设置尾节点
    if (tail.equals(oldP)) {
    tail = newNode;
    }
    return true;
    }

    public boolean isNull() {
    //判断队列为空的条件变了,当head.next == null时意味着没有任何节点存在
    return head.next == null;
    }

    //CAS 操作更新节点值
    boolean casNode(Node<E> node, Node<E> oldVal, Node<E> newVal) {
    if (node == oldVal || node.equals(oldVal)) {
    node = newVal;
    return true;
    }
    return false;
    }

    //并发的deDueue
    public E deQueue() {

    E out;
    //并发获取头节点
    Node<E> oldHead = head;
    Node<E> oldTail = tail;
    Node<E> next = oldHead.next;

    while (true) {
    //什么样的情况下认为head指针已经被移动了?
    if (oldHead != head) {
    continue;
    }

    //判断队列已空
    if (oldHead == tail && next == null) {
    return null;
    }
    //判断head指针是否先于tail指针移动过
    if (oldHead == tail && next != null) {
    //尾指针没有移动到正确的位置
    if (tail == oldTail) {
    tail = next;
    }
    continue;
    }

    //正确获得了队列的快照
    if (head == oldHead) {
    head = next;
    //因为head节点是dummy节点,不参与队列的操作,所以出队的是head.next
    out = next.value;
    break;
    }
    }
    oldHead = null;
    return out;
    }
    }
  4. 主要是插入和删除节点的逻辑改变了,插入时不再判断是否空队列,删除节点需要讲将head.next 出队而不是 head本身


基于环形数组实现的无锁队列

  1. 之前在学习数组的时候,也见到有描述说“基于环形数组可以实现高效的并发队列”,先抛开并发队列,理解清楚环形数组的概念

  2. **Ring Buffer **:环形数组

    1. 一种通过单个固定长度的缓冲过去实现的端到端相连的数据结构,基于结构特性适合用来做数据流缓冲

    2. 这是抽象出来的缓冲区模型,现实中的计算机物理内存缓冲区并没有这样的实现,只是它的概念表示类似于一个环状

    3. 它长得像这样:

  3. 特点和适用场景

    1. 我们知道一般的线性数组,每次有数据被移除时,都需要进行数组元素的移动以确保数组内存空间连续;而环形数组不需要考虑这一点,它没有固定的头和尾,它的头和尾元素是随着读写缓冲区的指针而变化的,下面是从WIKI上面复制过来的动图:

    2. 蓝色是读指针,红色是写指针,环形缓冲区的数据存储在 read ~ write 之间,根据FIFO的特点:

      1. 从head出读取数据时,head后移, 缓冲区数据范围变成了 head +1 ~ tail 的区域
      2. 向缓冲区插入一个数据后,缓冲区数据存储范围是 head + 1 ~ tail + 1 ;
      3. 删除和新增数据都不需要进行数据元素的移动
      4. 随着读写的交叉进行,环形缓冲区中的数据存储区域是在游动的,所以说环形缓冲没有固定的head和tail元素,它们随着读写情况不断发生变化
  4. 实现

    1. 通过 java 代码实现一个环形数组

    2. 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67

      public class RingBuffer {


      /**
      * 环状数组:
      * 1. 通过四个指针或者两个指针加两个整型计数来实现、
      * 2. 当 start 指针等于 end 指针的时候,环型数组为空
      * 3. 满的判断条件是 end + 1 = start
      */

      private Integer start;
      private Integer end;
      private int length;
      private Integer[] array;
      private int validData = 0;

      public RingBuffer(int n){
      this.length = n;
      this.array = new Integer[length];
      start = 0;
      end = 0;
      }

      public void getByIndex(){
      int k = start;
      while(k!=end){
      System.out.print(array[k]+",");
      k=(k+1)%length;
      }
      }

      //判断数组为空
      public boolean isEmpty(){
      return validData == 0 || start.equals(end);
      }

      //判断数组为满
      public boolean isFull(){
      return validData == length || (end+1) == start;
      }

      //插入数据
      public boolean insert(int value){

      if(isFull()){
      return false;
      }
      array[end] = value;
      end = (end+1)%length;
      validData++;
      return true;
      }

      //删除数据
      public int del(){

      if(isEmpty()){
      throw new NullPointerException("数组为空!");
      }
      int val = array[start];
      start = (start+1)%length;
      validData--;
      return val;
      }

      }
    3. 主要是移动start和end指针的方式,插入数据时,end指针不是单纯地 +1 ,因为环状数组从哪个位置开始插入都是无所谓的;删除数据也同理,在环状数组中,指针地址的递增和递减公式如下:

      1
      2
      3
      >  increment_address_one = (address + 1) % Length 
      > decrement_address_one = (address + Length - 1) % Length
      >

      指针地址递增的情况:插入数据时的end指针删除数据时的start指针,都是向后移动

      指针地址递减的情况:如果环状数组不从start位置删除,而是从end位置删除(栈结构),那么end 位置就得向前递减

  5. 基于环状数组的无锁队列实现

    1. 没有写比较简洁的CAS方法,用很多if判断代替了:

    2. 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      54
      55
      56
      57
      58
      59
      60
      61
      62
      63
      64
      65
      66
      67
      68
      69
      70
      71
      72
      73
      74
      75
      76
      77
      78
      79
      80
      81
      82
      83
      84
      85
      86
      87
      88
      89
      90
      91
      92
      93
      94
      95
      96
      97
      98
      99
      100
      101
      102
      103
      104
      105
      106
      107
      108
      109
      110
      111
      112
      113
      114
      115
      116
      117
      118
      119
      120
      121
      122
      123
      124
      125
      126
      127
      128
      129
      130
      131
      132
      133
      134
      135
      136
      137
      138
      139
      140
      141
      142
      143
      144
      145
      146
      147
      148
      149
      150
      151
      152
      153
      154
      155
      156
      157
      158
      159
      160
      161
      162
      163

      public class LockFreeQueueWithRingBuffer {

      /**
      * 基于环形数组实现的无锁队列
      */
      private Integer start;
      private Integer end;
      private int length;
      private Integer[] array;
      private int validData = 0;


      public LockFreeQueueWithRingBuffer(int n) {
      this.length = n;
      this.array = new Integer[length];
      start = 0;
      end = 0;
      }

      //这是顺序输出数组内容的方法
      public synchronized void getByIndexNew() {
      int k = start;
      int count = 0;
      while (count != validData) {
      System.out.print(array[k] + ",");
      k = (k + 1) % length;
      count++;
      }
      }

      //判断数组为空,将判断条件变得简单一点
      public boolean isEmpty() {
      return validData == 0 || array[start] == null;
      }

      //判断数组为满,只在enQueue方法中使用
      public boolean isFull() {
      return validData == length - 1 || ((end + 1) % length) == start;
      }


      //插入数据
      public boolean enQueueWhenNotFull(int value) {

      // 队列满时拒绝写入新数据
      if (isFull()) {
      return false;
      }
      //cas操作获取尾节点
      while (true) {
      //获取当前尾节点
      int oldEnd = end;
      int oldValid = validData;
      //空数组
      if (isEmpty() && array[oldEnd] == null) {

      if (null == array[end] && validData == 0) {
      array[oldEnd] = value;
      validData += 1;
      end = (end + 1) % length;
      return true;
      }

      } else if (!isEmpty() && null != array[end]) {
      continue;
      }
      //cas判断
      if (end != (oldEnd + 1) % length) {
      array[oldEnd] = value;
      if (end != (oldEnd + 1) % length) {
      end = (end + 1) % length;
      } else {
      continue;
      }
      if (oldValid + 1 != validData) {
      validData++;
      }
      break;
      } else {
      continue;
      }
      }
      return true;
      }


      //不考虑数组是否写满,一直持续写入数据会同时移动 start 和 end 指针
      public boolean enQueueWhenFull(int value) {
      //覆盖旧数据的方式是同时移动end和start指针,
      //确保从 start ~ end 为止的范围内存储的数据是顺序写入的
      //这种方式默认允许未读的已写入数据丢失
      // 缓冲区A[0~6]写入 0~6 ,start = 0,end = 6;继续写入,start = 1,end = 0
      //cas操作获取尾节点
      while (true) {
      //获取当前尾节点
      int oldEnd = end;
      //空数组
      if (isEmpty() && array[oldEnd] == null) {
      if (null == array[end] && validData == 0) {
      array[oldEnd] = value;
      validData += 1;
      end = (end + 1) % length;
      return true;
      }
      }

      //cas判断
      if (end != (oldEnd + 1) % length) {
      //写满了数组,先移动start指针
      if (end.equals(start)) {
      start = (start + 1) % length;
      }

      //尾指针还没有被移动
      if (array[end] == null || !array[end].equals(value)) {
      array[oldEnd] = value;
      end = (end + 1) % length;
      } else {
      continue;
      }

      if (length != validData) {
      validData++;
      }
      break;
      } else {
      continue;
      }
      }
      return true;
      }


      //出队
      public int deQueue() {

      if (isEmpty()) {
      throw new NullPointerException("数组为空!");
      }

      //判断队空
      while (true) {
      int oldStart = start;
      int oldValid = validData;
      Integer val = array[start];
      if (!isEmpty() && null == val) {
      continue;
      }
      //判断start指针是否移动
      if (oldStart == (start + 1) % length) {
      continue;
      }
      //判断start指针的是否移动
      if (val.equals(array[start]) && oldValid == validData) {
      array[start] = null;
      start = (start + 1) % length;
      oldValid--;
      return val;
      }
      }
      }
      }
    3. 这里实现了两个入队方法:enQueueWhenNotFull() 和 enQueueWhenFull,前者只会在数组未满时允许写入,后者不考虑数组是否已满,数组已满可以继续写入覆盖start指针位置,然后移动 start 指针


小结:

  1. 无锁并不是没有加锁,不加锁是无法实现并发控制的,这里的无锁只是频繁通过CAS确定共享变量是否已经被修改,也是一种细粒度的轻量级( 相对于给对象加锁 )锁机制
  2. 环状数组的结构比较神奇,将物理内存抽象成环状结构,应用在数据流读写的场景中,只要读写的速度持平,可以在不扩容的情况下一直进行高效的读写操作;因为 start 和 end 指针是随着读写情况变化的,任意时刻的读只需要确保start ~ end 范围的数据的是未覆盖的数据就行