跳到主要内容

06、Java JUC源码分析 - CyclicBarrier源码/原理解析

一、概述

 

1、作用?

允许一组线程互相等待,直到到达某个公共屏障(barrier)点。

  • 因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

2、使用场景?

用于多线程计算数据,最后合并计算结果的场景。

3、常用类方法?

  • await():告诉CyclicBarrier,线程已经到达了屏障,计数减一;然后阻塞线程,直到count为0。
  • reset():重置CyclicBarrier的未加入到party的数量count和当前代Generation。

4、案例

public class CyclicBarrierTest {
   
     

    public static void main(String[] args) throws Exception {
   
     
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

        new Thread(() -> {
   
     
            System.out.println("work thread start + 1");
            try {
   
     
                TimeUnit.SECONDS.sleep(1);
                cyclicBarrier.await();
                // 睡一会,让主线程先干活,work thread再继续
                TimeUnit.MILLISECONDS.sleep(100);
                System.out.println("main thread OK, work thread go on!");
            } catch (Exception e) {
   
     
                e.printStackTrace();
            }
        }, "work-thread").start();

        // 主线程在这等着,等其他线程调用await()方法之后,大家再一起执行
        cyclicBarrier.await();
        System.out.println("main end");
    }
}

二、原理

1)CyclicBarrier中一个generation代表了每一代,通过这个实现CyclicBarrier的复用。

  • parties变量用来表示参与party的线程数;
  • count变量代表了还没到party的线程数;
  • 外加个ReentrantLock锁和一个Condition条件变量实现线程的并发和阻塞。

2)在CyclicBarrier类的内部有一个计数器count,每个线程在到达屏障点的时候都会调用await()方法将自己阻塞排队,并将计数器count减1;
3)当计数器count减为0的时候,所有因调用await()方法而被阻塞的线程将被唤醒
4)线程的排队进入party通过ReentrantLock实现;进入party后睡眠等待所有参会者通过锁的条件等待Condition实现

三、源码解析

1、成员变量和构造器

  1. CyclicBarrier内部是通过条件队列trip对线程进行阻塞;
  2. 两个int型的变量partiescount
  • parties表示每次拦截的线程数,即party的所有参会者
  • count表示还未拦截的线程数,即还有多少参会者没到party;它的初始值和parties相同,调用await()方法减1,减为0时将所有阻塞在条件变量上线程唤醒。
  1. 静态内部类Generation,代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待
  2. barrierCommand,表示换代前执行的任务。当前代结束会执行该任务,然后自动开启下一代
public class CyclicBarrier {
   
     
    // 该类用于CyclicBarrier的一次协同是否完成(正常完成、异常完成)
    // reset后会复用该结构,每一次的party都会生成一个该类的新实例
    private static class Generation {
   
     
        // 当前party有没有被强制中断,false表示没有
        boolean broken = false;
    }

    // 同步锁,线程进入 条件等待 时需要获取锁
    private final ReentrantLock lock = new ReentrantLock();
    // 用于阻塞线程的条件变量(有未到party的线程,已到party的线程则等待在该条件变量上)
    private final Condition trip = lock.newCondition();
    // 参与party的人数
    private final int parties;
    // 当所有的线程都参与到了party中, 回调的方法
    private final Runnable barrierCommand;
    // 当前party
    private Generation generation = new Generation();

    // 还未到party的线程数
    private int count;
    
    // 构造器
    public CyclicBarrier(int parties, Runnable barrierAction) {
   
     
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
}

2、await()方法

CyclicBarrier的核心方法是await(),该方法是线程相互等待的关键,它有两种实现:一种是带等待超时的,一种是不带等待超时,本质上都是调用了同一个方法dowait(),只是带等待超时的多传了一个时间。

public int await() throws InterruptedException, BrokenBarrierException {
   
     
    try {
   
     
        // 直接调用自己的dowait()方法
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
   
     
        throw new Error(toe); // cannot happen
    }
}

CyclicBarrier#dowait()方法

