1. 首页 >  小程序定制 >  Java中AQS的原理与实现

Java中AQS的原理与实现

目录

1:什么是AQS?

2:AQS都有那些用途?

3:我们如何使用AQS

4:AQS的实现原理

5:对AQS的设计与实现的一些思考

1:什么是AQS

​ 随着计算机的算力越来越强大,各种各样的并行编程模型也随即踊跃而来,但当我们要在并行计算中使用共享资源的时候,就需要利用一种手段控制对共享资源的访问和修改来保证我们程序的正确的运行。而Java中除了在语言级别实现的synchronized锁之外,还有另一种对共享资源访问控制的实现,而这些实现都依赖于一个叫做抽象队列同步器(AbstractQueuedSynchronizer)的模板框架类,简称AQS。所以我们想要更深刻的理解Java中对共享资源的访问控制,就不可避免的要对AQS深入的了解和探讨。

​ 我们首先看一下官方对于AQS的描述:提供一个依赖先进先出(FIFO)等待队列的挂起锁和相关同步器(信号量、事件等)框架。此类被设计为大多数类型的同步器的基类,这些同步器依赖于单个原子int值来表示状态。子类必须定义更改该状态的受保护方法,以及定义该状态在获取或释放该对象方面的含义。考虑到这些,类中的其他方法实现排队和挂起机制。子类可以维护状态字段,但只有使用方法getState、setState和compareAndSetState方法才可以更改状态。

2:AQS有哪些用途

​ AQS的用途有很多,几乎Java中所有的共享资源控制的实现都离不开它,比如我们常用的锁ReentrantLock、是基于AQS实现了一套可重入的公平和非公平互斥锁的实现。上图中我列举了我们常用的一些对于共享资源访问控制的一些工具,也都是基于AQS实现的。

3:如何使用AQS

​ 我们要是用AQS实现我们自己的锁,都离不开AQS中一个叫做int类型state的变量,而这个变量的具体意义是由使用者自已定义的,比如我们要基于AQS实现一个不可重入的互斥锁,我们可以定义state为1代表有线程已经获取了锁,state为0为空闲状态。

下面是AQS文档中的一段使用AQS自定义互斥锁的一段示例代码,我把它放到此处,方便大家查阅。

class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        //Reports whether in locked state
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Provides a Condition
        Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

4:AQS的实现原理

​ AQS实现的核心思想是,如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要将此线程放入一个叫做CLH(三个人名Craig、Landin and Hagersten)的等待队列中,然后通过挂起和唤醒机制来保证锁的分配。而将资源设置为锁定状态主要是通过说到的一个叫做int类型的state的变量来控制的,队列的FIFO操作则是利用CLH队列来实现。

等待队列是“CLH”(Craig、Landin和Hagersten)锁队列的变体。我们使用它们来作为同步器,在其节点中存储有关线程的一些控制信息。每个节点中的status字段跟踪线程是否应该挂起。当节点的前驱节点释放时,节点会发出信号。队列的每个节点在其他方面都充当一个特定的通知样式监视器,其中包含一个等待线程。要进入CLH锁的队列,可以将其原子性地拼接在队列的尾部。要退出队列,只需将其放在队列头部,也就是将此节点设置为head字段,原理图如下。

4.1:AQS的数据结构

首先我们先看一下AQS中最基本的数据结构,也就是CLH队列中的节点,是一个名为Node的静态内部类

static final class Node {
        /** 标记此节点是一个以共享模式等待的锁 */
        static final Node SHARED = new Node();
        /** 标记此节点是一个以互斥模式等待的锁 */
        static final Node EXCLUSIVE = null;

        /** 表示线程获取锁的线程已经取消了*/
        static final int CANCELLED =  1;
        /** 原文注释:waitStatus value to indicate successor's thread needs unparking
        	表示此线程的后继线程需要通过unpark()方法唤醒。
        	我的理解是此线程的后继节点需要被唤醒,
        	但当前节点必须是释放锁或者取消获取锁,后继线程等待被唤醒获取锁,后续可以通过源码解释。
         */
        static final int SIGNAL    = -1;
        /** 表示节点在等待队列中,节点线程等待唤醒,在使用Conditional的时候会有此状态 */
        static final int CONDITION = -2;
        /**
         * 当前线程处在SHARED情况下,该字段才会使用
         */
        static final int PROPAGATE = -3;

