CountDownLatch:计数门闩,可以用来协调多个线程的协作,使用CountDownLatch的典型场景:某项工作需要多个线程共同来完成,并且其中一个线程(往往是主线程)需要等待其他线程都已经完成了自己的工作时才能继续进行,否则要等待。下面分析CountDownLatch源码:
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
public int tryAcquireShared(int acquires) {
return getState() == 0? 1 : -1;
}
public boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
/**
* 构造函数,关键代码是构建了Sync对象
*
*/
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
/**
* 如果门闩值>0,则当前线程等待。否则该方法立刻返回。
* 调用countDown方法会使门闩值减少。
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 如果门闩值>0,则当前线程等待,直到门闩值<=0|| timeout时间到。
* 否则该方法立刻返回。
* 调用countDown方法会使门闩值减少。
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 减少门闩值, 如果门闩值==0,则释放所有等待的线程。
*
*/
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
CountDownLatch代码非常简单,关键方法已经加上了注释,CountDownLatch的核心逻辑都委托给了Sync这个静态内部类。Sync类继承了AbstractQueuedSynchronizer 。AbstractQueuedSynchronizer 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。通俗点说,用java的人都知道synchronized能够对一个需要确保线程安全的对象,方法实现多线程并发控制,这是在java语法层次的实现,而AbstractQueuedSynchronizer 则是在应用层次而不是语法层次(更高的层次)提供了实现多线程并发控制组件的基础框架,通过AbstractQueuedSynchronizer
我们可以根据自己的需要设计灵活的多线程并发控制组件,CountDownLatch就是这种组件的典型代表。AbstractQueuedSynchronizer 代码比较复杂,先由浅至深的开始学习,
首先看一下其类层次结构:
可以看出AbstractQueuedSynchronizer的子类都是静态内部类(比如CountDownLatch.Sync),其javadoc已明确说明了具体用法:
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class.
AbstractQueuedSynchronizer的子类都被定义成非公有内部帮助类,来实现附属类的同步策略。
AbstractQueuedSynchronizer的关键设计思想:
1. 使用等待队列来维护所有等待的线程,等待队列的具体实现是一个双向链表,链表的每个节点由AbstractQueuedSynchronizer.Node类体现。AbstractQueuedSynchronizer的私有成员head,tail是链表的头尾节点:
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
AbstractQueuedSynchronizer的等待队列是"CLH"队列的变体,"CLH"队列经常用来实现"自旋锁",而这里主要是借鉴"CLH"队列的思想来持有等待线程的控制信息。队列中的每个节点保存如下信息:
/**
* 状态域,取值仅限于下面几个:
* SIGNAL:
* 当前节点的后继节点被阻塞(通过park),所以当当前节点释放或者撤销的时候必须启动(unpark)它的后继节点。
* 为了避免竞争,acquire的相关方法必须首先指出他们需要一个signal,然后不断地重试原子的acquire,如果失败就阻塞(block)。
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified only using
* CAS.
*/
volatile int waitStatus;
/**
*
*/
volatile Node prev;
/**
*
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
2. 使用int值来代表同步器的状态:
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a <tt>volatile</tt> write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
3. 使用"自旋锁"的思想通过对volatile的类成员的安全访问来实现锁控制和管理
以CountDownLatch.await方法为例:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
F3进去:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
F3进去:
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break;
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}
- 大小: 69.3 KB
分享到:
相关推荐
Java并发包源码分析(JDK1.8):囊括了java.util.concurrent包中大部分类的源码分析,其中涉及automic包,locks包(AbstractQueuedSynchronizer、ReentrantLock、ReentrantReadWriteLock、LockSupport等),queue...
java8 源码 java8源码+注释 AbstractQueuedSynchronizer ReentrantLock Condition CountDownLatch Semaphore
CountDownLatch,CyclicBarrier,Semaphore源码解析.mp4 提前完成任务之FutureTask使用.mp4 Future设计模式实现(实现类似于JDK提供的Future).mp4 Future源码解读.mp4 ForkJoin框架详解.mp4 同步容器与并发容器.mp4 ...
18.一篇文章,从源码深入详解ThreadLocal内存泄漏问题 19.并发容器之BlockingQueue 20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解 21.线程池ThreadPoolExecutor实现原理 22.线程池之...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
第41节CountDownLatch,CyclicBarrier,Semaphore源码解析00:29:57分钟 | 第42节提前完成任务之FutureTask使用00:11:43分钟 | 第43节Future设计模式实现(实现类似于JDK提供的Future)00:19:20分钟 | 第44节Future...
AQS是J.U.C包下AbstractQueuedSynchronizer抽象的队列式的同步器的简称,这是一个抽象类,它定义了一套多线程访问共享资源的同步器框架,J.U.C包下的许多同步类实现都依赖于它,比如ReentrantLock/Semaphore/...