zhujiangtaotaise 2019-12-26
CountDownLatch基于AQS实现,代码不多,所以很好分析。本文只分析CountDownLatch实现, 关于AQS的实现在另外一篇文章中叙述。
下面是CountDownLatch的类图:
接下来贴上来CountDownLatch的源码:
public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public long getCount() { return sync.getCount(); } public String toString() { return super.toString() + "[Count = " + sync.getCount() + "]"; } }
首先,实例化CountDownLatch,没有太多需要分析的,只是在构造函数中传入一个数字,我们可以理解为“总操作数”。因为是多线程共享访问这个操作数,所以后续会调用AQS中的相关共享访问方法。
其次,await()为等待函数, 当剩余的“操作数”大于0时候,阻塞当前线程。通过Sync调用AQS:: acquireSharedInterruptibly(),代码如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
可见acquireSharedInterruptibly()会回调tryAcquiredShred()方法,去获取共享模式以进行后续的操作,如果获取成功则会继续进行后续的操作,获取失败则阻塞。
tryAcquiredShared实现时候要注意返回值有三个:负值表示失败,零表示成功获取共享模式但是后续的获取共享模式不会成功,正值表示获取共享模式成功并且后续再次获取也可能成功。所以如果操作数大于0时候要返回负值,这样才能阻塞。CountDownLatch::tryAcquireShared代码就是这么实现的,如下:
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
第三,countDown()表示消耗一个操作数, 通过countDown()调用 AQS::releaseShared()方法,该方法判断是否消耗成功,如果成功则返回true,否则返回false。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
调用tryReleaseShared() 返回true表示允许一个await()的线程获得共享资源,中断阻塞。
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }