【Java 并发】【九】【AQS】【七】Semaphore信号量底层机制原理

发布时间 2023-04-07 16:52:05作者: 酷酷-

1  前言

接下来我们来看看Semaphore,也是基于之前讲解的AQS来实现的,建立在AQS体系之上的一个并发工具类。

2  Semaphore是什么

Semaphore,它是一个信号量,主要作用是用来控制并发中同一个时刻执行的线程数量,可以用来做限流器,或者流程控制器。
在创建的时候会指定好它有多少个信号量,比如Semaphre semaphore = new Semaphore(2),就只有2个信号量。
这个信号量可以比作是车道,每一个时刻每条车道只能允许一辆汽车通过,你可以理解为高速收费站上的收费口,每个收费口任意一时刻只能允许一辆汽车通行。画个图来讲解一下:

这里的收费站其实就是Semaphore,而2个收费口其实就是Semaphore中的2个信号量。我们还是写个例子,感受下:

/**
 * @author xjx
 * @description
 * @date 2023/4/7 15:00
 */
public class SemaphoreTest {
    // 创建一个有2个收费口的收费站
    private static Semaphore semaphore = new Semaphore(2);
    public static class RunThread extends Thread {
        @Override
        public void run() {
            // 这里循环100次,模拟车辆非常多,竞争激烈
            for (int i = 0; i < 100; i++) {
                doBusiness();
            }
        }
        // 这里模拟通过收费口的情况,业务操作
        private void doBusiness() {
            try {
                // 获取信号
                semaphore.acquire();
                // 模拟业务操作耗时
                Thread.sleep(2000);
                // 打印信息
                System.out.println(Thread.currentThread().getName() + "获取信号");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 释放信号
                semaphore.release();
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        // 创建4个线程
        RunThread runThread1 = new RunThread();
        RunThread runThread2 = new RunThread();
        RunThread runThread3 = new RunThread();
        RunThread runThread4 = new RunThread();
        // 启动线程
        runThread1.start();
        runThread2.start();
        runThread3.start();
        runThread4.start();
        // 主线程等待runThread1、2、3、4结束之后再往下运行
        runThread1.join();
        runThread2.join();
        runThread3.join();
        runThread4.join();
        System.out.println("结束");
    }
}

运行程序,你会发现每次打印只会打印2条日志,也就是时候每次最多只会有2辆车同时经过收费站。那么接下来我们就深入的研究一下实现原理。

3  Semaphore源码分析

Semaphore有两种模式,公平模式和非公平模式,分别对应两个内部类为FairSync、NonfairSync,这两个子类继承了Sync,都是基于之前讲解过的AQS来实现的。
画个图来说明一下内部的结构如下:

Semaphore的公平模式依赖于FairSync公平同步器来实现,非公平模式依赖于NonfairSync非公平同步器来实现。
其中FairSync、NonfairSync继承自Sync,而Sync又继承自AQS,这些同步器的底层都是依赖于AQS提供的机制来实现的。
这里的Semaphore实现的思路跟我们之前讲过的ReentrantLock非常的相似,包括内部类的结构都是一样的,也是有公平和非公平两种模式。只是不同的是Semaphore是共享锁,支持多个线程同时操作;然而ReentrantLock是互斥锁,同一个时刻只允许一个线程操作。接下来我们就来看看Semaphore中都有哪些东西。

3.1  Semaphore的构造方法

默认的构造方法,构造出非公平模式的:

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}

另一个传参构造方法,传递fair参数是否构造公平的信号量:

public Semaphore(int permits, boolean fair) {
    // 如果传递fair为true,构造公平模式,否则构造非公平模式
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

3.2  公平模式信号量

我们就先来看一下公平模式的信号量,核心的acquire和release方法,先分析下公平模式的acquire方法源码如下:

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

这里公平模式,Semaphore.acquire方法源码直接是调用FairSync的acquireSharedInterruptibly,也就是进入了AQS的acquireSharedInterruptibly的模板方法里面了,之前我们就讲过了。
然后看一下acquireSharedInterruptibly方法内部:

public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

这个方法定义了一个模板流程:
(1)先调用子类的tryAcquireShared方法获取共享锁,也就是获取信号量。
(2)如果获取信号量成功,即返回值大于等于0,则直接返回。
(3)如果获取失败,返回值小于0,则调用AQS的doAcquireSharedInterruptibly方法,进入AQS的等待队列里面,等待别人释放资源之后它再去获取。

这里我们画个图理解一下:

3.2.1  FairSync的tryAcquireShared方法源码

doAcquireSharedInterruptibly方法我们之前讲解AQS的时候都讲解过了,所以现在看一下FairSync子类的tryAcquireShared方法的内部源码即可:

protected int tryAcquireShared(int acquires) {
    for (;;) {
        // 这里作为公平模式,首先判断一下AQS等待队列里面
        // 有没有人在等待获取信号量,如果有人排队了,自己就不去获取了
        if (hasQueuedPredecessors())
            return -1;
        // 获取剩余的信号量资源
        int available = getState();
        // 剩余资源减去我需要的资源,是否小于0
        // 如果小于0则说明资源不够了
        // 如果大于等于0,说明资源是足够我使用的
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

上面的源码就是获取信号量的核心流程了:
(1)首先判断一下AQS等待队列里面是否有人在排队,如果是,则自己不尝试获取资源了,乖乖的去排队
(2)如果没有人在排队,获取一下当前剩余的信号量available,然后减去自己需要的信号量acquires,得到减去后的结果remaining。
(3)如果remaining小于0,直接返回remaining,说明资源不够,获取失败了,这个时候就会进入AQS等待队列等待。
(4)如果remaining 大于等于0,则执行CAS操作compareAndSetState竞争资源,如果成功了,说明自己获取信号量成功,如果失败了同样进入AQS等待队列。

我们画一下公平模式FairSync的tryAcquireShared流程图,以及整个公平模式的acquire方法的流程图:

3.2.2  FairSync的releaseShared方法源码

看完获取,我们紧接着来看下释放,这里Semaphore的release方法直接调用Sync的releaseShared方法:

public void release() {
    sync.releaseShared(1);
}

我们继续来分析releaseShared方法,进入到AQS的releaseShard释放资源的模板方法:

public final boolean releaseShared(int arg) {
    // 1. 调用子类的tryReleaseShared释放资源
    if (tryReleaseShared(arg)) {
        // 释放资源成功,调用doReleaseShared唤醒等待队列中等待资源的线程
        doReleaseShared();
        return true;
    }
    return false;
}

这里的模板流程有:
(1)调用子类的tryReleaseShared去释放资源,即释放信号量
(2)如果释放成功了,则调用doReleaseShared唤醒AQS中等待资源的线程,将资源传播下去,如果释放失败,即返回小于等于0,则直接返回。
所以,这里除了AQS的核心模板流程之外,具体释放逻辑就是Sync的tryReleaseShared方法的源码了,我们继续来查看:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        // 这里就是将释放的信号量资源加回去而已
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // 尝试CAS设置资源,成功直接返回,失败则进入下一循环重试
        if (compareAndSetState(current, next))
            return true;
    }
}

这里的逻辑非常简单了,无非是不断尝试CAS将资源加回去而已。
我们再画个图来理解一下:

3.3  非公平模式信号量

非公平模式NonfairSync跟公平模式唯一的区别就是在tryAcquireShared上的实现不一样,其它的完全都是一致的,我们下面就看一下NonfairSync的tryAcquireShared方法源码:

3.3.1  NonfairSync的tryAcquireShared方法源码

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

这里是直接调用了Sync的nonfairTryAcquireShared方法源码,我们继续往下看:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        // 上面公平模式需要看下等待队列是否有人
        // 这里是直接去尝试获取资源,根本不管是否有人
        int remaining = available - acquires;
        if (remaining < 0 ||
            // 如果remaining剩余资源 >= 0 则执行CAS操作
            compareAndSetState(available, remaining))
            return remaining;
    }
}

这里非公平锁的源码流程大致就是:
(1)对比上面的公平模式,需要判断AQS等待队列是否有人在等待。而这里非公平模式不管有没有人在等
(2)如果剩余可用资源remaining >= 0,则直接CAS去争抢资源,成功则返回,失败则重试。

释放信号量的话跟公平模式用的一样的都是实际都是走的基类Sync的tryReleaseShared方法,上边我们已经看过了。

4  小结

到这里,Semaphore我们就看完了,包括公平模式、非公平模式,有理解不对的地方欢迎指正。