并发编程03AQS&ReentrantLock

同步框架AQS

并发之父

Doug Lea:生平不识Doug Lea,学懂并发也枉然

image-20221018090541555

AQS是什么

​ Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器

AQS具备特性

  1. 阻塞等待队列
  2. 共享/独占
  3. 公平/非公平
  4. 可重入
  5. 可中断

例如Java.concurrent.util当中同步器的实现如Lock,Latch,Barrier等,都是基 于AQS框架实现,一般通过定义内部类Sync继承AQS将同步器所有调用都映射到Sync对应的方法 AQS内部维护属性volatile int state (32位) state表示资源的可用状态

State三种访问方式

​ getState()、setState()、compareAndSetState()

AQS定义两种资源共享方式

​ Exclusive-独占,只有一个线程能执行,如ReentrantLock

​ Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

AQS定义两种队列

​ 同步等待队列

​ 条件等待队列

​ 不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器 实现时主要实现以下几种方法:

​ **isHeldExclusively()**:该线程是否正在独占资源。只有用到 condition才需要去实现它。

​ **tryAcquire(int)**:独占方式。尝试获取资源,成功则返回true,失败则返回false。

​ **tryRelease(int)**:独占方式。尝试释放资源,成功则返回true,失败则返回false。

​ **tryAcquireShared(int)**:共享方式。尝试获取资源。负数表示失败; 0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

​ **tryReleaseShared(int)**:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

同步等待队列

​ AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

image-20211026231223938

条件等待队列

​ Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一 起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁

image-20211026231300145

注意:

如果Node在条件队列当中,Node必须是独占模式

AQS结构

详细结构解析查看:https://www.bilibili.com/video/BV12K411G7Fg

AbstractQueuedSynchronizer

