Condition、CountDownLatch、CyclicBarrier 和 Semaphore


这周我们啃完 AQS。


Condition

Condition 类可以看做 Java 重新实现了一遍 object.wait()object.notify(),使用起来两者的思路是一致的,都是在执行 wait/await 或者 notify/signal 方法前需要获取锁,调用 wait/await 会将锁打开,并将线程包装存起来,调用 notify/signal 会将线程唤起,一直等到它获取到锁,然后继续运行。

看 Condition 类的源码能够更加理解这套使用思路的原因。

下面是 Condition 的代码,首先是 await() 方法(只分析了无参方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
/* —————————————————————— 主方法 —————————————————————— */

public final void await() throws InterruptedException {
// 线程中断,抛异常出去
if (Thread.interrupted())
throw new InterruptedException();
// 将本线程包装起来,放入 condition 队列的末尾
Node node = addConditionWaiter();
// 解锁,记录解锁前的 AQS 状态
int savedState = fullyRelease(node);
// 中断状态,这里要联合下面才能理解,总体上有三种值:
// 0:没有中断
// THROW_IE(-1):发生中断,且在 signal 前
// REINTERRUPT(1):发生中断,且在 signal 后
int interruptMode = 0;
// 如果不在 AQS 阻塞队列
while (!isOnSyncQueue(node)) {
// 挂起线程
LockSupport.park(this);

// 线程被唤醒了,有以下几种可能
// 1.(正常情况)signal 使自己进入 AQS 阻塞队列,等到前置节点执行完毕,唤醒自己
// 2. signal 时,前置节点取消等待
// 3. signal 时,CAS 设置前置节点的状态,从 0 修改为 -1 失败
// 4. 中断
// 5. 假唤醒
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 从 while 出来时,已经确保节点在 AQS 阻塞队列中了
// 判断是否需要阻塞(判断的时候抢一次锁),即判断是否发生了中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
// 如果中断不是 signal 前发生,那么设为 signal 后发生
interruptMode = REINTERRUPT;
// 如果中断发生在 signal 前,没有断开 condition 条件队列,那么断开后继节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 如果发生中断了,那么根据情况处理
if (interruptMode != 0)
// 中断在 signal 前,抛出中断异常
// 中断在 signal 后,再次中断线程
reportInterruptAfterWait(interruptMode);
}

/* ———————————————————————————————————————————————— */








/* —————————————————— addConditionWaiter —————————————————— */

/**
* 将当前线程包装成结点,插入到 condition 队列中
*/
private Node addConditionWaiter() {
// 获取 condition 队列尾结点
Node t = lastWaiter;

// 如果尾结点失效了,清除所有失效结点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}

// 将当前线程包成结点
Node node = new Node(Thread.currentThread(), Node.CONDITION);

// 如果尾结点为空,代表没有 condition 队列为空,设置头结点
if (t == null)
firstWaiter = node;
// 否则插入尾结点后
else
t.nextWaiter = node;

// 重新设置尾结点
lastWaiter = node;
return node;
}

/**
* 清除所有失效结点
*/
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = t;
t = next;
}
}

/* ———————————————————————————————————————————————— */








/* —————————————————— fullyRelease —————————————————— */