  • 首先因为await()是同步的,需要先加互斥锁ReentrantLock;
  • 如果发现当前代generation已经崩了,则当前线程直接抛出异常BrokenBarrierException。
  1. 每次进来都将count减1,减完立马进行判断看看count是否等于0:
  • 如果等于0,则执行换代前要执行的任务barrierCommand,然后唤醒所有阻塞等待的线程,接着自动进入CyclicBarrier的下一代;将计数器count的值重新设为parties。如果barrierCommand运行异常,则打破栅栏的当前代,唤醒所有阻塞等待的线程。
  • count不等于0,这进入for循环:
  • 不是超时等待,直接调用Condition.await()阻塞当前线程。
  • 是超时等待,就在nanos时间内循环竞争锁;
  • 如果当前线程在await()获取锁过程中被中断了:
  1. 在当前代还没结束之前打破栅栏,即游戏在中途被打断,则设置generation的broken状态为true并唤醒所有线程。
  2. 当前代已经结束,则直接中断当前线程。
  1. 线程被唤醒后进行下面三个判断:
  1. 如果线程因为broken generation操作(即调用breakBarrier()方法)而被唤醒则抛出异常
  2. 如果线程因为CyclicBarrier正常换代被唤醒,则返回计数器count的值
  3. 如果线程因为超时而被唤醒打破栅栏并抛出TimeOut异常
    注意:如果其中有一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒
  1. 最后解锁。
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
TimeoutException {
   
     
    // 上锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
   
     
        // 保存当前party时的generation快照,generation更新后不会影响这里
        final Generation g = generation;

        // 当party被强制中断时,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();

        // 判断当前线程是否被中断
        if (Thread.interrupted()) {
   
     
            // 有中断的线程混入其中:
            // 1)broken当前generation;
            // 2)唤醒CyclicBarrier阻塞的所有线程;重新开始,注意:此时没有改变party的Generation
            // 3)抛出线程中断异常
            breakBarrier();
            throw new InterruptedException();
        }

        // 每次都将计数器的值减1,即未到场的参会人个数减一。
        int index = --count;
        // 当前线程是最后一个到达party的线程时,回调barrierCommand,然后唤醒所有阻塞在条件变量上的线程。
        // 若回调barrierCommand正常完成,则不需要手动调用reset()就可进入新的一代,因为这里调用了nextGeneration()
        if (index == 0) {
   
     
            boolean ranAction = false;
            try {
   
     
                // 唤醒所有线程前先执行指定的任务
                final Runnable command = barrierCommand;
                if (command != null)
                    // 若回调方法运行出现异常,异常直接上抛
                    command.run();
                ranAction = true;
                // 唤醒所有阻塞的线程,并进入下一代party,修改generation。
                nextGeneration();
                return 0;
            } finally {
   
     
                // barrierCommand回调方法发生了异常,那么设置broker标志位,并唤醒所有阻塞的线程
                if (!ranAction)
                    breakBarrier();
            }
        }

        // 循环等待最后一个参与party的线程 唤醒自己 或者 被中断 或者 等待超时
        for (;;) {
   
     
            try {
   
     
                // 根据传入的参数决定是定时等待还是非定时等待
                if (!timed)
                    trip.await();
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
   
      // 发生中断异常
                // 如果当前线程在CyclicBarrier的等待唤醒期间(即:g没有被改变)被中断,则中断generation,唤醒所有线程
                if (g == generation && ! g.broken) {
   
     
                    breakBarrier();
                    throw ie;
                } else {
   
      // 如果当前线程已经完成在CyclicBarrier上的等待(即:g被改变),则直接标志当前线程被中断
                    Thread.currentThread().interrupt();
                }
            }

			// 如果线程因为broken generation操作而被唤醒则抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 如果g != generation ,说明CyclicBarrier换代了(即:generation改变了),线程因此被唤醒的话,则返回还有多个参会者没进来,即计数器的值count。
            if (g != generation)
                return index;

            // 如果线程因为时间到了而被唤醒,这broken generation 并抛出异常
            if (timed && nanos <= 0L) {
   
     
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
   
     
        lock.unlock();
    }
}

3、breakBarrier()方法:

意味着有人搞破坏,游戏中途结束,将所有的等待线程全部唤醒。

  • await()方法通过抛出BrokenBarrierException 异常返回;
private void breakBarrier() {
   
     
    // 中断当前generation,即打破栅栏
    generation.broken = true;
    // 设置 未参会者数量count 等于 所有需要参会者数量
    count = parties;
    // 唤醒所有阻塞的线程
    trip.signalAll();
}

4、nextGeneration()方法:

开启栅栏新的一代。

private void nextGeneration() {
   
     
    // 唤醒所有阻塞的线程
    trip.signalAll();
    // 设置 未参会者数量count 等于 所有需要参会者数量
    count = parties;
    // 生成栅栏的下一代Generation,这也是和breakBarrier()方法的区别
    generation = new Generation();
}

5、reset()方法:

重置一个栅栏

  • 打破栅栏 中断当前代,await()方法通过抛出BrokenBarrierException 异常返回;
  • 开始新的下一代,重置count和generation。
public void reset() {
   
     
    final ReentrantLock lock = this.lock;
    // 上锁
    lock.lock();
    try {
   
     
        breakBarrier();   // 将所有参与party的线程唤醒
        nextGeneration(); // 生成下一代
    } finally {
   
     
        lock.unlock();
    }
}

barrierCommand正常完成,则不需要手动调用reset()就可自动进入新的一代,因为运行barrierCommand之后调用了nextGeneration()。

四、总结

简单说就是一个ReentrantLock加上一个Condition条件变量实现并发控制和多个线程的阻塞等待。
并且采用多线程协作机制,在多个线程协作过程中,只要有一个线程被中断或者发生异常,则整个协作过程取消

CyclicBarrier和CountDownLatch相同的是,它们都能让多个线程协调在某一个节点上等待;下面我们看一下他们的区别?

2、CyclicBarrier和CountDownLatch的区别?

1> 是否可复用?

  • CountDownLatch的计数器只能使用一次;
  • CyclicBarrier的计数器可以使用reset()方法重置进而循环使用。

2> 唤醒方式?

  • CountDownLatch是多线程阻塞后,需要等待外界条件达到某种状态才会被统一唤醒,即CountDownLatch还需要额外的countDown()唤醒操作。
  • CyclicBarrier是多线程协作,当线程达到等待数量或者一个线程出现异常或被中断时自动放行;即CyclicBarrier中只需要await(),不需要额外的唤醒操作;

3> 从性能来看:

  • CountDownLatch的性能大于CyclicBarrier,因为CountDownLatch是自己采用CAS,利用共享锁的原理实现;
  • 而CyclicBarrier是采用ReentrantLock独占锁 +Condition条件变量实现;

4> 从应用场景来看:

  • CyclicBarrier能处理更为复杂的业务场景。

5> 简言之:

  • 可以理解为CountDownLatch是一个计数器,线程完成一个记录一个,只不过计数不是递增而是递减。
  • 而CyclicBarrier是一个阀门,所有线程都到了 ,阀门才打开。

版权声明:本文不是「DDKK.COM 弟弟快看,程序员编程资料站」的原创文章,版权归原作者所有

原文地址:https://saint.blog.csdn.net/article/details/129029852