image-20211026231827654

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {

/**
* 指向同步等待队列的头节点
*/
private transient volatile Node head;

/**
* 指向同步等待队列的尾节点
*/
private transient volatile Node tail;

/**
* 同步资源状态
*/
private volatile int state;

/**
* 给各自子类实现
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

/**
*默认实现,且不允许子类修改
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}


/**
* 已经在队列当中的Thread节点,准备阻塞等待获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//死循环
final Node p = node.predecessor();//找到当前结点的前驱结点
if (p == head && tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
setHead(node);//获取同步状态成功,将当前结点设置为头结点。
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 如果前驱节点不是Head,通过shouldParkAfterFailedAcquire判断是否应该阻塞
* 前驱节点信号量为-1,当前线程可以安全被parkAndCheckInterrupt用来阻塞线程
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
*/
return true;
if (ws > 0) {
/*
* 前驱节点状态如果被取消状态,将被移除出队列
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 当前驱节点waitStatus为 0 or PROPAGATE状态时
* 将其设置为SIGNAL状态,然后当前结点才可以可以被安全地park
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

private Node addWaiter(Node mode) {
// 1. 将当前线程构建成Node类型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 2. 1当前尾节点是否为null?
if (pred != null) {
// 2.2 将当前节点尾插入的方式
node.prev = pred;
// 2.3 CAS将节点插入同步队列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
AQS的静态内部类
static final class Node {
/**
* 标记节点未共享模式
* */
static final Node SHARED = new Node();
/**
* 标记节点为独占模式
*/
static final Node EXCLUSIVE = null;

/**
* 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待
* */
static final int CANCELLED = 1;
/**
* 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,
* 将会通知后继节点,使后继节点的线程得以运行。
*/
static final int SIGNAL = -1;
/**
* 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,
* 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
*/
static final int CONDITION = -2;
/**
* 表示下一次共享式同步状态获取将会被无条件地传播下去
*/
static final int PROPAGATE = -3;

/**
* 标记当前节点的信号量状态 (1,0,-1,-2,-3)5种状态
* 使用CAS更改状态,volatile保证线程可见性,高并发场景下,
* 即被一个线程修改后,状态会立马让其他线程可见。
*/
volatile int waitStatus;

/**
* 前驱节点,当前节点加入到同步队列中被设置
*/
volatile Node prev;

/**
* 后继节点
*/
volatile Node next;

/**
* 节点同步状态的线程
*/
volatile Thread thread;

/**
* 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量,
* 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。
*/
Node nextWaiter;

/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* 返回前驱节点
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

acquire的过程

  1. tryAcquire尝试获取锁
    1. 尝试失败,入队等待队列
    2. 入队后循环尝试获取锁
      1. 判断该节前驱结点为头结点,tryacquire
      2. 判断是否要阻塞该线程
        1. 判断是否要阻塞
          1. 前驱结点是就绪状态,说明前置结点也只等待拿锁,则本节点可以阻塞 返回true
          2. 前驱结点状态>0是取消,则递归移除前驱结点 返回false
          3. 否则设置该节点的前置结点为signal就绪状态 返回false
        2. 根据第一点结果是否阻塞阻塞
          1. 阻塞
          2. 不阻塞,从新回到循环 2.判断是否要阻塞该线程

release的过程

  1. tryRelease
    1. 释放成功 返回true
      1. 如果头节点不为空且状态不=0
        1. 将等待状态waitStatus设置为初始值0
        2. 后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点进行唤醒
        3. 唤醒查到的节点
    2. 失败,返回false

ReentrantLock的特点

1、是独占锁

2、同步等待队列的Node是通过魔术类UnSafe.park()/unPark() 方法来实现阻塞和唤醒

state字段

​ state是一个非常重要的组成部分,它是一个volatile修饰的变量,用来记录同步状态的值。这个state的含义并不是一成不变的,它会根据具体实现类的作用不同而表示不同的含义。例如,在Semaphore信号量中,state表示的是剩余许可证的数量;在ReentrantLock中,state用来标记是否有线程占有锁以及重入的次数;在CountDownLatch中,state用来标记占有锁的线程个数。因此,state的具体含义完全取决于用户如何使用AQS。

​ 另外,AQS提供了compareAndSetState()方法用于修改state的值,这是一个原子操作,保证了在多线程环境下的安全性。

问题

1、唤醒操作为啥 不从head开始向后遍历,而是从尾节点开始向前遍历

image-20221026231349547

入队操作时:enq()方法

if条件内的代码并不是原子的,线程A通过CAS进入if语句块之后,发生上下文切换,此时线程B同样执行了该方法,并且执行完毕。然后线程C调用了unparkSuccessor方法。就可能造成遍历到node2后,没有后续节点了,但是tail节点 != node2,从尾部向前遍历则不存在这个问题

image-20221026231638666

2、parkAndCheckInterupt()方法返回return Thread.interrupterd()是什么意思?

Thread.interrupterd()该方法用于检测线程是否中断,以及清除中断标志位。当第二次调用这个方法的时候就会返回false。

1、当前线程处于等待队列中(并且处于挂起中时)时无法响应外部的中断请求,因为unsafe.park()操作导致阻塞的线程遇到中断时不像wait(),sleep()遇到中断时一样,即可抛出中断异常,而是修改线程的中断标志位。

2、只有当该线程拿到锁后,通过获取判断该标志位才能响应中断请求

3、Thread中Thread.interrupted()和this.isInterrupted()方法的区别

Thread.interrupted() this.isInterrupted()
1 静态方法,作用于当前线程 成员方法,作用于Thread实例
2 返回中断状态同时清除了状态位状态 只返回中断状态

image-20221030201335028

ReenTrantLock

是什么

ReentrantLock,直译可重入锁,基于AQS,它实现了公平锁与非公平锁,可重入,在开发中可以使用它对资源进行同步操作。

与Synchronized(基于对象锁Monitor)的区别是它基于CAS算法,无需锁住同步资源(这里指对象锁)属于乐观锁。

图片

继承结构

ReentrantLock

Lock接口

ReentrantLock继承了Lock,继承以下方法

1
2
3
4
5
6
7
8
9
10
11
12
13
//获取锁
void lock();
//获取锁,区别在于如果当前线程在等待锁的过程中被中断,则退出等待,抛出中断异常
void lockInterruptibly() throws InterruptedException;
//尝试获取锁,成功与否都立即返回,不等待,获取成功返回true,失败返回false
boolean tryLock();
//尝试获取锁,失败则等待,设置最多等待时长,超时后,返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//新建一个绑定在当前lock对象上的condition对象
//TIPS∶Condition对象是什么?简单来说,它表示一个条件,不同线程可以通过该条件来进行通信。比如某线程可以通过 await 方法注册在condition对象上进行等待,然后通过condition对象的signal方法将该线程唤醒。这有点类似Object锁的wait和notify方法。但不同的是,一个Lock对象可以关联多个Condition对象,多个线程可以被绑定在不同的Condition对象上,这样就可以分组等待,唤醒。此外,Condition对象还提供了和限时、中断相关的功能,丰富了线程的调度策略。
Condition newCondition();

Sync内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

abstract void lock();

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}

final ConditionObject newCondition() {
return new ConditionObject();
}

final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}

final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}

final boolean isLocked() {
return getState() != 0;
}

private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}

NonfairSync 与 FairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
//第一次尝试插队
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//内部调用AQS的,会先尝试tryAcquire,第二次尝试插队
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

公平与非公平

image-20221030200540053

image-20221030200550275

无论定义为公平锁还是非公平锁,tryLock都是非公平的,都会尝试插队,这也是nonfairTryAcquire方法定义在Sync类的原因。

但是lock()就如同上边所说,他们的过程如下

FairSync.lock()

1、lock() -> accquire(1)

2、tryAcquire(1) && acquireQueued(addWaiter(Node.EXCLUSIVE), 1)

3、tryAcquire(arg):如果state=0 且队列中无等节点,CAS获取锁,否则正常排队

NonFairSync.lock()

1、compareAndSetState(0, 1)第一次先尝试插队

2、否则 acquire(1),tryAcquire(1) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

3、tryAcquire(1) -> nonfairTryAcquire(1) -> 如果state=0,不管队列中有无等待节点,CAS获取锁

4、获取失败再和公平锁一样乖乖排队

可重入与不可重入

可重入:即获取锁时,如果当前线程已经是锁的持有者,允许当前线程再进入锁,state累加1

image-20221030200921906

image-20221030200930372