线程同步工具类CountDownLatch

发布时间 2023-04-12 08:14:43作者: _mcj

1.说明

CountDownLatch是线程同步计数器:使线程能够在满足一些条件之后再执行。其是通过计数器实现的,当满足一个条件之后,计数器数量减一,直到其数量为0时,被挂起的线程恢复执行。
其内部维护了一个静态内部类Sync,该类继承了AbstractQueuedSynchronizer,这个类在前面的AQS这里讲过了一些。
CountDownLatch中常用的方法如下为:

public CountDownLatch(int count);
public void await() throws InterruptedException;
public void countDown();

countDownLatch是构造函数初始化值,也就是要满足的条件数量。
await是调用后,其所在的线程挂起。
countDown这个方法是在满足其中一个条件后执行,然后将计数器数量减一。

2.示例

public static void main(String[] args) throws InterruptedException {
        // 用于通知线程池的线程开始执行
        CountDownLatch notice = new CountDownLatch(1);
        // 用于通知四个线程执行完毕
        CountDownLatch countDownLatch = new CountDownLatch(4);
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 4; i++) {
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("线程:" + Thread.currentThread().getName() + "等待执行,当前时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                    try {
                        // 等待开始执行通知
                        notice.await();
                        Thread.sleep(5000);
                        System.out.println("线程:" + Thread.currentThread().getName() + "执行完成,当前时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                        // 一个线程执行完,计数器数量减一
                        countDownLatch.countDown();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
        Thread.sleep(10000);
        System.out.println("线程:" + Thread.currentThread().getName() + "发送执行通知,当前时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        // 发送开始执行通知
        notice.countDown();
        // 等待四个线程执行结束
        countDownLatch.await();
        System.out.println("四个线程执行完毕,当前时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
    }

执行结果:
image
结果分析:
从上面的执行结果可以看出四个线程在得到主线程发出执行通知之前一直是处于等待状态,当主线程发出通知之后,那四个线程才开始进行执行。同时主线程也一直在等待四个线程执行完成,执行完成之后才会进行后面的输出。

3.分析

await()方法:

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }


// acquireSharedInterruptibly方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 当计数器的数量不等于0时
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }


// tryAcquireShared()方法,该方法只有当计数器的数量等于0的时候返回值才大于0,其它情况的返回值都小于0
protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 向队列中添加一个共享节点,其中addWaiter在AQS中已经说明
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                // 该节点的前驱节点
                final Node p = node.predecessor();
                // 前驱节点是头节点
                if (p == head) {
                    //获取计数器数量是否为0
                    int r = tryAcquireShared(arg);
                    // 计数器数量为0
                    if (r >= 0) {
                        // 设置头节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


// setHeadAndPropagate方法,设置头节点
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 设置当前节点为头节点
        setHead(node);

        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                // 恢复后继节点
                doReleaseShared();
        }
    }

countDown()方法:

// 递减计数器的计数,如果计数达到0,则释放所有等待的线程
// 如果当前计数器大于0,则递减
// 如果当前计数器等于0,则不会进行任何操作
public void countDown() {
        sync.releaseShared(1);
    }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            // 恢复后继节点
            doReleaseShared();
            return true;
        }
        return false;
    }
// 只有当计数器的数量为0的时候,返回值才是true,其余的情况返回值则为false
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 头节点不为null,并且不等于尾节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 当前头结点的后续节点需要取消停靠
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 恢复后继节点,这个方法在AQS那里也有说明
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }