跳到主要内容

15、Java JUC源码分析 - CountDownLatch原理、源码解析

一、概述

 

1、作用

允许一个或多个线程等待,直到在其他线程中执行的一组操作完成,再继续执行。

2、demo

常用方法:

// 创建一个count为2的计数器;
CountDownLatch count = new CountDownLatch(2);

// 对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程;
count.countDown();

//阻塞当前线程,将当前线程加入阻塞队列;直到count为0时 唤醒当前线程。
count.await();

// 在timeout的时间之内阻塞当前线程,时间一过当前线程就可以继续执行。
count.await(long timeout, TimeUnit unit);

该demo源自CountDownLatch源码上的注释。

public class CountDownLatchTest {
   
     

    static class WorkerRunnable implements Runnable {
   
     
        private final CountDownLatch doneSignal;
        private final int i;
        WorkerRunnable(CountDownLatch doneSignal, int i) {
   
     
            this.doneSignal = doneSignal;
            this.i = i;
        }
        public void run() {
   
     
            doWork(i);
            doneSignal.countDown();
        }

        void doWork(int i) {
   
     
            System.out.println(i);
        }
    }

    public static void main(String[] args) throws InterruptedException {
   
     
        CountDownLatch doneSignal = new CountDownLatch(5);
        ExecutorService e = Executors.newFixedThreadPool(3);

        for (int i = 0; i < 5; ++i) // create and start threads
            e.execute(new WorkerRunnable(doneSignal, i));

        doneSignal.await();           // wait for all to finish
        System.out.println("所有线程执行结束,回到主线程");
        e.shutdown();
    }
}

3、应用场景

CountDownLatch可以用于 一个或者多个线程在执行之前 依赖于 某些前提业务先执行的场景。

  1. 多线程加载文件中的数据到内存中,等到所有的文件数据都加载完之后;主线程继续执行。

源码中的应用:

  • RocketMQ中BrokerOuterAPI#registerBrokerAll()方法中,Broker采用线程池机制向所有的NameServer中发送心跳时用CountDownLatch来阻塞主线程,保证同步发送

二、实现原理

1> CountDownLatch在并发编程中充当一个计时器,其维护一个变量count用于表示操作数;

  • 当一个操作执行完成时,原子性的减少一个count;
  • 当count变量为0时,代表所有操作均已完成,唤醒等待的线程。

2> 具体代码实现:

  • CountDownLatch内部集成一个Sync类,从其构造函数可以看到:表示操作数的count值和核心操作都由同步器类Sync完成。

  • 而Sync集成自AQS,所以本质上CountDownLatch就是使用AQS的共享锁机制,实现锁的获取、线程的阻塞/唤醒;

  • 当调用countDownLatch.await()方法时,会尝试获取共享锁,只有当count的值为0时才能获取到;

  • 未获取到锁的话,则创建一个线程节点Node,通过AQS的addWaiter()方法加入到AQS的阻塞队列,并把当前线程阻塞挂起。

  • 当调用countDownLatch.countDown()方法时,会在AQS中释放一次共享锁,即对count,也就是AQS的state变量进行减一操作;

  • 当state=0时,将AQS阻塞队列里的阻塞线程全部唤醒;即阻塞在await()方法那的线程获取到共享锁,继续往下执行。

3> 另外根据happens-before原则,await()方法一定happens-before countDown()方法。

三、源码解析

1、核心变量和构造函数

CountDownLatch下就一个Sync变量,由此可见:CountDownLatch的所有操作都是围绕Sync类来的。

// Sync同步器,通过该对象调用AQS的方法,进而实现线程的阻塞和唤醒
private final Sync sync;

// 根据传入的count值初始化计数器
public CountDownLatch(int count) {
   
     
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // 创建一个Sync对象 并传入count值,Sync内部将会执行setState(count)
    this.sync = new Sync(count);
}

private static final class Sync extends AbstractQueuedSynchronizer {
   
     

    Sync(int count) {
   
     
        setState(count);
    }
}

2、await()

当我们调用countDownLatch.wait()的时候,会创建一个线程节点Node,加入到AQS的阻塞队列,并把改线程挂起阻塞。

内部调用AQS的实现方法acquireSharedInterruptibly(),我们可以看到模板方法acquireSharedInterruptibly(),响应线程中断式的获取锁。

public void await() throws InterruptedException {
   
     
    sync.acquireSharedInterruptibly(1);
}

下面是AQS的acquireSharedInterruptibly()方法:

// AQS类的模板方法
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
   
     
    // 响应线程的中断
    if (Thread.interrupted())
        throw new InterruptedException();
    
    // 调用子类tryAcquireShared()方法的实现,让子类实现自己获取信号量的机制。
    // tryAcquireShared()方法返回-1表示获取锁失败,阻塞当前线程节点
    if (tryAcquireShared(arg) < 0)
        // 线程获取锁失败,加入到AQS的同步队列中阻塞等待。
        doAcquireSharedInterruptibly(arg);
}

CountDownLatch的内部类Sync中实现了尝试获取共享锁(tryAcquireShared)逻辑;

// Sync类中
protected int tryAcquireShared(int acquires) {
   
     
    // 如果state变量为0,表示共享锁已经被取完了,无法再获取共享锁。即:当AQS中的state变量不为0时,也就是CountDownLatch中的count还不为0,需要阻塞线程。
    return (getState() == 0) ? 1 : -1;
}

CountDownLatch实际上是一种共享锁机制,即锁可以同时被多个线程获取。因为一旦count被减小为0,所有的线程走到await()方法时,都能够顺利通过,不会因为获取不到锁而阻塞。
而且Sync直接将count值作为AQS的state的值,只有state的值为0,线程才能获取锁,即获得执行线程的权限。

3、countDown()

当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放共享锁的方式,对state进行减1操作;当state=0的时候会将AQS阻塞队列里的阻塞线程全部唤醒。

public void countDown() {
   
     
    sync.releaseShared(1);
}

滋滋滋,又是完成依赖AQS的实现;

// 调用AQS类的模板方法
public final boolean releaseShared(int arg) {
   
     
    // 子类自己实现自己的释放锁逻辑
    if (tryReleaseShared(arg)) {
   
     
        // 释放锁成功之后,唤醒阻塞队列中的线程。
        doReleaseShared();
        return true;
    }
    return false;
}

CountDownLatch的内部类Sync中实现了尝试释放共享锁(tryAcquireShared)逻辑;

protected boolean tryReleaseShared(int releases) {
   
     
    // Decrement count; signal when transition to zero
    for (;;) {
   
     
        int c = getState();
        // 如果 state == 0,表明没有线程持有锁
        if (c == 0)
            return false;
        int nextc = c-1;
        // CAS 修改state变量,如果修改后的state = 0 表明释放锁成功
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

四、总结

啊这,CountDownLatch真的是非常简单,代码包括注释也就300多行,除了其内部类Sync重写了AQS的两个方法,底层真的是几乎完全靠AQS实现。

关于AQS的实现原理,我们后面的文章见。

版权声明:本文不是「DDKK.COM 弟弟快看,程序员编程资料站」的原创文章,版权归原作者所有

原文地址:https://saint.blog.csdn.net/article/details/129029852