        /**
     		这些值以数字形式排列,以简化使用。非负值表示节点不需要信号。因此,大多数代码不需要检查特定的值,仅用检查符号就可以			 了。对于正常同步节点,该字段初始化为0,并且条件节点的CONDITION。使用CAS进行修改。
         */
        volatile int waitStatus;

        /**
			前驱节点
         */
        volatile Node prev;

        /**
    		后继节点
         */
        volatile Node next;

        /**
         * 此节点的线程
         */
        volatile Thread thread;

        /**
          指向下一个处于CONDITION状态的节点,使用Conditional模式才会用到此节点,
          篇幅原因,本片不讲述Condition Queue队列,故对此字段不多作介绍。
         */
        Node nextWaiter;

        /**
         * 是否是共享模式
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回当前节点的前驱节点,前驱节点为null,则抛出NPE
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

AQS中其他重要字段

    /**
     * 队列的头节点(虚节点,不存储数据),懒初始化。除了初始化之外,仅能通过setHead方法修改。
     * 注:假如头节点存在,一定会保证waitStatus变量的值不是CANCELLED
     */
    private transient volatile Node head;

    /**
     * 队列的尾节点(虚节点,不存储数据),懒初始化,仅仅可以通过enq方法初始化
     */
    private transient volatile Node tail;

    /**
     * 同步状态
     */
    private volatile int state;

由静态内部类Node中的prev和next字段我们可以知道CLH变体队列是一个由Node组成的双向队列,由字段head和tail可以知道AQS中保存了头节点和尾节点。state字段则是用来作为同步的重要字段,AQS中提供了三个final类型的方法来访问该字段。

    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

4.2:模板方法

