CountDownLatch、CycLicBarrier、Semaphore

发布时间 2023-04-24 17:53:22作者: or追梦者

减计数器

 

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助程序 (一等多场景适用

使用给定的计数初始化CountDownLatch。  由于countDown()方法的调用,直到当前计数达到零为止,await方法将阻塞。
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations
之后,将释放所有等待线程,并立即返回所有随后的await调用
of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return
这是一种一次性的现象,计数器无法被重置。如果需要能够重置的版本,考虑使用CyclicBarrier;
immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.
来自 <https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html>

案例

六国破灭后,秦国才一统华夏
枚举类
    1 public enum CountryEnum {
    2         /**
    3      * 国家
    4      *
    5      * @description 枚举常量 序数从0开始
    6      */
    7         ONE(1, "齐国"), TWO(2, "楚国"), THREE(3, "燕国"), FOUR(4, "韩国"), FIVE(5, "赵国"), SIX(6, "魏国");
    8 
    9         private Integer code;
   10         private String message;
   11 
   12         CountryEnum(int code, String message) {
   13                 this.code = code;
   14                 this.message = message;
   15         }
   16 
   17         public Integer getCode() {
   18                 return code;
   19         }
   20 
   21         public void setCode(Integer code) {
   22                 this.code = code;
   23         }
   24 
   25         public String getMessage() {
   26                 return message;
   27         }
   28 
   29         public void setMessage(String message) {
   30                 this.message = message;
   31         }
   32 
   33         /**
   34      * 匹配索引
   35      *
   36      * @param index
   37      * @return
   38      */
   39         public static CountryEnum forEach_CountryEnum(int index) {
   40                 //类型为 enum [ONE,TWO,THREE,FOUR,FIVE,SIX]
   41                 CountryEnum[] values = CountryEnum.values();
   42                 for (CountryEnum country : values
   43                 ) {
   44                         //System.out.println(country);
   45                         if (country.getCode() == index) {
   46                                 return country;
   47                         }
   48                 }
   49                 return null;
   50         }
   51 

demo

/**
 * 减少计数锁存器用法实例
 * @author 夜神
 * @description 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒
 *                  CountDownLatch主要有两个方法,当一个或多个线程调用await方法      时,这些线程会阻塞。
 *                  其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),
 *                  当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
 * @like  解释: 六国破灭后秦才完成了一统。
 *        main主线程必须要等前面6个线程完成全部工作后,自己才能开干
 */
    1 public class CountDownLatchDemo {
    2         public static void main(String[] args) throws InterruptedException {
    3                 CountDownLatch count = new CountDownLatch(6);
    4                 for (int i = 1; i <= 6; i++) {
    5                         new Thread(()->{
    6                                 System.out.println(Thread.currentThread().getName()+"\t"+"被灭");
    7                                 //减少计数
    8                                 count.countDown();
    9                         },Objects.requireNonNull(CountryEnum.forEach_CountryEnum(i)).getMessage()).start();
   10                 }
   11                 //等待调用countDown()的线程执行完
   12                 count.await();
   13                 System.out.println("秦灭六国,一统华夏");
   14         }
   15 }

 

循环栅栏

CyclicBarrier循环栅栏,设置公共障碍点,会阻塞达到障碍点的线程,直到都达到后执行栅栏处的操作,然后放行线程,栅栏可以重复使用
适用于线程互相等待以执行公共操作的场景

 

 

 

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.
允许一组线程互相等待以达到公共障碍点。CyclicBarriers在涉及固定线程大小的程序时很有用,必须偶尔等待对方。
这个栅栏被称为循环栅栏是因为它在释放等待的线程后可以被重用。

  A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.
在最后的线程达到后,但是在任何线程被释放之前,CyclicBarrier支持的可选Runnable命令会在每个公共障碍点运行一次。
这个栅栏操作对任何一方继续之前更新共享状态是非常有用的。
来自 <https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CyclicBarrier.html>
 
 
/**
 * 循环栅栏
 * @author 夜神
 * @mean
 *  CyclicBarrier
 *  的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,
 *  让一组线程到达一个屏障(也可以叫同步点)时被阻塞,
 *  直到最后一个线程到达屏障时,执行一次BarrierAction操作,然后屏障才会开门,所有
 *  被屏障拦截的线程才会继续干活。
 *  线程进入屏障通过CyclicBarrier的await()方法。
 *
 * @description 集齐七颗龙珠后才召唤神龙
 */

案例

集齐七颗龙珠,然后召唤神龙

    1 public class CyclicBarriersDemo {
    2         public static void main(String[] args) {
    3                 CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
    4                         System.out.println("集齐七颗龙珠可以召唤神龙了");
    5                 });
    6                 for (int i = 1; i <= 7 ; i++) {
    7                         new Thread(()->{
    8                                 try{
    9                                         System.out.println(Thread.currentThread().getName()+"\t星龙珠被收集");
   10                                         //公共障碍点
   11                                         cyclicBarrier.await();
   12                                         System.out.println("龙珠飞散了");
   13                                         TimeUnit.SECONDS.sleep(2);
   14                                         System.out.println(Thread.currentThread().getName()+"\t星龙珠又被收集了");
   15                                         //公共障碍点
   16                                         cyclicBarrier.await();
   17                                 }catch(Exception e){
   18                                         e.printStackTrace();
   19                                 }
   20                         },String.valueOf(i)).start();
   21                 }
   22         }
   23 

 信号量

适用于限制数量的场景 类似车位、门票

 

A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.
一个计数信号量,从概念上讲,一个Semaphore维持着一组许可证。
如果必要,每个acquire()都会阻塞,直到获得许可,然后拿到它。每个release()添加一个许可证,可能释放一个阻塞的acquirer。但是,没有用到实际的许可对象,Semaphore仅仅保持可用量的计数并采取相应措施。

Semaphores are often used to restrict the number of threads than can access some (physical or logical) resource
信号量通常用于限制线程数量,使其无法访问某些(物理或逻辑)资源

来自 <https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Semaphore.html>
 

 

Acquires a permit from this semaphore, blocking until one is available, or the thread is interrupted.
Acquires a permit, if one is available and returns immediately, reducing the number of available permits by one.
If no permit is available then the current thread becomes disabled for thread scheduling purposes and lies dormant until one of two things happens:

从这个信号量获取许可,阻塞直到有可用的信号量,或者线程被中断。

获取一个许可证(如果有)并立即返回,将可用许可证的数量减少一个。

如果没有可用的许可证,那么当前线程将被禁用线程调度目的,并处于休眠状态,直到发生以下两种情况之一:

  其他线程调用release()来释一个或多个放信号量
  • Some other thread invokes the release() method for this semaphore and the current thread is next to be assigned a permit; or
   其他线程被中断了
  • Some other thread interrupts the current thread.
If the current thread:
  • has its interrupted status set on entry to this method; or
  • is interrupted while waiting for a permit,
then InterruptedException is thrown and the current thread's interrupted status is cleared.
  Throws:
InterruptedException - if the current thread is interrupted
 

 

释放一个许可,并将其返回给信号量。

释放一个许可证,将可用许可证的数量增加一个。如果有线程试图获取许可证,则选择一个线程并给予刚刚释放的许可证。该线程被(重新)启

用以进行线程调度。

Releases a permit, returning it to the semaphore.
Releases a permit, increasing the number of available permits by one. If any threads are trying to acquire a permit, then one is selected and given the permit that was just released. That thread is (re)enabled for thread scheduling purposes.
没有要求释放了许可证的线程必须通过调用acquire()获得许可证 信号量的正确使用是由应
用程序中的编程约定建立的。
There is no requirement that a thread that releases a permit must have acquired that permit by calling acquire(). Correct usage of a semaphore is established by programming convention in the application.

来自 <https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Semaphore.html#release-->
 
/**
 * @author 夜神
 * @description 在(事先定义好)信号量上我们定义两种操作:
 * acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),
 * 要么一直等下去,直到有线程释放信号量,或超时。
 * release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。
 * 
 * 信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
 */
 
    1 public class SemaphoreDemo {
    2         public static void main(String[] args) {
    3                 //三个停车位
    4                 Semaphore semaphore = new Semaphore(3);
    5                 //听六辆汽车
    6                 for (int i = 1; i <= 6; i++) {
    7                         new Thread(() -> {
    8                                 try {
    9                                         //查询可用车位数量
   10                                         System.out.println("驶入前可用车位"+semaphore.availablePermits());
   11                                         //获取车位
   12                                         semaphore.acquire();
   13                                         System.out.println(Thread.currentThread().getName() + "\t 号汽车驶入停车位");
   14                                         //停车时间
   15                                         TimeUnit.SECONDS.sleep(3);
   16                                         System.out.println(Thread.currentThread().getName() + "\t 号汽车驶出停车位");
   17                                 } catch (Exception e) {
   18                                         e.printStackTrace();
   19                                 } finally {
   20                                         //无论驶出车位还是爆炸了,释放停车位
   21                                         semaphore.release();
   22                                         //再次查询车位
   23                                         System.out.println("驶出后可用车位"+semaphore.availablePermits());
   24                                 }
   25                         }, String.valueOf(i)).start();
   26                 }
   27         }