`
jiangwenfeng762
  • 浏览: 285581 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

通过CountDownLatch来分析AbstractQueuedSynchronizer的源码

阅读更多

CountDownLatch:计数门闩,可以用来协调多个线程的协作,使用CountDownLatch的典型场景:某项工作需要多个线程共同来完成,并且其中一个线程(往往是主线程)需要等待其他线程都已经完成了自己的工作时才能继续进行,否则要等待。下面分析CountDownLatch源码:

 

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;

public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        public int tryAcquireShared(int acquires) {
            return getState() == 0? 1 : -1;
        }

        public boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    /**
     * 构造函数,关键代码是构建了Sync对象
     *
     */
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    /**
	 * 如果门闩值>0,则当前线程等待。否则该方法立刻返回。
	 * 调用countDown方法会使门闩值减少。
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 如果门闩值>0,则当前线程等待,直到门闩值<=0|| timeout时间到。
	 * 否则该方法立刻返回。
	 * 调用countDown方法会使门闩值减少。
     */
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 减少门闩值, 如果门闩值==0,则释放所有等待的线程。
     *
     */
    public void countDown() {
        sync.releaseShared(1);
    }

    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

 CountDownLatch代码非常简单,关键方法已经加上了注释,CountDownLatch的核心逻辑都委托给了Sync这个静态内部类。Sync类继承了AbstractQueuedSynchronizer 。AbstractQueuedSynchronizer 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。通俗点说,用java的人都知道synchronized能够对一个需要确保线程安全的对象,方法实现多线程并发控制,这是在java语法层次的实现,而AbstractQueuedSynchronizer 则是在应用层次而不是语法层次(更高的层次)提供了实现多线程并发控制组件的基础框架,通过AbstractQueuedSynchronizer

我们可以根据自己的需要设计灵活的多线程并发控制组件,CountDownLatch就是这种组件的典型代表。AbstractQueuedSynchronizer 代码比较复杂,先由浅至深的开始学习,

首先看一下其类层次结构:




 可以看出AbstractQueuedSynchronizer的子类都是静态内部类(比如CountDownLatch.Sync),其javadoc已明确说明了具体用法:

* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. 

 

AbstractQueuedSynchronizer的子类都被定义成非公有内部帮助类,来实现附属类的同步策略。

 

AbstractQueuedSynchronizer的关键设计思想:

1. 使用等待队列来维护所有等待的线程,等待队列的具体实现是一个双向链表,链表的每个节点由AbstractQueuedSynchronizer.Node类体现。AbstractQueuedSynchronizer的私有成员head,tail是链表的头尾节点:

 

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

AbstractQueuedSynchronizer的等待队列是"CLH"队列的变体,"CLH"队列经常用来实现"自旋锁",而这里主要是借鉴"CLH"队列的思想来持有等待线程的控制信息。队列中的每个节点保存如下信息:

 

        /**
	 * 状态域,取值仅限于下面几个:
         *   SIGNAL:     
	 *				当前节点的后继节点被阻塞(通过park),所以当当前节点释放或者撤销的时候必须启动(unpark)它的后继节点。
	*				为了避免竞争,acquire的相关方法必须首先指出他们需要一个signal,然后不断地重试原子的acquire,如果失败就阻塞(block)。
		 
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened. 
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified only using
         * CAS.
         */
        volatile int waitStatus;

        /**
	*
         */
        volatile Node prev;

        /**
         * 
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;
 

 

 2. 使用int值来代表同步器的状态:

 

    /**
     * The synchronization state.
     */
    private volatile int state;

    /**
     * Returns the current value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> read.
     * @return current state value
     */
    protected final int getState() {
        return state;
    }

    /**
     * Sets the value of synchronization state.
     * This operation has memory semantics of a <tt>volatile</tt> write.
     * @param newState the new state value
     */
    protected final void setState(int newState) {
        state = newState;
    }

3. 使用"自旋锁"的思想通过对volatile的类成员的安全访问来实现锁控制和管理

以CountDownLatch.await方法为例:

 

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 

 F3进去:

 

    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

 F3进去:

 

    /**
     * Acquires in shared interruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    break;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
        // Arrive here only if interrupted
        cancelAcquire(node);
        throw new InterruptedException();
    }
  • 大小: 69.3 KB
分享到:
评论

相关推荐

    Java并发包源码分析(JDK1.8)

    Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...

    java8源码-java8-source:java8

    java8 源码 java8源码+注释 AbstractQueuedSynchronizer ReentrantLock Condition CountDownLatch Semaphore

    Java并发编程原理与实战

    CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 ...

    【2018最新最详细】并发多线程教程

    18.一篇文章,从源码深入详解ThreadLocal内存泄漏问题 19.并发容器之BlockingQueue 20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之...

    龙果 java并发编程原理实战

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    Java 并发编程原理与实战视频

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    龙果java并发编程完整视频

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    java并发编程

    第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...

    Java并发编程:用AQS写一把可重入锁

    AQS是J.U.C包下AbstractQueuedSynchronizer抽象的队列式的同步器的简称,这是一个抽象类,它定义了一套多线程访问共享资源的同步器框架,J.U.C包下的许多同步类实现都依赖于它,比如ReentrantLock/Semaphore/...

Global site tag (gtag.js) - Google Analytics