¶队列
¶概述:
- 队列也是基于最基础的数组或者链表实现的一种数据结构,它类比生活中的例子就是排队结账的场景:排在前面的人先结账离开,需要结账的人在最后面排队,是一种先进先出,头出尾入的数据结构,和栈一样属于一种操作受限的结构
- 从头出尾入这个特点,可能很容易联想到
生产-消费者
模型:- 生产者产生实例从尾部入队
- 消费者从队列头部依次获取实例进行消费
- 再考虑一下阻塞队列的概念:基于1,2的基础之上
- 当实例队列已满,生产者停止生产,直到接收到生产队列非满的通知,才继续进行生产入队
- 当实例队列为空,消费者停止获取实例的动作,直到接收到队列非空的通知,再继续获取实例消费
¶队列实现:
和栈比较类似,基于数组实现的队列叫顺序队列,是一种有界队列;
而基于链表实现的队列叫链式队列,默认实现是无界队列;
前者在资源控制上应用比较广泛,例如构造线程池的ArrayBlockingQueue(有界阻塞队列)
顺序队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71/**
* 基于数组实现的顺序队列
*/
static class ArrayQueue {
private String[] items;
private int length;
private int tail = 0;
private int head = 0;
ArrayQueue(int length) {
this.length = length;
this.items = new String[this.length];
}
private boolean isFull() {
if (this.tail >= this.length) {
return true;
}
return false;
}
//enQueue
public boolean enQueue(String item) {
if (null != items && !isFull()) {
items[tail] = item;
tail++;
return true;
}
return false;
}
//enQueueNew,deQueue时不立刻进行数据的移动操作,
//而是改变头指针的位置,当enQueue容量不足时再一次性移动数据
public boolean enQueueNew(String item) {
if(isFull()){
// 队列已满,但是head不指向头部的时候,进行0~head位置的填充
// 将head~tail之间的元素移动到0~head之间
int i=head;
for(;head<tail;){
//这个head-i是我没有想到的,比使用i=0;i<tail-head;要巧妙
items[head-i] = items[head];
head++;
}
tail -=i;
head = 0;
}
if (null != items && !isFull()) {
items[tail] = item;
tail++;
return true;
}
return false;
}
//deQueue
public String deQueue() {
String result = "";
if(head == tail){
return null;
}
if (null != this.items) {
result = items[head];
head++;
}
return result;
}
}- 最开始实现的出队方法(deQueue)是每次将head出队之后,进行一次head~tail元素的整体向前移动,这也是数组为了维护内存空间连续性的操作;但是很明显这个方法比较笨,会导致每一次的deQueue操作都变成O(n)级别复杂度;
- 之前在数组那一部分有说过,对于需要频繁移动数组内元素的场景,可以先进行元素标记,当数组空间不足时在进行一次整体的元素移动操作,
- enQueueNew方法在入队时进行判断,当head与tail在同一个位置,表示队列已满;
- 进行一次整体的元素移动,将 head ~ tail 间的元素移动到 0 ~ tail-head,循环次数为tail-head
链式队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62// Node 类
static class Node<E extends Comparable> {
E value = null;
Node next;
public Node() {
}
public Node(E value) {
this.value = value;
}
public boolean compare(Node<E> that){
return this.value.compareTo(that.value)>=0;
}
public boolean equals(Object obj) {
return this.value.equals(((Node<E>)obj).value);
}
}
static class LinkQueue<E extends Comparable>{
//这里没有指定容量,默认实现为无界队列
private Node<E> head;
private Node<E> tail;
LinkQueue(){
}
//enQueue
public boolean enqueue(E value){
//队列为空时
if(head == null){
head = new Node(value);
head.next=null;
tail=head;
return true;
}
Node newNode = new Node(value);
tail.next= newNode;
tail=tail.next;
return true;
}
//dequeue
public E deQueue(){
Node<E> out;
//需要判断队列为空
if(head.next == tail.next){
return null;
}
//头节点出
out = head;
head = head.next;
return out.value;
}
}- 因为队列不存在随机访问这种需求,链表的实现方式看起来很顺畅,出队,入队操作比较简便,如果再加上容量限制,整体的时间复杂度是低于顺序队列
循环队列:
概述:
正常的队列是线形,从头至尾一线排列过去。而循环队列则是个环状–队满时,tail的位置为空,下一个位置指向head;
我们可以联想一下循环链表在循环链表中tail.next=head,将头节点和尾节点连接起来了,这个判断条件是很明确的
但是判断循环队列已满的条件不是这样的,因为出队在头部,head的位置会变化,而 tail 的位置可以是在 tail ~ head 之间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39static class CircleQueue{
//数组实现
private String[] items;
private int head;
private int tail;
private int length;
CircleQueue(int n){
this.items=new String[n];
this.head=0;
this.tail=0;
}
//入队
public boolean enQueue(String item){
//判断队满
if((tail+1)%length==head){
return false;
}
items[tail]=item;
//计算tail位置
tail = (tail+1)%length;
return true;
}
//出队
public String deQueue(){
if(head==tail){
return null;
}
String result = items[head];
//和判断队满类似,head始终<=n
head=(head+1)%n;
return result;
}
}- 判断队满分两种情况:1. head>0,tail>=0 ; 2. tail = n-1 ,head =0
- n 长的队列,当 head>0 时,tail +1 = head 的位置,此时的 head 位置为 head%n ,因为 head<n ,所以求余都是它本身
- 当 tail =n-1 ,head = 0时,根据上面的公式 tail +1 = n , head = head%n; head = (tail+1)%n;
- 因此判断队满的条件是 (tail+1)%n = head ;
- 判断队满分两种情况:1. head>0,tail>=0 ; 2. tail = n-1 ,head =0
¶阻塞队列
概念:
- 基于生产-消费者模式,当队空时,消费者等待队满信号再去获取队里数据消费;当队满时,生产者等待队空信号,再继续生产入队
- 简单化的场景是单个生产者服务于单个消费者,存在检测队列是否满条件来通知生产者暂停生产;也存在检测队列是否为空的条件通知消费者暂停消费,他们等待信号通知,继续进行各自的动作
实现:
轮询:
- 首先想到的是简单的轮询:空队列时消费者等待一定时间;满队列时生产者等待一定时间,但这个不知道算不算阻塞队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165public class BlockingQueue {
private String[] items;
private int length;
private int head;
private int tail;
private boolean isFull = false;
private boolean isEmpty = true;
BlockingQueue(int n) {
this.length = n;
this.items = new String[n];
}
private boolean isEmpty() {
return isEmpty;
}
private boolean isFull() {
return isFull;
}
//enQueue
public boolean enQueue(String item) {
if (tail < length) {
items[tail] = item;
tail++;
isEmpty=false;
return true;
}
isFull = true;
isEmpty =false;
return false;
}
//enQueueNew,deQueue时不立刻进行数据的移动操作,
//而是改变头指针的位置,当enQueue容量不足时再一次性移动数据
public boolean enQueueNew(String item) {
if (isFull()) {
// 队列已满,但是head不指向头部的时候,进行0~head位置的填充
// 将head~tail之间的元素移动到0~head之间
int i = head;
for (; head < tail; ) {
//这个head-i是我没有想到的,比使用i=0;i<tail-head;要巧妙
items[head - i] = items[head];
head++;
}
tail -= i;
head = 0;
}
if (tail < length) {
items[tail] = item;
tail++;
//因为时刻都被消费着,所以每次入队成功都需要将 isEmpty 置为 false;
isEmpty=false;
return true;
}
//队列已满的状态下 isEmpty 也是 false;
isEmpty=false;
return false;
}
//deQueue
public String deQueue() {
String result = "";
if (head == tail) {
isEmpty = true;
head=0;
return null;
}
if (null != this.items) {
result = items[head];
head++;
//成功出队则 isFull为false
isFull=false;
}
//同上
isFull = false;
return result;
}
public static void main(String[] args) {
BlockingQueue queue = new BlockingQueue(20);
Thread producer = new Thread(() -> {
while (true) {
if (!queue.isFull()) {
String str = new Date().toString();
queue.enQueueNew(str);
System.out.println("生产 : " + str);
try {
Thread.sleep(1000);
}catch (Exception e){
e.printStackTrace();
}
} else {
System.out.println("队列已满,等待5秒!");
try {
Thread.sleep(5000);
} catch (Exception e) {
System.out.println("生产者线程被中断!");
e.printStackTrace();
}
}
}
}, "producer");
Thread consumer = new Thread(() -> {
while (true) {
if (!queue.isEmpty()) {
String str = queue.deQueue();
System.out.println("消费 : " + str);
try {
Thread.sleep(1500);
}catch (Exception e){
e.printStackTrace();
}
} else {
System.out.println("队列为空,等待5秒!");
try {
Thread.sleep(3000);
} catch (Exception e) {
System.out.println("消费者线程被中断!");
e.printStackTrace();
}
}
}
}, "producer");
producer.start();
consumer.start();
}
}
// 输出结果
Connected to the target VM, address: '127.0.0.1:57923', transport: 'socket'
队列为空,等待5秒!
生产 : Sat May 23 08:59:06 CST 2020
生产 : Sat May 23 08:59:07 CST 2020
生产 : Sat May 23 08:59:08 CST 2020
消费 : Sat May 23 08:59:06 CST 2020
生产 : Sat May 23 08:59:09 CST 2020
生产 : Sat May 23 08:59:10 CST 2020
消费 : Sat May 23 08:59:07 CST 2020
生产 : Sat May 23 08:59:11 CST 2020
消费 : Sat May 23 08:59:08 CST 2020
生产 : Sat May 23 08:59:12 CST 2020
生产 : Sat May 23 08:59:13 CST 2020
消费 : Sat May 23 08:59:09 CST 2020
生产 : Sat May 23 08:59:14 CST 2020
消费 : Sat May 23 08:59:10 CST 2020
生产 : Sat May 23 08:59:15 CST 2020
生产 : Sat May 23 08:59:16 CST 2020
消费 : Sat May 23 08:59:11 CST 2020
生产 : Sat May 23 08:59:17 CST 2020
消费 : Sat May 23 08:59:12 CST 2020
生产 : Sat May 23 08:59:18 CST 2020
生产 : Sat May 23 08:59:19 CST 2020这种基于轮询的方式实现的阻塞队列,对于 isFull 和 isEmpty 的状态更新要比较小心,否则会造成其中一端的无故等待甚至永久轮询
实现方式比较粗糙,这里线程间是通过修改共享变量进行通信,更加合适的方式应该是
生产者生产直到队列满,暂停生产,等待队列不满的信号
消费者消费,直到队空,等待队列不空的信号
通过对象的锁来实现通信
对象锁:
通过竞争对象的锁来进行各自的生产和消费
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
public class BlockingQueue {
/**
* 基于数组实现的阻塞队列
*/
private String[] items;
private int length;
private int head;
private int tail;
private boolean isFull = false;
private boolean isEmpty = true;
BlockingQueue(int n) {
this.length = n;
this.items = new String[n];
}
private boolean isEmpty() {
return isEmpty;
}
private boolean isFull() {
return isFull;
}
//enQueue
public boolean enQueue(String item) {
if (tail < length) {
items[tail] = item;
tail++;
isEmpty = false;
return true;
}
isFull = true;
isEmpty = false;
return false;
}
public synchronized boolean enQueueNew(String item) throws InterruptedException {
while (isFull()) {
// 队列已满,但是head不指向头部的时候,进行0~head位置的填充
// 将head~tail之间的元素移动到0~head之间
int i = head;
for (; head < tail; ) {
items[head - i] = items[head];
head++;
}
tail = tail > head ? tail -= i : i - tail;
head = 0;
wait();
}
if (tail < length) {
items[tail] = item;
tail++;
isEmpty = false;
notifyAll();
return true;
}
isEmpty = false;
notifyAll();
return false;
}
//deQueue
public synchronized String deQueue() throws InterruptedException {
while (isEmpty) {
wait();
}
String result = "";
if (head < length) {
result = items[head];
head++;
isFull = false;
}else{
isFull=true;
}
notifyAll();
return result;
}
public static void main(String[] args) {
BlockingQueue queue = new BlockingQueue(20);
Thread producer = new Thread(() -> {
while (true) {
try {
System.out.println("生产者线程被唤醒----------------");
String str = new Date().toString();
queue.enQueueNew(str);
System.out.println("生产 : " + str);
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
}
}, "producer");
Thread consumer = new Thread(() -> {
while (true) {
try {
String str = queue.deQueue();
System.out.println("消费者0消费------------- : " + str);
Thread.sleep(800);
} catch (Exception e) {
if (e instanceof InterruptedException) {
System.out.println("消费者被唤醒-----------------");
} else {
e.printStackTrace();
}
}
}
}, "producer");
Thread consumer1 = new Thread(() -> {
while (true) {
try {
String str = queue.deQueue();
System.out.println("消费者1消费------------------------- : " + str);
Thread.sleep(600);
} catch (Exception e) {
if (e instanceof InterruptedException) {
System.out.println("消费者被唤醒-----------------");
} else {
e.printStackTrace();
}
}
}
}, "producer");
producer.start();
consumer.start();
consumer1.start();
}
}
// 运行结果
生产者线程被唤醒----------------
消费者1消费------------------------- : Sat May 23 10:28:24 CST 2020
生产 : Sat May 23 10:28:24 CST 2020
消费者0消费------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:25 CST 2020
消费者1消费------------------------- : null
消费者0消费------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:25 CST 2020
消费者1消费------------------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:26 CST 2020
消费者0消费------------- : null
消费者1消费------------------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:26 CST 2020
消费者1消费------------------------- : null
消费者0消费------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:27 CST 2020
消费者1消费------------------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:27 CST 2020
消费者0消费------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:28 CST 2020
消费者1消费------------------------- : null
生产者线程被唤醒----------------
消费者0消费------------- : null
生产 : Sat May 23 10:28:28 CST 2020
消费者1消费------------------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:29 CST 2020
消费者1消费------------------------- : null
消费者0消费------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:29 CST 2020
消费者1消费------------------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:30 CST 2020
消费者0消费------------- : null
消费者1消费------------------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:30 CST 2020
消费者0消费------------- : null
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:31 CST 2020
消费者1消费------------------------- :
生产者线程被唤醒----------------
消费者1消费------------------------- : Sat May 23 10:28:24 CST 2020
生产 : Sat May 23 10:28:31 CST 2020
消费者0消费------------- : Sat May 23 10:28:25 CST 2020
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:32 CST 2020
消费者1消费------------------------- : Sat May 23 10:28:25 CST 2020
消费者0消费------------- : Sat May 23 10:28:26 CST 2020
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:33 CST 2020
消费者1消费------------------------- : Sat May 23 10:28:26 CST 2020
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:33 CST 2020
消费者0消费------------- : Sat May 23 10:28:27 CST 2020
消费者1消费------------------------- : Sat May 23 10:28:31 CST 2020
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:34 CST 2020
消费者1消费------------------------- : Sat May 23 10:28:32 CST 2020
消费者0消费------------- : Sat May 23 10:28:33 CST 2020
生产者线程被唤醒----------------
生产 : Sat May 23 10:28:34 CST 2020
生产者线程被唤醒----------------
消费者1消费------------------------- : Sat May 23 10:28:33 CST 2020
生产 : Sat May 23 10:28:35 CST 2020可以看出来还是有问题的:最开始的一小段时间消费者没有消费到任何内容,通过断点发现 head > tail 且 head <length ;所以一直消费空内容
生产者每 500 ms 生产一次,消费0 800ms 消费一次,消费者1 600ms 消费一次,两个消费者导致 head 的递增速度超过了tail ,因此需要更改 deQueue() 方法的判断条件和 enQueue()方法的判断条件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44//enQueue判断队满的条件变更为当head指针超过了tail指针,也需要进行元素的移动
//这里的条件不能是 head>=tail,因为初始化的时候 head ==tail,会死循环
while (isFull() || head>tail) {
//因为 head > tail ,所以起始位置需要变化
int i = head>tail ? head=tail: head;
for (; head < tail; ) {
items[head - i] = items[head];
head++;
}
//同上,不然tail可能会是负值
tail = tail > head ? tail -= i : i - tail;
head = 0;
wait();
}
//deQueue ,当head > tail 不再进行消费
while (isEmpty || head >=tail) {
wait();
}
//输出内容
Sat May 23 10:49:18 CST 2020---------------消费者1消费------------ : Sat May 23 10:49:18 CST 2020
生产 : Sat May 23 10:49:18 CST 2020
Sat May 23 10:49:18 CST 2020-----------消费者0消费------------- : Sat May 23 10:49:18 CST 2020
生产 : Sat May 23 10:49:18 CST 2020
Sat May 23 10:49:19 CST 2020---------------消费者1消费------------ : Sat May 23 10:49:19 CST 2020
生产 : Sat May 23 10:49:19 CST 2020
生产 : Sat May 23 10:49:19 CST 2020
Sat May 23 10:49:19 CST 2020-----------消费者0消费------------- : Sat May 23 10:49:19 CST 2020
生产 : Sat May 23 10:49:20 CST 2020
Sat May 23 10:49:20 CST 2020---------------消费者1消费------------ : Sat May 23 10:49:20 CST 2020
生产 : Sat May 23 10:49:20 CST 2020
Sat May 23 10:49:20 CST 2020-----------消费者0消费------------- : Sat May 23 10:49:20 CST 2020
生产 : Sat May 23 10:49:21 CST 2020
Sat May 23 10:49:21 CST 2020---------------消费者1消费------------ : Sat May 23 10:49:21 CST 2020
生产 : Sat May 23 10:49:21 CST 2020
Sat May 23 10:49:21 CST 2020-----------消费者0消费------------- : Sat May 23 10:49:21 CST 2020
生产 : Sat May 23 10:49:22 CST 2020
Sat May 23 10:49:22 CST 2020---------------消费者1消费------------ : Sat May 23 10:49:22 CST 2020
生产 : Sat May 23 10:49:22 CST 2020
Sat May 23 10:49:22 CST 2020-----------消费者0消费------------- : Sat May 23 10:49:22 CST 2020
生产 : Sat May 23 10:49:23 CST 2020
Sat May 23 10:49:23 CST 2020---------------消费者1消费------------ : Sat May 23 10:49:23 CST 2020漂亮!-
¶小结:
- 最有价值的部分大概是阻塞队列的实现,也让我知道了不能忘记锁是对象的锁,因最开始我试着用线程去控制锁,犯了低级错误
- 这个生产者-消费者模型实现并不完美,条件控制应该还存在不足,而且代码并不美观,需要改进
- 之后尝试通过数组实现的循环队列去实现一个并发队列