并发编程04-juc-Tools

Juc-Tools

以下都是共享模式

Semaphore

构造方法

​ public Semaphore(int permits)

​ public Semaphore(int permits, boolean fair)

permits 表示许可线程的数量 fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线 程

重要方法

 public void acquire()

​ public void release()

​ public tryAcquire(long timeout, TimeUnit unit)

acquire() 表示阻塞并获取许可 release() 表示释放许可

基本使用

需求场景:资源访问,服务限流。

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SemaphoreSample {

public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i=0;i<5;i++){
new Task(semaphore,"handsome+"+i).start();
}
}

static class Task extends Thread{
Semaphore semaphore;

public Task(Semaphore semaphore,String tname){
this.semaphore = semaphore;
this.setName(tname);
}

@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+":aquire() at time:"+System.currentTimeMillis());
Thread.sleep(1000);

System.out.println(Thread.currentThread().getName()+":release() at time:"+System.currentTimeMillis());
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
}
}

CountDownLatch

有主次之分,注重对分支线程的结果在主线程汇总处理

例如多excel处理多个sheet,最后对每个sheet的处理结果汇总处理

CountDownLatch是什么?

​ CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有的框架服务之后再执行

API

​ CountDownLatch.countDown()

​ CountDownLatch.await();

CountDownLatch应用场景例子

​ 比如陪媳妇去看病。 医院里边排队的人很多,如果一个人的话,要先看大夫,看完大夫再去排队交钱取药。 现在我们是双核,可以同时做这两个事(多线程)。

​ 假设看大夫花3秒钟,排队交费取药花5秒钟。我们同时搞的话,5秒钟我们就能完成,然后 一起回家(回到主线程)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class CountDownLaunchSample {
public static void main(String[] args) throws InterruptedException {
long now = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(new SeeDoctorTask(countDownLatch)).start();
new Thread(new QueueTask(countDownLatch)).start();
//等待线程池中的2个任务执行完毕,否则一直
countDownLatch.await();
System.out.println("over,回家 cost:"+(System.currentTimeMillis()-now));
}
}
public class QueueTask implements Runnable {

private CountDownLatch countDownLatch;

public QueueTask(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
System.out.println("开始在医院药房排队买药....");
Thread.sleep(5000);
System.out.println("排队成功,可以开始缴费买药");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (countDownLatch != null)
countDownLatch.countDown();
}
}
}
public class SeeDoctorTask implements Runnable {
private CountDownLatch countDownLatch;

public SeeDoctorTask(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
System.out.println("开始看医生");
Thread.sleep(2000);
System.out.println("看医生结束,准备离开病房");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
if (countDownLatch != null)
countDownLatch.countDown();
}
}
}

原理解析

CountDownLatch内部结构

image-20221102084938034

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class CountDownLatch {

private final Sync sync;

private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}

//当state状态值变成0时,表示获取锁失败
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

//释放锁,减法后state=0,返回true, >1返回false
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
//共享模式入队等待获取锁
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

public void countDown() {
//1、释放一个资源 2、如果释放后state=0,则唤醒所有共享模式的排队节点
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
}

调用链路

图中浅黄色部分为CountDownLatch的方法调用,浅绿色为AQS,浅蓝色为内部类Sync。这里模拟了在主线程中提交两个子任务。

image-20221102090927183

​ 至此为止,我们就从头到尾将CountDownLatch从应用到源码都讲解了一遍。在应用演示部分,只是举了一个很简单的例子,但足够具有代表性。在对源码的解读部分,主要还是基于对AQS中共享模式的理解。我个人认为,AQS是一个抽象程度比较高的框架,CountDownLatch是利用这种抽象实现了一种具体的功能。所以,如果业务中出现了某种特殊的应用场景,又没有通用的组件可以直接使用,那么从什么角度去利用AQS的抽象,将是我们需要思考的问题。

CyclicBarrier

是什么

​ 栅栏屏障,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程 到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。 CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线 程数量,每个线程调用await方法告CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class CyclicBarrierTest implements Runnable {
private CyclicBarrier cyclicBarrier;
private int index ;

public CyclicBarrierTest(CyclicBarrier cyclicBarrier, int index) {
this.cyclicBarrier = cyclicBarrier;
this.index = index;
}

@Override
public void run() {
try {
System.out.println("index: " + index + "准备就绪");
cyclicBarrier.await();
System.out.println("index: " + index + "开始干活");

} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() {
@Override
public void run() {
System.out.println("所有特工到达屏障,准备开始执行秘密任务");
}
});
for (int i = 0; i < 10; i++) {
new Thread(new CyclicBarrierTest(cyclicBarrier, i)).start();
}
}

}
index: 1准备就绪
index: 2准备就绪
index: 0准备就绪
index: 5准备就绪
index: 6准备就绪
index: 4准备就绪
index: 3准备就绪
index: 7准备就绪
index: 8准备就绪
index: 9准备就绪
所有特工到达屏障,准备开始执行秘密任务
index: 9开始干活
index: 0开始干活
index: 4开始干活
index: 2开始干活
index: 1开始干活
index: 8开始干活
index: 7开始干活
index: 3开始干活
index: 6开始干活
index: 5开始干活

CyclicBarrier和CountDownLatch的区别

CountDownLatch(基于AQS):有主次之分,注重对分支线程的结果在主线程汇总处理;一旦计数器归零,就无法重置,因此不能重用。

CyclicBarrier(基于ReentrantLock):基本具备CountDownLatch的能力,且可重复触发的。比如3个线程想斗地主,得凑齐3个人才能进行,一轮结束后,继续凑够三人又可以进行下一轮,无需重复触发,像CountDownLatch需要从新await才能起作用。

CountDownLatch CyclicBarrier
底层实现 基于AQS,CAS操作将state 基于Reentranlock,Condition等待唤醒
可重用性 一次性 可重用(触发一次屏障操作后,计数器重新初始化)
使用场景 子任务全部执行完,主任务汇总 所有任务等待达到某个状态,再一起开始执行