    /**
     尝试以独占模式获取锁,此方法此方法应查询对象的状态是否允许以独占模式获取该对象,若允许的话则可以获取它。
	 此方法总是由执行acquire方法的线程调用。如果此方法返回false且线程尚未排队,则acquire方法可以对线程进行排队,直到某  	 	  个其他线程发出释放信号,则该线程停止挂起。
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     尝试以独占模型释放锁
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     尝试以共享模式获取锁,此方法此方法应查询对象的状态是否允许以独占模式获取该对象,若允许的话则可以获取它。
	 此方法总是由执行acquire方法的线程调用。如果此方法返回false且线程尚未排队,则acquire方法可以对线程进行排队,直到某  	 	  个其他线程发出释放信号,则该线程停止挂起。
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
    尝试以共享模式释放锁
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
		返回是否以独占模式持有锁
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

思考:AQS作为一个模板框架类,为什么 tryLock和tryRelease等方法,那么为什么这些方法不定义成abstract方法,而要定义成普通方法,且在方法中抛出异常呢?我的理解是AQS作为一个模板框架提供了加锁和释放锁的通用逻辑,但是又不仅仅是提供了独占锁或共享锁的逻辑,而是作为一个底层的通用执行模板类,如何定义成抽象的模板方法,那么所有的子类都要实现所有的模板方法,但是有些子类仅仅需要独占锁,比如ReentrantLock,那么就会要多先实现共享锁的逻辑(即使是空方法也要实现),所以我猜想是为了子类不必要实现多余的方法,所以才定义成普通的方法并在方法内部抛出异常。

4.3:获取锁源码及流程分析

由于篇幅原因,本文将以独占且忽略中断的模式的方法未入口分析,首先看一下获取锁的方法,获取锁的总体流程图如下:

获取锁的入口方法

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //acquireQueued方法会返回线程在挂起过程中是否被中断,然后返回线程中断的状态
            selfInterrupt();
    }

可以看到获取锁的方法是一个由final修饰的不可被子类重写的方法,首先调用了上面的模板方法(必须由子类重写获取逻辑)获取锁

如果获取成功则获取锁流程执行结束。否则执行addWaiter方法执行入队逻辑。

线程入队方法

    private Node addWaiter(Node mode) {
        //mode线程获取锁的模式(独占或者共享)
        Node node = new Node(Thread.currentThread(), mode);
        //尝试快速入队,失败则执行完整入队逻辑
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //如果尾节点不为null,则以原子方式把当前节点设置为尾节点并返回
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果尾节点不存(说明头节点和尾节点未初始化)在或者由于竞争导致一次设置尾节点失败,
        //则执行完整入队逻辑(会进行头节点和尾节点初始化的工作)
        enq(node);
        return node;
    }

addWaiter方法会先进行快速入队操作,如果快速入队失败(由于竞争或者头、尾节点未初始化),则进行完整入队操作(如果头、尾节点未初始化的话会先进行初始化操作)

完整入队逻辑

    private Node enq(final Node node) {
        //自旋把当前节点链接到尾节点之后
        for (;;) {
            Node t = tail;
            if (t == null) {
                //尾节为空,说明队列为空,则需要进行头节点和尾节点的初始化
                //这里直接new Node(),一个虚节点作为头节点,然后将头节点和尾节点相同
                //这里说明头节点和尾节点不存储数据
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //尾节点不为空,使用cas把当前节点设置为尾节点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq方法会利用自旋先检查头节点和尾节点是否初始化,如果未初始化的话则先进行初始化。初始化完成之后以原子的方式插入node到队尾并且插入成功之后返回此节点。

挂起线程并等待获取锁

final boolean acquireQueued(final Node node, int arg) {
    	//是否失败,此线程被中断可能失败
        boolean failed = true;
        try {
            //线程是否被中断
            boolean interrupted = false;
            //自旋一直获取锁
            for (;;) {
                //获取当前节点的前驱节点
                final Node p = node.predecessor();
                //如果当前节点的前驱节点是头节点(因为头节点是虚节点,所以当前节点可以获取锁),
                //且当前节点获取所成功
                if (p == head && tryAcquire(arg)) {
                    //设置node为头节点,因为当前节点已经获取锁成功了,当前节点需要作为头节点
                    setHead(node);
                    p.next = null; // help GC
                    //设置失败标志为false
                    failed = false;
                    //返回中断状态
                    return interrupted;
                }
                //shouldParkAfterFailedAcquire方法检查并更新获取失败的节点的状态,如果线程应该挂起则返回true
                //parkAndCheckInterrupt则挂起线程并返回是否中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            //失败,则取消获取锁
            if (failed)
                cancelAcquire(node);
        }
    }

​ 通过上面流程分析可知,获取锁失败,会调用addWaiter方法把当前节点放到队尾,那么线程入队之后什么时候挂起线程,什么时候出队,我们一点一点分析acquireQueued方法这些问题就会逐渐显露出来。

​ 首先该方法会一直自旋获取锁(中间可能会被挂起,防止无效自旋),判断当前节点的前驱节点是否是头节点来判断当前是否有获取锁的资格,如果有的话则设置当前节点为头节点并返回中断状态。否则调用shouldParkAfterFailedAcquire判断获取锁失败后是否可以挂起,可以的话则调用parkAndCheckInterrupt进行线程挂起操作。

设置头节点

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为其他地方要用到。

检查并更新获取失败的节点的状态

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * node的状态已经是SIGNAL可以安全的挂起,直接返回true 
             */
            return true;
        if (ws > 0) {
            /*
             *	由之前的waitStatus变量枚举值可知,waitStatus大于0为取消状态,直接跳过此节点
             */
            do {
                //重新链接prev指针,至于为什么没有操作next指针后面会通过代码解释
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 原子方式设置waitStatus的值为SIGNAL
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

挂起并检查线程的中断状态

    private final boolean parkAndCheckInterrupt() {
    	//使用LockSupport挂起此线程
        LockSupport.park(this);
        //返回并清除中断状态
        return Thread.interrupted();
    }

取消获取锁

private void cancelAcquire(Node node) {
        // 忽略不存在的节点
        if (node == null)
            return;
		//设置当前节点不持有线程
        node.thread = null;

        // 跳过取消的前驱节点
        Node pred = node.prev;
    	//waitStatus>0未已取消的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 未取消的节点的后继节点
        Node predNext = pred.next;

        //设置状态未取消状态
        node.waitStatus = Node.CANCELLED;

        // 如果node为尾节点,设置pred为尾节点,然后设置尾节点的下一个节点为null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
        	// 如果当前节点不是head的后继节点,
            //1:判断当前节点前驱节点的是否为SIGNAL,
            //2:如果不是,则把前驱节点设置为SINGAL看是否成功
    		// 如果1和2中有一个为true,再判断当前节点的线程是否为null
   	 		// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    //为什么没有自旋,如果此处设置失败,下次仍然会丢掉predNext到next中间节点,所以不会出现问题
                    compareAndSetNext(pred, predNext, next);
            } else {
                //当前节点是头节点的后继节点或者上述条件都不满足
                unparkSuccessor(node);
            }
			//为什么此处能help GC,不得不多说Doug Lea大神心思之缜密,考虑之周全。
            //解释参考http://www.codebaoku.com/question/question-sd-1010000043795300.html
            node.next = node; // help GC
        }
    }

当node==tail时,节点变化情况如下图

当pred==head时,节点的变化情况如下图

当pred!=head时,且上述条件满足时节点的变化情况如下图

通过上面的流程,我们对于取消获取锁的cancelAcquire方法对节点操作状态的产生和变化已经有了一定的了解,但是为什么所有的变化都是对next指针进行了操作,而没有对Prev指针进行操作呢?带着这个问题我们回顾一下shouldParkAfterFailedAcquire方法。

do {
	node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);

原因:执行cancelAcquire的时候,当前节点的前置节点可能已经从队列中出去了(已经执行过Try代码块中的shouldParkAfterFailedAcquire方法了),如果此时修改Prev指针,有可能会导致Prev指向另一个已经移除队列的Node,因此这块变化Prev指针不安全。 shouldParkAfterFailedAcquire方法中,会执行下面的代码,其实就是在处理Prev指针。shouldParkAfterFailedAcquire是获取锁失败的情况下才会执行,进入该方法后,说明共享资源已被获取,当前节点之前的节点都不会出现变化,因此这个时候变更Prev指针比较安全。

唤醒后继节点