/**
* 解开当前线程的所有锁
*/
final int fullyRelease(Node node) {
// failed 记录的是,有没有解锁失败
boolean failed = true;
try {
// 记录当前 AQS 状态(本线程上了几把重入锁,state就是几)
int savedState = getState();
// 解锁,并唤起 AQS 后继节点
if (release(savedState)) {
failed = false;
return savedState;
} else {
// 调用 condition 的方法,必然需要先拿到锁,
// 执行到这里,应该能够把一开始拿到的锁全部解开,
// 但是没有,说明出现了异常
throw new IllegalMonitorStateException();
}
} finally {
// 如果解锁失败(抛出来异常),那么本 condition 节点取消
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

/**
* 本线程全解锁,并唤醒 AQS 的后继线程
*/
public final boolean release(int arg) {
// 尝试本线程解锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤起 AQS 后继节点
unparkSuccessor(h);
return true;
}
return false;
}

/**
* 解锁成功以后,唤醒 AQS 下一个节点
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

/* ———————————————————————————————————————————————— */









/* —————————————————— isOnSyncQueue —————————————————— */

/**
* 判断节点是否在 AQS 阻塞队列中
*/
final boolean isOnSyncQueue(Node node) {
// 如果是在 condition 队列中,则不在 AQS 队列里
// 如果 prev 是 null,说明不在 AQS 阻塞队列中(否则会有前置节点)
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 如果 next 不是 null,说明在 AQS 阻塞队列中(后继节点)
if (node.next != null)
return true;
// 在 AQS 队列中寻找(从后往前找),返回是否找到
return findNodeFromTail(node);
}

/**
* AQS 队列从后往前找,找到指定节点为止
*/
private boolean findNodeFromTail(Node node) {
// 从最后开始
Node t = tail;
// 一直往前找,找到返回 true,没找到返回 false
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

/* ———————————————————————————————————————————————— */









/* ——————————— checkInterruptWhileWaiting ——————————— */

/**
* 获取节点的线程状态(没有中断、在signal前中断、在signal后中断)
*/
private int checkInterruptWhileWaiting(Node node) {
// 线程没有中断:返回 0
// 线程中断,是在 signal 之前的,返回 THROW_IE
// 线程中断,是在 signal 之后的,返回 REINTERRUPT
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

/**
* 线程发生中断,判断是在signal前还是后
* true:在signal之前 false:在signal之后
*/
final boolean transferAfterCancelledWait(Node node) {
// CAS 修改状态,从 -2 改为 0
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 如果改成功了,说明原来 -2,中断发生在 signal 之前
// 加入阻塞队列(也就是说,中断也会加进阻塞队列)
enq(node);
return true;
}
// 执行到这里,说明中断发生在 signal 之后
// while 一直执行,直到节点加入到 AQS 阻塞队列中(因为有可能 signal 没执行完)
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}

/**
* 判断节点是否在 AQS 阻塞队列中
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null)
return true;
return findNodeFromTail(node);
}

/**
* 从后往前找,判断节点是否在 AQS 队列中
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}

/* ———————————————————————————————————————————————— */









/* ——————————— reportInterruptAfterWait ——————————— */

/**
* 处理中断
* 如果发生在 signal 前,抛异常,如果发生在 signal 后,再次中断
*/
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

/**
* 中断
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

/* ———————————————————————————————————————————————— */

然后贴一下 signal() 方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/* —————————————————————— 主方法 —————————————————————— */

public final void signal() {
// 调用 signal() 方法必须首先获取锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取 condition 队列首个节点
Node first = firstWaiter;
if (first != null)
// 如果非空,唤醒它
doSignal(first);
}

/* ———————————————————————————————————————————————— */








/* —————————————————— isHeldExclusively —————————————————— */

/**
* 是否在持有锁(ReentrantLock的重写实现)
*/
protected final boolean isHeldExclusively() {
// 判断 AQS 的持锁线程是否是当前线程
return getExclusiveOwnerThread() == Thread.currentThread();
}

/* ———————————————————————————————————————————————— */







/* —————————————————— doSignal —————————————————— */

/**
* 唤醒 condition 队列的首个节点
*/
private void doSignal(Node first) {
do {
// 判断 condition 队列是否只有自己,如果是,队列尾节点置空
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
}
// 如果 first 节点取消等待,并且队列不为空,唤醒下一个
while (!transferForSignal(first) && (first = firstWaiter) != null);
}

/**
* 将 condition 队列节点转移到 AQS 队列中
* 如果返回 false,代表节点在 condition 中取消等待
*/
final boolean transferForSignal(Node node) {
// 节点状态从 condition 转为初始化(如果失败说明已经取消,之间返回false)
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 尾插插入 AQS 队列,并拿到 AQS 队列的倒数第二个节点(不达目的 CAS 不罢休)
Node p = enq(node);
// 拿到前置节点的状态
int ws = p.waitStatus;
// 如果前置节点状态 > 0,表示前置节点取消了等待,直接唤醒前一个节点
// 如果前置节点的状态 <= 0,那么把前置节点的状态设为 -1(可以唤醒后继节点)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果前置节点取消等待,或者 CAS 设置状态失败,唤醒前置节点
// 这里结合 await() 方法继续看
LockSupport.unpark(node.thread);
return true;
}

/* ———————————————————————————————————————————————— */

过程还是很繁琐的,尤其是 await() 方法,看得我要吐了。

跟上一篇文章一样,本篇也整理了流程图,下图是最简单情况下的 condition 节点变化图(不考虑中断、锁的争抢、且锁是公平锁):

AQS_Condition

顺便附带一下测试上图的代码吧,一共起了四个线程,第一个 await(),第二个 signal(),后面两个打酱油,模拟 AQS 阻塞队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock(true);
Condition condition = lock.newCondition();

// 起 1 个线程,调用 condition.await() 方法
new Thread(() -> {
lock.lock();
try {
condition.await();
} catch (InterruptedException e) {
System.out.println("condition唤醒");
} finally {
lock.unlock();
}
}).start();

// 再起 1 个线程,调用 condition.signal() 方法
new Thread(() -> {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}).start();

// 再起 2 个线程,在 AQS 队列里阻塞
for (int i = 0; i < 2; i++) {
new Thread(() -> {
lock.lock();
try {
// 什么也不做
} finally {
lock.unlock();
}
}).start();
}
}

CountDownLatch

CountDownLatch 的意思是 带有倒数功能的栅栏,有两个核心方法:await()countDown(),前者是等待,后者是倒数。

举一个使用的例子:首先创建一个栅栏,然后一堆线程执行了 await() 方法,被堵在了栅栏门口,被迫等待。这个栅栏开始时有一个数字 N,每执行一次 countDown() 方法(任何线程任何时候都可以),栅栏的数字 N 就会减一,一直到 N 被减为 0,栅栏口打开,所有的线程继续执行。

画了一张图,描述了一下 CountDownLatch 的使用场景:

CountDownLatch_diagram

CountDownLatch 的源码相对简单很多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/* ——————————————————— 构造方法 ——————————————————— */

public CountDownLatch(int count) {
// count 必须大于 0
if (count < 0) throw new IllegalArgumentException("count < 0");

this.sync = new Sync(count);
}

/* —————————————————————————————————————————————— */








/* ——————————————————— sync ——————————————————— */

/**
* 内部维护一个 AQS 队列,老套路
*/
private static final class Sync extends AbstractQueuedSynchronizer {
/**
* 把 count 数字设置为 AQS 队列的状态
*/
Sync(int count) {
setState(count);
}
// ...(略)
}

/**
* AQS 设置队列状态
*/
protected final void setState(int newState) {
state = newState;
}

/* —————————————————————————————————————————————— */







/* ——————————————————— countDown ——————————————————— */

public void countDown() {
sync.releaseShared(1);
}

/**
* countDown -1,如果减到 0,唤醒 AQS 队列
*/
public final boolean releaseShared(int arg) {
// 试着将 AQS state 减一,并返回是否 countDown 到 0
// 如果已经到 0,不再减一,返回 false
if (tryReleaseShared(arg)) {
// 唤醒 AQS 队列
doReleaseShared();
return true;
}
return false;
}

/**
* 将 AQS 队列的 state 减一
* 重写 AQS 的 tryReleaseShared 方法
*/
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

/**
* 唤醒第一个需要被唤醒的节点
*/
private void doReleaseShared() {
for (;;) {
// 获取头结点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果头结点需要唤醒后继节点
if (ws == Node.SIGNAL) {
// CAS 设置 head 的状态失败,那再来一遍循环
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// 唤醒 h 的后继节点
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}

/**
* 获取一个节点(头结点),唤醒它的后继节点
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后继节点
Node s = node.next;
// 如果后继节点为 null 或取消,从后往前找到一个需要被唤醒的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找到了一个需要被唤醒的节点,唤醒它
if (s != null)
LockSupport.unpark(s.thread);
}

/* —————————————————————————————————————————————— */








/* ——————————————————— await ——————————————————— */

public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

/**
* 如果 countDown 到 0,直接返回,否则去休眠,等被唤醒
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 首先判断是否已经 countDown 到 0 了,如果已经减到 0,直接返回
if (tryAcquireShared(arg) < 0)
// 如果没减到 0,唤醒自己及之后的整个队列
doAcquireSharedInterruptibly(arg);
}

/**
* 判断 state 是否减到 0
* 如果减到了 0 返回 1,如果没减到 0 返回 -1
*/
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

/**
* countDown 到 0,直接返回,否则挂起线程,直到被唤醒
*/
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
// 创建一个节点
// 将该节点的 nextWaiter 指向 Node.SHARED(空节点)
// 将该节点插入到阻塞队列末尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 判断是否已经 countDown 到 0
int r = tryAcquireShared(arg);
// 当 countDown 到 0 时,会进入下面的判断体中
if (r >= 0) {
// 唤醒 node 的后继节点(后继节点也会执行到这里,会链式唤醒所有)
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
// 首次进来,会设置前置节点为 -1,本循环结束
// 再次进来,会判断可以休眠 然后休眠
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
// 如果休眠过程中返回 false,说明发生中断,响应中断异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

/**
* 唤醒下一个节点
* 翻译名字:设置头,并传播(propagate)下去
*/
private void setHeadAndPropagate(Node node, int propagate) {
// 保存头结点
Node h = head;
// 重新设置头结点(实际上这个头结点,是某阻塞节点的前置节点)
setHead(node);
// 唤醒下一个节点
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 在 signal 里也是在调用这个方法
doReleaseShared();
}
}

/**
* 把节点设置为头
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

/* —————————————————————————————————————————————— */

CountDownLatch 的原理是,依旧维护一个 sync 队列(AQS 队列),在 countDown 到 0 之前,所有 await() 的线程都包装成节点,按顺序阻塞在 sync 队列中。当 CountDownLatch 内部的计数器 count 减到 0 时,唤醒头结点(一个空节点),让空节点唤醒后继节点,后继节点醒来再唤醒它的后继,链式地叫醒所有。

画一张简图,描述一下内部队列在阻塞时的数据结构(就不画流程图了):

CountDownLatch_sync

CyclicBarrier

CyclicBarrier 字面上的意思是周期性的栅栏,好像跟 CountDownLatch 相像,但其实还是不太一样的。

CyclicBarrier 的作用是凑齐一波线程就放行一波,否则线程们都等待。比如初始化的时候,设置 CyclicBarrier 为每 10 个线程一组,那么前 9 个线程出现时,都会挡在 CyclicBarrier 的栅栏前,被迫等待,直到第 10 个线程出现,10 个线程一起放行。之后重复这个行为,循环往复。

CountDownLatch 是外部改变栅栏的计数,countDown 一次,栅栏计数减 1,直到减到 0 栅栏打开。而对于 CyclicBarrier,被挡在栅栏外的线程数量是决定因素,凑够线程数,栅栏打开。

CyclicBarrier_diagram

CyclicBarrier 的源码就相对简单很多了,就是对 Condition 的简单使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/* ————————————————————— 基础信息 ————————————————————— */

/**
* 内部一个锁
*/
private final ReentrantLock lock = new ReentrantLock();

/**
* 内部一个由锁生成的 Condition
*/
private final Condition trip = lock.newCondition();

/**
* 预设线程数,当达到这个数时,栅栏打开
*/
private final int parties;

/**
* 倒数,当该数减到 0 时,栅栏打开,初始化值设为 parties
*/
private int count;

/**
* 每当栅栏将打开时,最后一个抵达的线程会执行这个 Runnable
*/
private final Runnable barrierCommand;

/**
* 代,一代线程,每次 count 到 0 或者 broken 时,将会生成新的一代
*/
private Generation generation = new Generation();

private static class Generation {
/**
* 栅栏是否破裂
*/
boolean broken = false;
}

/* —————————————————————————————————————————————————— */









/* ————————————————————— 构造方法 ————————————————————— */

/**
* 初始化 parties、count、barrierCommand
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

/* —————————————————————————————————————————————————— */








/* ————————————————————— await ————————————————————— */

public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

/**
* 主流程
*/
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 先获取锁
// 如果是最后一个线程,叫醒其他线程
// 如果不是最后一个线程,睡
lock.lock();
try {
final Generation g = generation;

if (g.broken)
throw new BrokenBarrierException();

if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

int index = --count;
// 最后一个线程会进入这个分支(count减到0)
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 由最后一个抵达的线程触发 Runnable
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}

for (;;) {
try {
if (!timed)
// 不带时间等待
trip.await();
else if (nanos > 0L)
// 带时间等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}

// 如果 CyclicBarrier 被打破了
if (g.broken)
throw new BrokenBarrierException();

// 当最后一个线程抵达时,它会 signal 所有休眠线程,并重置 generation
// 此时 generation 会变成新的一代,然后返回休眠前 count 的值
if (g != generation)
return index;

if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

/**
* 唤起所有 await 的线程,重置 count,重置 generation
*/
private void nextGeneration() {
trip.signalAll();
count = parties;
generation = new Generation();
}

/* —————————————————————————————————————————————————— */








/* ————————————————————— reset ————————————————————— */

public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 打破 generation,唤起所有休眠线程(唤起后将抛出 BrokenBarrierException)
breakBarrier();
// 重置 generation 和 count
nextGeneration();
} finally {
lock.unlock();
}
}

/**
* 将本代设为 broken,唤起所有休眠线程
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

/* —————————————————————————————————————————————————— */

不细说了,简单到我上我也行(膨胀了膨胀了)。

Semaphore

Semaphore 直译是信号,这是个不太容易理解的名字,实际上它的作用更像是一个资源池。例如创建一个容量为 10 的 Semaphore,如果有超过 10 个线程过来,那么就只能阻塞,一直等到有之前的线程释放资源,让容量空出来一个,才可以再运行。

再回来看 Semaphore 的意思:信号。它指的是,给线程一个信号,告知线程资源数量是否足够,你是否可以运行,如果已经满员了,那么你只能在我内部的阻塞队列里排队等候。

画一张简图描述一下这个类的作用:

Semaphore_diagram

Semaphore 类主要使用两个方法:acquire()release(),前者是申请资源,后者是归还资源。但 release() 方法有点奇怪,即使并没有申请过资源也是可以调用的,而且它会无节制地使资源数 +1,甚至超过最初设置的资源数量。例如下面这四行代码:

1
2
3
4
Semaphore semaphore = new Semaphore(10);
semaphore.release();
semaphore.release();
semaphore.release();

初始化一个 Semaphore,容量设为 10,但是在 release() 了 3 次之后,容量变成了 13,可以同时有 13 个线程申请到锁然后运行。


这个类的代码跟 Condition 类有点类似,总体上讲还是比较容易的,直接看代码就好。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
/* ———————————————————————— 基本信息 ———————————————————————— */

/**
* AQS 阻塞队列,老套路
*/
private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* AQS 阻塞队列的构造方法,传 permits 进去,设为 AQS 的状态(state)
*/
Sync(int permits) {
setState(permits);
}
// ...(略)
}

/**
* 非公平阻塞队列,老套路
*/
static final class NonfairSync extends Sync {
// ...(略)
}

/**
* 公平阻塞队列,老套路
*/
static final class FairSync extends Sync {
// ...(略)
}

/* —————————————————————————————————————————————————————— */








/* ———————————————————————— 构造方法 ———————————————————————— */

public Semaphore(int permits, boolean fair) {
// 双参数构造函数
// 前者 permits(即 AQS 的状态 state)
// 后者是否公平
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

public Semaphore(int permits) {
// 单参数构造函数,默认非公平锁
sync = new NonfairSync(permits);
}

/* —————————————————————————————————————————————————————— */








/* ———————————————————————— acquire ———————————————————————— */

public void acquire() throws InterruptedException {
// 获取一次共享锁
sync.acquireSharedInterruptibly(1);
}

/**
* 获取共享锁(响应中断)
*/
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 试着获取一次共享锁
if (tryAcquireShared(arg) < 0)
// 如果没获取到,
doAcquireSharedInterruptibly(arg);
}

/**
* (公平锁)尝试获取共享锁
*/
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果阻塞队列中,有节点排在自己前面
if (hasQueuedPredecessors())
return -1;
// 执行到这里时,自己就是最前面的节点
// 获取 AQS 的状态
int available = getState();
// 如果获取到锁,还剩多少资源可用
int remaining = available - acquires;
// 如果获取锁后资源够用,就 CAS 设置一遍(CAS 失败就循环重来)
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}

/**
* 尝试获取锁,获取不到就阻塞,与 Condition 完全相同
*/
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null;
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

/* —————————————————————————————————————————————————————— */









/* ———————————————————————— release ———————————————————————— */

/**
* 本方法会为 AQS 的状态 +1,调用多少次就加多少次,永无上限(并不是最多 permits)
*/
public void release() {
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
// AQS 状态 +1
if (tryReleaseShared(arg)) {
// 解锁
doReleaseShared();
return true;
}
return false;
}

/**
* 将 AQS 的状态 +1(甚至会超过 permits)
*/
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current)
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

/**
* 尝试解锁,跟 Condition 完全一致
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}

/* —————————————————————————————————————————————————————— */

代码学习得很顺利,基本熟知了 JUC 框架的各种实现,但是不太理解为什么要这么设计,沉淀一段时间再回来想吧。

下周学习线程池。