同步框架AQS 并发之父 Doug Lea :生平不识Doug Lea ,学懂并发也枉然
AQS是什么 Java并发编程核心在于java.util.concurrent包而juc当中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列、条件队列、独占获取、共享获取等,而这个行为的抽象就是基于AbstractQueuedSynchronizer简称AQS,AQS定义了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器 。
AQS具备特性
阻塞等待队列
共享/独占
公平/非公平
可重入
可中断
例如Java.concurrent.util当中同步器的实现如Lock,Latch,Barrier等,都是基 于AQS框架实现,一般通过定义内部类Sync继承AQS将同步器所有调用都映射到Sync对应的方法 AQS内部维护属性volatile int state (32位) state表示资源的可用状态
State三种访问方式
getState()、setState()、compareAndSetState()
AQS定义两种资源共享方式
Exclusive-独占,只有一个线程能执行,如ReentrantLock
Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
同步等待队列
条件等待队列
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器 实现时主要实现以下几种方法:
**isHeldExclusively()**:该线程是否正在独占资源。只有用到 condition才需要去实现它。
**tryAcquire(int)**:独占方式。尝试获取资源,成功则返回true,失败则返回false。
**tryRelease(int)**:独占方式。尝试释放资源,成功则返回true,失败则返回false。
**tryAcquireShared(int)**:共享方式。尝试获取资源。负数表示失败; 0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
**tryReleaseShared(int)**:共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
条件等待队列
Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一 起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
注意: 如果Node在条件队列当中,Node必须是独占模式
AQS结构 详细结构解析查看:https://www.bilibili.com/video/BV12K411G7Fg
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java .io .Serializable { private transient volatile Node head; private transient volatile Node tail; private volatile int state; protected boolean tryAcquire (int arg) { throw new UnsupportedOperationException(); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private Node addWaiter (Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 AQS的静态内部类static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter == SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException(); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
acquire的过程
tryAcquire尝试获取锁
尝试失败,入队等待队列
入队后循环尝试获取锁
判断该节前驱结点为头结点,tryacquire
判断是否要阻塞该线程
判断是否要阻塞
前驱结点是就绪状态,说明前置结点也只等待拿锁,则本节点可以阻塞 返回true
前驱结点状态>0是取消,则递归移除前驱结点 返回false
否则设置该节点的前置结点为signal就绪状态 返回false
根据第一点结果是否阻塞阻塞
阻塞
不阻塞,从新回到循环 2.判断是否要阻塞该线程
release的过程
tryRelease
释放成功 返回true
如果头节点不为空且状态不=0
将等待状态waitStatus设置为初始值0
后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到最前的一个处于正常阻塞状态的结点进行唤醒
唤醒查到的节点
失败,返回false
ReentrantLock的特点
1、是独占锁
2、同步等待队列的Node是通过魔术类UnSafe.park()/unPark() 方法来实现阻塞和唤醒
state字段 state是一个非常重要的组成部分,它是一个volatile修饰的变量,用来记录同步状态的值。这个state的含义并不是一成不变的,它会根据具体实现类的作用不同而表示不同的含义。例如,在Semaphore信号量中,state表示的是剩余许可证的数量;在ReentrantLock中,state用来标记是否有线程占有锁以及重入的次数;在CountDownLatch中,state用来标记占有锁的线程个数。因此,state的具体含义完全取决于用户如何使用AQS。
另外,AQS提供了compareAndSetState()方法用于修改state的值,这是一个原子操作,保证了在多线程环境下的安全性。
问题 1、唤醒操作为啥 不从head开始向后遍历,而是从尾节点开始向前遍历
入队操作时:enq()方法
if条件内的代码并不是原子的,线程A通过CAS进入if语句块之后,发生上下文切换,此时线程B同样执行了该方法,并且执行完毕。然后线程C调用了unparkSuccessor方法。就可能造成遍历到node2后,没有后续节点了,但是tail节点 != node2,从尾部向前遍历则不存在这个问题
2、parkAndCheckInterupt()方法返回return Thread.interrupterd()是什么意思?
Thread.interrupterd()该方法用于检测线程是否中断,以及清除中断标志位。当第二次调用这个方法的时候就会返回false。
1、当前线程处于等待队列中(并且处于挂起中时)时无法响应外部的中断请求,因为unsafe.park()操作导致阻塞的线程遇到中断时不像wait(),sleep()遇到中断时一样,即可抛出中断异常,而是修改线程的中断标志位。
2、只有当该线程拿到锁后,通过获取判断该标志位才能响应中断请求
3、Thread中Thread.interrupted()和this.isInterrupted()方法的区别
Thread.interrupted()
this.isInterrupted()
1
静态方法,作用于当前线程
成员方法,作用于Thread实例
2
返回中断状态同时清除了状态位状态
只返回中断状态
ReenTrantLock 是什么 ReentrantLock,直译可重入锁,基于AQS,它实现了公平锁与非公平锁,可重入 ,在开发中可以使用它对资源进行同步操作。
与Synchronized(基于对象锁Monitor)的区别是它基于CAS算法 ,无需锁住同步资源(这里指对象锁)属于乐观锁。
继承结构
Lock接口 ReentrantLock继承了Lock,继承以下方法
1 2 3 4 5 6 7 8 9 10 11 12 13 void lock () ; void lockInterruptibly () throws InterruptedException ; boolean tryLock () ; boolean tryLock (long time, TimeUnit unit) throws InterruptedException ; void unlock () ; Condition newCondition () ;
Sync内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L ; abstract void lock () ; final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } protected final boolean isHeldExclusively () { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition () { return new ConditionObject(); } final Thread getOwner () { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount () { return isHeldExclusively() ? getState() : 0 ; } final boolean isLocked () { return getState() != 0 ; } private void readObject (java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0 ); } }
NonfairSync 与 FairSync 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } }static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } }
公平与非公平
无论定义为公平锁还是非公平锁,tryLock 都是非公平的,都会尝试插队,这也是nonfairTryAcquire方法定义在Sync类的原因。
但是lock()就如同上边所说,他们的过程如下
FairSync.lock() 1、lock() -> accquire(1)
2、tryAcquire(1) && acquireQueued(addWaiter(Node.EXCLUSIVE), 1)
3、tryAcquire(arg):如果state=0 且队列中无等节点 ,CAS获取锁,否则正常排队
NonFairSync.lock() 1、compareAndSetState(0, 1)第一次先尝试插队
2、否则 acquire(1),tryAcquire(1) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
3、tryAcquire(1) -> nonfairTryAcquire(1) -> 如果state=0,不管队列中有无等待节点 ,CAS获取锁
4、获取失败再和公平锁一样乖乖排队
可重入与不可重入 可重入:即获取锁时,如果当前线程已经是锁的持有者,允许当前线程再进入锁,state累加1