    private void unparkSuccessor(Node node) {
        /*
         * node一般为head节点,如果waitStatus为负,则尝试清除信号,设置为0
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         * 正常情况下我们是要唤醒头节点的后继节点,但是如果后继节点为空或者已被取消,
         * 则需要从尾节点开始,找到离头节点最近的未被取消的后继节点。
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果当前节点的下个节点不为空,而且状态小于等于0,就把当前节点唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

为什么需要从后往前找呢?从美团技术团队中的一片文章中(https://tech.meituan.com/2019/12/05/aqs-theory-and- apply.html)找到了答案,我把大佬的解释放到下面,供大家参考!

4.4:释放锁源码及流程分析

释放锁流程图如下:

    public final boolean release(int arg) {
        //尝试释放锁成功
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        //尝试释放锁失败
        return false;
    }

释放锁的流程就比较简单了,先尝试释放锁,如果释放锁成功,(如果头节点不为null且头节点的状态不等于0则释放头节点的后继节点)返回true,否则返回false。

5:对AQS的设计与实现的一些思考

​ 1:设计方面 ,AQS作为底层一个通用的模板框架类,就要考虑到一些易用性和扩展性,比如作者模板方法使用的抛出异常,而不是作为抽象方法强制使用者实现所有的模板方法,而是使用者可以自由选择要使用的方法和特性选择性实现模板方法,当然也牺牲掉了一些其他东西,比如设计原则的最小职责性。这也就体现了一些折衷的思想,任何方案都不是完美的,我们只有权衡利弊之后达到一个相对完美的方案。

​ 2:实现方面 ,AQS的实现不得不令人惊叹,每一次读都会想到书读百遍,其意自现 这句名言,每次读有不一样的收获,看似一行不经意的代码,体现了作者对每一行代码细致又独到的思考。在读AQS代码的时候参考下面的链接,看大牛对AQS的的理解时见解,不仅加深了我对AQS核心思想的理解,也让我从另一方面看到了AQS的优秀之处(由于个人水平有限,理解不到位或者错误还请各位道友不吝赐教) 。路漫漫其修远兮,吾将上下而求索。

参考:

https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html

//help GC 相关的解释
http://www.codebaoku.com/question/question-sd-1010000043795300.html

/xiao-cheng-xu-ding-zhi/javazhong-aqsde-yuan-li-yu-shi-xian-207.html