全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析(一)AQS基础

AbstractQueuedSynchronizer(以下简称AQS)的内容确实有点多,博主考虑再三,还是决定把它拆成三期。原因有三,一是放入同一篇博客势必影响阅读体验,而是为了表达对这个伟大基础并发组件的崇敬之情。第三点其实是为了偷懒。
又扯这么多没用的,还是直接步入正题吧~

AQS介绍

AQS是一个抽象类,它是实现多种并发同步工具的核心组件。比如大名鼎鼎的可重入锁ReentrantLock),它的底层实现就是借助内部类Sync,而Sync类就是继承了AQS并实现了AQS定义的若干钩子方法。这些并发同步工具包括:

从设计模式上来看,AQS主要使用的是模板方法模式(Template Method Pattern)。它提供了若干钩子方法供子类实现(如tryAcquiretryRelease等),AQS的模板方法(如acquirerelease等)会调用这些钩子方法。子类使用AQS的方式就是直接调用AQS的模板方法,并重写这些模板方法涉及到的特定钩子方法即可。不需要调用的钩子方法可以不用重写,AQS为它们均提供了默认实现抛出UnsupportedOperationException异常

此外,AQS也提供了其他一些方法供子类调用,如getStatehasQueuedPredecessors等方法,方便子类获取、判断同步器的状态

什么是钩子方法?
钩子方法的概念源于模板方法模式,这种模式是在一个方法中定义了算法的骨架,某些关键步骤会交给子类去实现。模板方法在不改变算法本身结构的情况下,允许子类自定义其中一些关键步骤
这些关键步骤可以由父类定义成方法,这些方法可以是抽象方法,或钩子方法

  • 抽象方法:父类定义但不实现,由abstract关键字标识
  • 钩子方法:父类定义且实现,但这种实现一般都是空实现,并没有任何意义,这么做只是为了方便子类根据需要重写特定的钩子方法,而不用实现所有的钩子方法

AQS的核心思想:

作者:酒冽 出处:https://www.cnblogs.com/frankiedyz/p/15673957.html
版权:本文版权归作者和博客园共有
转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任

AQS的基本结构

状态state

AQS使用volatile int变量state来作为核心状态,所有的同步控制都是围绕这个state来进行的,volatile保证其内存可见性,并使用CAS确保state的修改是原子的。volatile和CAS同时存在,就保证了state的线程安全性

对于不同的同步工具实现来说,语义是不同的,如下:

针对state这个核心状态,AQS提供了getStatesetState等多个获取、修改方法,源码如下:

private volatile int state; protected final int getState() {    return state;} protected final void setState(int newState) {    state = newState;} protected final boolean compareAndSetState(int expect, int update) {    // See below for intrinsics setup to support this    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);}
作者:酒冽 出处:https://www.cnblogs.com/frankiedyz/p/15673957.html
版权:本文版权归作者和博客园共有
转载:欢迎转载,但未经作者同意,必须保留此段声明;必须在文章中给出原文连接;否则必究法律责任

同步队列

Node类

AQS内部维护了一个同步队列(网上有些文章会叫它为CLH队列,至于为啥叫这个我也不知道-_-||,但不重要~)。队列中的每个节点都是Node类型其源码如下:

static final class Node {        static final Node SHARED = new Node();    static final Node EXCLUSIVE = null;     static final int CANCELLED =  1;    static final int SIGNAL    = -1;    static final int CONDITION = -2;    static final int PROPAGATE = -3;     volatile int waitStatus;     volatile Node prev;    volatile Node next;     volatile Thread thread;     Node nextWaiter;     final boolean isShared() {        return nextWaiter == SHARED;    }     final Node predecessor() throws NullPointerException {        Node p = prev;        if (p == null)            throw new NullPointerException();        else            return p;    }     Node() {    // Used to establish initial head or SHARED marker    }     Node(Thread thread, Node mode) {     // Used by addWaiter        this.nextWaiter = mode;        this.thread = thread;    }     Node(Thread thread, int waitStatus) { // Used by Condition        this.waitStatus = waitStatus;        this.thread = thread;    }}

prevnext用于保存该节点的前驱、后继节点,表明这个同步队列是一个双向队列

Nodethread域保存了对应的线程,只有在创建时赋值,使用完要null掉,以方便GC

Node使用SHAREDEXCLUSIVE两个常量来标记该线程是由于获取共享资源、互斥资源失败,而被阻塞并放入到同步队列中进行等待

Node使用waitStatus来记录当前线程的等待状态,通过CAS进行修改。它的取值可以是:

AQS:为什么需要PROPAGATE?
AQS源码深入分析之共享模式-你知道为什么AQS中要有PROPAGATE这个状态吗?

同步队列的结构

AQS中维护了一个同步队列,它通过两个指针标记队头、队尾,分别是headtail,源码如下:

private transient volatile Node head; private transient volatile Node tail;

该队列的出入规则遵循FIFO(First In, First Out)

注意:如果该同步队列非空,那么head其实并不是指向第一个线程对应的Node,而是指向一个空的Node

接下来让我们剖析一下AQS针对这个同步队列设计的入队、出队算法

入队算法

入队事件主要在线程尝试获取资源失败时触发。当线程尝试获取资源失败之后,会将该线程加入到同步队列的队尾

入队算法的源码见AQS的addWaiter方法,如下:

// mode可以是Node.EXCLUSIVE或Node.SHAREDprivate Node addWaiter(Node mode) {    Node node = new Node(Thread.currentThread(), mode);    // Try the fast path of enq; backup to full enq on failure    Node pred = tail;    if (pred != null) {        node.prev = pred;        if (compareAndSetTail(pred, node)) {            pred.next = node;            return node;        }    }    enq(node);    return node;}

首先为该线程创建一个Node节点,mode可以是Node.EXCLUSIVENode.SHARED,表示两种不同的模式。
之后直接CAS试图将其入队。这里注意,如果队列本身为空,或CAS竞争失败,才会进入enq方法。这里addWaiter方法出于性能考虑,先尝试快捷的入队方式,不成功才执行enq方法

enq方法是完整的入队逻辑,源码如下:

private Node enq(final Node node) {    for (;;) {        Node t = tail;        if (t == null) { // 如果队列为空,则将head和tail初始化为同一个空Node            if (compareAndSetHead(new Node()))                tail = head;        } else {            node.prev = t;            if (compareAndSetTail(t, node)) {	// 不断CAS直到成功为止                t.next = node;                 return t;            }        }    }}

enq中的代码都包含在for循环中,如果CAS失败,就会不断循环CAS直到成功为止

注意,这段代码也体现出同步队列的三个特点:

出队算法

出队事件主要发生在:位于同步队列中的线程再次获取资源,并成功获得时

出队算法在AQS中并没有直接对应的方法,而是零散分布在某些方法中。因为获取资源失败而被阻塞的线程被唤醒后,会重新尝试获取资源。如果获取成功,则会执行出队逻辑

例如,在acquireQueued方法中,就包含了出队事件:

final boolean acquireQueued(final Node node, int arg) {    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            final Node p = node.predecessor();            if (p == head && tryAcquire(arg)) {                setHead(node);                p.next = null; // help GC                failed = false;                return interrupted;            }            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;            }        } finally {            if (failed)                cancelAcquire(node);    }} private void setHead(Node node) {    head = node;    node.thread = null;    node.prev = null;}

出队的逻辑体现在第6-9行,此时p指向head指向的空节点,而node是队首元素(不是第一个空节点)
首先调用setHead方法,将head指向node、将nodethread域、prev域置空,然后将headnext域置空,以方便该节点的GC

节点的取消

线程会因为超时或中断而被取消,之后不会再参与锁的竞争,会等待GC

取消的过程见cancelAcquire方法,该方法的调用时机都是在获取资源失败之后,而失败就是由于超时或中断。其源码如下:

private void cancelAcquire(Node node) {    if (node == null)        return;     node.thread = null;		// 将thread域置空以方便GC     // 向前遍历并跳过被取消的Node    Node pred = node.prev;    while (pred.waitStatus > 0)        node.prev = pred = pred.prev;     Node predNext = pred.next;    node.waitStatus = Node.CANCELLED;     // 如果是tail,那么将tail修改为pred    if (node == tail && compareAndSetTail(node, pred)) {        compareAndSetNext(pred, predNext, null);    } else {        int ws;        if (pred != head &&            ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&            pred.thread != null) {            // 如果node的next需要signal,那么就将pred的next设为node的next            Node next = node.next;            if (next != null && next.waitStatus <= 0)                compareAndSetNext(pred, predNext, next);        } else {            unparkSuccessor(node);        }        node.next = node; // help GC    }}

总之,cancelAcquire方法就是将目标节点nodethread域置空,并将waitStatus置为CANCELLED

这里有一个问题:node的后继节点nextprev指针仍然指向node,没有更新为pred,这不仅语义上是错误的,而且会阻碍node被GC。那么何时进行更新?
答:任何其他线程尝试获取锁失败之后,都会被放入同步队列,然后调用shouldParkAfterFailedAcquire方法判断是否应该被阻塞。如果发现当前节点的前驱节点被置为CANCELLED,就会执行:

do {    node.prev = pred = pred.prev;} while (pred.waitStatus > 0);

此外,cancelAcquire方法也会做类似的操作,如下:

Node pred = node.prev;while (pred.waitStatus > 0)	node.prev = pred = pred.prev;

这两处都会更新被取消节点的后继节点的prev指针,所以前面说到的的问题根本不存在

注意:cancelAcquire的调用时机一般都是在获取锁逻辑后面的finally块中,如果获取失败就会调用cancelAcquire方法。获取失败的原因主要有两个,中断或超时

总结:

好了,到这里为止,我们就完成了对AQS基本结构的分析。这里如果有不懂的地方,可以暂时跳过,等看完后续博客再回头看这篇,应该就能明白了
下一篇我们会逐步剖析AQS如何实现对资源的获取和释放,go go go!

全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析(一)AQS基础
全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析(二)资源的获取和释放
全网最详细的AbstractQueuedSynchronizer(AQS)源码剖析(三)条件变量