Java并发CyclicBarrier

CyclicBarrier能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

构造器:

CyclicBarrier(int parties)Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and does not perform a predefined action when the barrier is tripped.
CyclicBarrier(int parties, Runnable barrierAction)Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it, and which will execute the given barrier action when the barrier is tripped, performed by the last thread entering the barrier.

方法:

intawait()Waits until all parties have invoked await on this barrier.
intawait(long timeout, TimeUnit unit)Waits until all parties have invoked await on this barrier, or the specified waiting time elapses.
intgetNumberWaiting()Returns the number of parties currently waiting at the barrier.
intgetParties()Returns the number of parties required to trip this barrier.
booleanisBroken()Queries if this barrier is in a broken state.
voidreset()Resets the barrier to its initial state.

demo代码:

import lombok.SneakyThrows;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    class Worker implements Runnable {
        private CyclicBarrier cyclicBarrier;

        public Worker(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @SneakyThrows
        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName()+"开始等待其它线程...");
            System.out.println("共需等待线程数:" +cyclicBarrier.getParties() + ",当前等待线程数:" + cyclicBarrier.getNumberWaiting());
            cyclicBarrier.await();
            System.out.println(Thread.currentThread().getName()+"开始线程开始执行业务...");
            Thread.sleep(1000);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrierTest cbt = new CyclicBarrierTest();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        List<Thread> ts = new ArrayList<>();
        for(int i=0; i<5; i++) {
            Thread thread = new Thread(cbt.new Worker(cyclicBarrier));
            ts.add(thread);
            thread.start();
        }
        for (Thread t : ts) {
            t.join();
        }

        System.out.println("--------------------------------------------------------------");

        //定义最后达到的线程执行的任务
        Runnable r = ()-> System.out.println(Thread.currentThread().getName()+"开始线程开始执行业务公共业务...");
        //定义带有附加执行任务的CyclicBarrier
        CyclicBarrier cyclicBarrier2 = new CyclicBarrier(5, r);
        List<Thread> ts2 = new ArrayList<>();
        for(int i=0; i<5; i++) {
            Thread thread = new Thread(cbt.new Worker(cyclicBarrier2));
            ts.add(thread);
            thread.start();
        }
        for (Thread t : ts2) {
            t.join();
        }
    }
}

输出:

Thread-0开始等待其它线程...
共需等待线程数:5,当前等待线程数:0
Thread-1开始等待其它线程...
共需等待线程数:5,当前等待线程数:1
Thread-4开始等待其它线程...
共需等待线程数:5,当前等待线程数:2
Thread-3开始等待其它线程...
Thread-2开始等待其它线程...
共需等待线程数:5,当前等待线程数:3
共需等待线程数:5,当前等待线程数:3
Thread-3开始线程开始执行业务...
Thread-0开始线程开始执行业务...
Thread-1开始线程开始执行业务...
Thread-2开始线程开始执行业务...
Thread-4开始线程开始执行业务...
--------------------------------------------------------------
Thread-5开始等待其它线程...
Thread-6开始等待其它线程...
共需等待线程数:5,当前等待线程数:0
共需等待线程数:5,当前等待线程数:0
Thread-7开始等待其它线程...
共需等待线程数:5,当前等待线程数:2
Thread-8开始等待其它线程...
共需等待线程数:5,当前等待线程数:3
Thread-9开始等待其它线程...
共需等待线程数:5,当前等待线程数:4
Thread-9开始线程开始执行业务公共业务...
Thread-9开始线程开始执行业务...
Thread-6开始线程开始执行业务...
Thread-8开始线程开始执行业务...
Thread-5开始线程开始执行业务...
Thread-7开始线程开始执行业务...

CyclicBarrier和CountDownLatch的区别

CyclicBarrier可以使一定数量的线程反复地在栅栏位置处汇集。当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有的线程都将被释放,而栅栏将被重置以便下次使用。

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景。

CyclicBarrier还提供了一些其他有用的方法,比如getNumberWaiting()方法可以获得CyclicBarrier阻塞的线程数量,isBroken()方法用来了解阻塞的线程是否被中断。