Rocketmq-背景

发布时间 2023-07-13 23:31:26作者: 羊37

背景

消息队列(Message Queue,MQ)的三大特性:异步、解耦、削峰

在很多场景下,我们的业务是没有那么强的实时性的。

比如登录成功后发送短信,我们不必在登录的逻辑里实时的去调用发送短信,而是可以先“记下来”我们等下需要发送短信,然后快速的给出响应。

串行的登录+发送短信逻辑,会导致必须做完所有操作用户才能收到响应,体验十分不好。

消息队列诞生的背景之一,就是为了解决这个的问题。

1 MQ的特性

1.1 异步

概念

异步就类似于上面的例子,为了体会深刻一点,我一般都习惯用夸张的手法描述。

假设我们需要做的逻辑是登录、发短信、发微信、发邮件,每个耗时1s。

同步的场景下,用户需要等待所有接口执行完成,等待4s。

image-20230713204819497

现在呢,你也知道,核心功能在于登录,微信通知能不能立马发给他,好像也没有那么重要。

我们只要确保,还记得要做发短信这个事情就好了,所以,这个流程可以改成异步。

image-20230713205316109

现在我们先不管MQ后面的逻辑,来看下整个耗时,已经变成1.1s了。

所以说,关键点在于这个事情有没有必要同步的做,没必要的时候,异步可以很好的优化整个流程。

示例

用Java代码演示下,直接来4个类,每个类1个方法,sleep模拟耗时1s。

package cn.yang37.queue;

import lombok.SneakyThrows;

/**
 * @description:
 * @class: A
 * @author: yang37z@qq.com
 * @date: 2023/7/13 20:56
 * @version: 1.0
 */
public class RunDemo {

    static class A {
        @SneakyThrows
        public static String doA() {
            Thread.sleep(1000);
            return "ok";
        }
    }

    static class B {
        @SneakyThrows
        public static String doB() {
            Thread.sleep(1000);
            return "ok";
        }
    }

    static class C {
        @SneakyThrows
        public static String doC() {
            Thread.sleep(1000);
            return "ok";
        }

    }

    static class D {
        @SneakyThrows
        public static String doD() {
            Thread.sleep(1000);
            return "ok";
        }
    }
}

现在,我们依次执行这几个方法,记录下耗时。

    @Test
    void name1() {
        long start = System.currentTimeMillis();

        String doA = RunDemo.A.doA();
        String doB = RunDemo.B.doB();
        String doC = RunDemo.C.doC();
        String doD = RunDemo.D.doD();

        log.info("doA: {}", doA);
        log.info("doB: {}", doB);
        log.info("doC: {}", doC);
        log.info("doD: {}", doD);

        long end = System.currentTimeMillis();

        log.info("执行耗时: {}", end - start);
    }

程序输出

2023-07-13 21:15:21.569 -- [main] INFO  cn.yang37.queue.RunDemoTest.name1 - doA: ok
2023-07-13 21:15:21.570 -- [main] INFO  cn.yang37.queue.RunDemoTest.name1 - doB: ok
2023-07-13 21:15:21.570 -- [main] INFO  cn.yang37.queue.RunDemoTest.name1 - doC: ok
2023-07-13 21:15:21.571 -- [main] INFO  cn.yang37.queue.RunDemoTest.name1 - doD: ok
2023-07-13 21:15:21.571 -- [main] INFO  cn.yang37.queue.RunDemoTest.name1 - 执行耗时: 4034

嗯,每个方法耗时1s,总体耗时超过4s。

现在,我们只做A,异步的去操作B和C。

    @Test
    void name2() throws Exception {
        long start = System.currentTimeMillis();
        // 操作A
        String doA = RunDemo.A.doA();
        log.info("doA: {}", doA);

        // 等待A执行完成后,BCD不同步执行,异步处理.
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(RunDemo.B::doB);
        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(RunDemo.C::doC);
        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(RunDemo.D::doD);

        // futureB/C/D都执行完成后,通知combinedFuture对象
        CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(futureB, futureC, futureD);

        // combinedFuture等待并获取执行结果
        combinedFuture.thenRun(() -> {
            try {
                String resultB = futureB.get();
                String resultC = futureC.get();
                String resultD = futureD.get();
                log.info("doB: {}", resultB);
                log.info("doC: {}", resultC);
                log.info("doD: {}", resultD);
            } catch (Exception e) {
                log.error("执行出错!", e);
            }
        });

        // 阻塞等待所有异步任务完成
        combinedFuture.get();

        long end = System.currentTimeMillis();

        log.info("执行耗时: {}", end - start);
    }

程序输出

2023-07-13 21:22:46.081 -- [main] INFO  cn.yang37.queue.RunDemoTest.name2 - doA: ok
2023-07-13 21:22:47.100 -- [ForkJoinPool.commonPool-worker-2] INFO  cn.yang37.queue.RunDemoTest.lambda$name2$0 - doB: ok
2023-07-13 21:22:47.100 -- [ForkJoinPool.commonPool-worker-2] INFO  cn.yang37.queue.RunDemoTest.lambda$name2$0 - doC: ok
2023-07-13 21:22:47.100 -- [ForkJoinPool.commonPool-worker-2] INFO  cn.yang37.queue.RunDemoTest.lambda$name2$0 - doD: ok
2023-07-13 21:22:47.100 -- [main] INFO  cn.yang37.queue.RunDemoTest.name2 - 执行耗时: 2027

你看,同样都能输出B、C、D,我们只是改成了异步操作,耗时已经来到2s了。

对比之前4s的耗时,时间节省了大把。

1.2 解耦

解耦怎么理解呢,比如说,刚才还是执行A、B、C、D这4个方法,现在的逻辑是。

A、B、C、D这4个操作都需要执行下。

假设某天,我们的场景变成了邮件通知不需要发送,哪怕还是刚才优化后的异步代码,类比不执行C操作,也会有一个问题。

我得去掉调用这个方法的代码。

  		// 等待A执行完成后,BCD不同步执行,异步处理.
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(RunDemo.B::doB);
        CompletableFuture<String> futureC = CompletableFuture.supplyAsync(RunDemo.C::doC);
        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(RunDemo.D::doD);

很简单,不调用就不执行了呗。

    	// 等待A执行完成后,BCD不同步执行,异步处理.
        CompletableFuture<String> futureB = CompletableFuture.supplyAsync(RunDemo.B::doB);
        CompletableFuture<String> futureD = CompletableFuture.supplyAsync(RunDemo.D::doD);

哇靠,随便去掉下,就又来两个报错。

image-20230713212821413

你看,只是这么个简单的操作,就要修改这么多代码。而且,实际开发中,肯定比这复杂的多。

最关键的是你改完代码还要发布个版本重启下服务。

又或者发短信的方法执行的时候,我想让他再加个发送登录时间啥的,能不能让他执行的时候帮我改下呀。

这个时候问题就出现了,就是异步操作和我们关心的主要业务逻辑耦合在了一起。

就像上面那个地方,你就在想,要是我可以动态的更改帮我异步操作的那个东西就行了。

通过MQ,可以将主要业务逻辑不太重要的异步操作逻辑拆开,达到解耦的效果。

就比如不执行D操作,我可以告诉MQ,你别执行xx操作了,这样也实现了去掉的效果。

怎么实现的暂时不是重点,关键是,要不要执行xx操作,怎么执行xx操作,这件事情已经和你的主要逻辑分开了。

1.3 削峰

削峰比较好理解,以上面的例子为例,登录+发送短信邮件那些通知,不管怎么样,发通知执行的时候总是要耗费服务器资源的。

夸张一下,执行进来一千万个请求,如果让你同时执行,是不是开始担心了,直接登录异常了。

为了不影响我们登录的操作,可以暂时将这些不重要的操作先存到MQ里面,后续由实际的消费服务来执行,缓解我们核心程序的压力。

2 队列和主题模型

为啥要叫消息队列?

2.1 队列

很废话,这玩意不就是存放消息的队列吗,不叫队列叫什么?

嗯,关键字来了,什么是队列?

队列就是我们说的那个队列,跟日常生活中的队列一样的。

食堂排队吃饭,这是个队列。

景区门点检票,这也是个队列。

那么队列最核心的概念是什么?

先进先出,FIFO(First-In-First-Out,FIFO)。

即最先进入队列的元素将首先被处理,而最后进入队列的元素将被保留在队列末尾等待处理。

MQ做的事情很专注,就是告诉某某某需要做啥事了。

还是刚才那个场景。

image-20230713215823627

现在,我们把后面3个发通知的操作告诉队列,它就会帮我去依次的去找人处理。

image-20230713220002623


现在换个场景,买票,一次只能买一张,假设我们都是通知小黄帮我们去买票,买一张的时候,一个队列就够用了。

image-20230713220715506

哇靠,可是我是两个人,我想要两张票,我准备找小李也帮我买下。

这个时候,你就发现,好像得变成这样。

image-20230713220847267

哇靠,只有一个队列,好像不能同时处理。没办法呀,得排队,通知完黄哥再通知我李哥。

问题就来了,我就想快点操作,就想同时通知他们咋办。

问题的关键点就是:怎么将同一个消息,同时通知给多个人。

嗯,很简单嘛,我再搞一个队列进来。

image-20230713221049579

哈哈,是不是很easy,一个黄哥专属队列,一个李哥专属队列,这不就能一下子通知两个人了?

这个时候就带来了一个成本的问题。问题确实解决了,但是你发现没有,那个绿色的正方形框框被复制了一个在上面的李哥队列,假设我要买它个几十张呢。

现在,可以整理出来问题点了。

  • 需要创建多个队列,复制多份消息。

    是会很影响资源和性能的。

  • 生产者需要知道具体消费者个数,然后去创建队列和复制消息。

    我本人得知道有几个人在帮忙,黄哥李哥两个人我就搞2个队列,再来个张哥我就搞3个队列。

我明明就是找别人帮忙买个票,咋还要关心这么多破事啊我靠。

不解耦,都自己搞,就是这么烦恼。

可不可以不管上面这些破事,我就在那里吼一声"帮忙买票,给两倍的米!",然后就行了啊?

当然可以,不然后面这么多东西怎么写。

这个时候就需要引出一个主题模型,也叫作发布-订阅模型

2.2 观察者模式

我先给你手搓一个设计模式里的观察者模式。为啥突然要手搓一个,主要是我要顺带复习下。

你敢相信这玩意一年前我研究过?我现在看代码人都是懵的。

image-20230713222325530

好,先搞一个观察者模式,实现上面这个功能,逻辑很简单,就是这样。

image-20230713223029879

根据这个背景来编码一下,项目结构是这样的。

image-20230713230558875

首先定义一个观察者接口,因为我们知道,每个观察者,肯定要能接收到通知和知道价格。

package cn.yang37.design.patterns24.observe.practice.p1;

/**
 * @description: 观察者-股民
 * @class: StockObserver
 * @author: yang37z@qq.com
 * @date: 2022/3/8 23:26
 * @version: 1.0
 */
public interface StockObserver {

    /**
     * 股价波动通知
     * @param msg 通知信息
     */
    void obtainStockChangeMsg(String msg);
    
    /**
     *  获得股价
     * @param stockPrice 股价
     */
    void obtainStockPrice(float stockPrice);
    
}

股民来个小王和小宋,实现下StockObserver接口。

package cn.yang37.design.patterns24.observe.practice.p1;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @description: 股民
 * @class: StockInvestor
 * @author: yang37z@qq.com
 * @date: 2022/3/8 23:49
 * @version: 1.0
 */
public class StockInvestor implements StockObserver {

    private static final Logger log = LoggerFactory.getLogger(StockInvestor.class);

    @Override
    public void obtainStockChangeMsg(String msg) {
        log.info("当前对象: {},通知信息: {}", this.getClass().getSimpleName(), msg);

    }

    @Override
    public void obtainStockPrice(float stockPrice) {
        log.info("当前对象: {},当前股价: {}", this.getClass().getSimpleName(), stockPrice);
    }

}

小王和小宋继承下这个StockInvestor类,就默认有这两个方法了。

package cn.yang37.design.patterns24.observe.practice.p1.stock;

import cn.yang37.design.patterns24.observe.practice.p1.StockInvestor;

/**
 * @description: 股民A
 * @class: Wang
 * @author: yang37z@qq.com
 * @date: 2022/3/8 23:48
 * @version: 1.0
 */
public class Wang extends StockInvestor {
}
package cn.yang37.design.patterns24.observe.practice.p1.stock;

import cn.yang37.design.patterns24.observe.practice.p1.StockInvestor;

/**
 * @description: 股民宋
 * @class: Song
 * @author: yang37z@qq.com
 * @date: 2022/3/8 23:48
 * @version: 1.0
 */
public class Song extends StockInvestor {
}

股民部分就搞定了。


现在再来搞一个主题(消息),抽象一手,应该具备以下几个功能,增删和通知。

package cn.yang37.design.patterns24.observe.practice.p1;

/**
 * @description:
 * @class: StockSubject
 * @author: yang37z@qq.com
 * @date: 2022/3/8 23:19
 * @version: 1.0
 */
public interface StockSubject {
    /**
     * 注册
     *
     * @param observe 股民对象
     */
    void registerObserver(StockObserver observe);

    /**
     * 移除
     *
     * @param observe 股民对象
     */
    void removeObserver(StockObserver observe);

    /**
     * 通知: 状态改变时,通知所有观察者.
     */
    void notifyObserver();

}

然后做一下这个类的具体实现,搞一个股价变动的实现类(假设有个别的场景,比如钱亏完了的通知,你还是可以实现这个类,核心的方法就这几个)。

这里面的代码就是具体的实现操作了。

package cn.yang37.design.patterns24.observe.practice.p1;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Vector;

/**
 * @description: 股价改变
 * @class: PriceChangeSubject
 * @author: yang37z@qq.com
 * @date: 2022/3/8 23:28
 * @version: 1.0
 */
public class StockSubject_PriceChange implements StockSubject {

    private static final Logger log = LoggerFactory.getLogger(StockSubject_PriceChange.class);

    /**
     * 当前股价
     */
    private float nowPrice = 2.0f;

    /**
     * 波动标志
     */
    private boolean priceChange = false;

    /**
     * 通知信息
     */
    private String msg = "[股价变动较大通知] ";

    /**
     * 维护一份观察者对象
     */
    private final Vector<StockObserver> stockObserverVector = new Vector<>();

    @Override
    public void registerObserver(StockObserver observe) {
        stockObserverVector.add(observe);
    }

    @Override
    public void removeObserver(StockObserver observe) {
        stockObserverVector.remove(observe);
    }

    @Override
    public void notifyObserver() {
        stockObserverVector.forEach(observer -> {
            // 通知当前价格
            observer.obtainStockPrice(nowPrice);

            // 股价波动标志
            if (priceChange) {
                // 通知波动信息
                observer.obtainStockChangeMsg(msg);
            }
        });
        // 恢复标志位
        priceChange = false;
    }

    /**
     * 股价改变
     *
     * @param newPricce 新价格
     */
    public void stockPriceChange(float newPricce) {
        // 波动情况
        float differencePrice = newPricce - nowPrice;
        float absDifferencePrice = Math.abs(differencePrice);
        float diff = absDifferencePrice / nowPrice;

        log.info("原股价 -> 新股价:{} -> {},差值: {},差值/原股价: {}", nowPrice, newPricce, differencePrice, diff);
        log.info("差值: {},差值/原股价: {}\n", differencePrice, diff);

        // 波动
        if (diff >= 0.05) {
            priceChange = true;
            msg = msg + diff;
        }

        // 更新当前股价信息
        nowPrice = newPricce;

        // 通知所有股民
        notifyObserver();
    }
}

image-20230713225038724

增加和删除订阅者信息没啥好说的,集合stockObserverVector做下增删就行了,看下这个notifyObserver()方法,用处就是通知所有的订阅方。

    @Override
    public void notifyObserver() {
        stockObserverVector.forEach(observer -> {
            // 通知当前价格
            observer.obtainStockPrice(nowPrice);

            // 股价波动标志
            if (priceChange) {
                // 通知波动信息
                observer.obtainStockChangeMsg(msg);
            }
        });
        // 恢复标志位
        priceChange = false;
    }

这个通知方法,其实就是for循环了一手,取出来内部的每个订阅者信息,都通知下他们当前的价格obtainStockPrice(),发现变动差距超过xx后,就就额外通知下价格已经变动了,反正就是发通知呗。

stockPriceChange()方法使我们测试用的,传入一个新的价格,触发下通知。

/**
 * 股价改变
 *
 * @param newPricce 新价格
 */
public void stockPriceChange(float newPricce) {
    // 波动情况
    float differencePrice = newPricce - nowPrice;
    float absDifferencePrice = Math.abs(differencePrice);
    float diff = absDifferencePrice / nowPrice;

    log.info("原股价 -> 新股价:{} -> {},差值: {},差值/原股价: {}", nowPrice, newPricce, differencePrice, diff);
    log.info("差值: {},差值/原股价: {}\n", differencePrice, diff);

    // 波动
    if (diff >= 0.05) {
        priceChange = true;
        msg = msg + diff;
    }

    // 更新当前股价信息
    nowPrice = newPricce;

    // 通知所有股民
    notifyObserver();
}

测试代码

    @Test
    void name1() {

        StockSubject_PriceChange stockSubject_priceChange = new StockSubject_PriceChange();
        // 添加观察者
        stockSubject_priceChange.registerObserver(new Wang());
        stockSubject_priceChange.registerObserver(new Song());

        // 模拟股价变动
        stockSubject_priceChange.stockPriceChange(2.0f);

    }

运行结果

2023-07-13 22:59:36.546 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 原股价 -> 新股价:2.0 -> 2.0,差值: 0.0,差值/原股价: 0.0
2023-07-13 22:59:36.547 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 差值: 0.0,差值/原股价: 0.0

2023-07-13 22:59:36.547 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Wang,当前股价: 2.0
2023-07-13 22:59:36.548 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Song,当前股价: 2.0

可以看到,新股价传入2.0时,没有变动,大家只是收到了当前股价的通知。

		// 模拟股价变动
        stockSubject_priceChange.stockPriceChange(2.5f);

运行结果

2023-07-13 23:02:27.181 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 原股价 -> 新股价:2.0 -> 2.5,差值: 0.5,差值/原股价: 0.25
2023-07-13 23:02:27.182 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockSubject_PriceChange.stockPriceChange - 差值: 0.5,差值/原股价: 0.25

2023-07-13 23:02:27.183 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Wang,当前股价: 2.5
2023-07-13 23:02:27.183 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockChangeMsg - 当前对象: Wang,通知信息: [股价变动较大通知] 0.25
2023-07-13 23:02:27.183 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockPrice - 当前对象: Song,当前股价: 2.5
2023-07-13 23:02:27.183 -- [main] INFO  cn.yang37.design.patterns24.observe.practice.p1.StockInvestor.obtainStockChangeMsg - 当前对象: Song,通知信息: [股价变动较大通知] 0.25

修改成2.5f后,股价变动0.25超过0.05,所有人额外还会收到一条价格变动过大的通知。


通过上面的逻辑,主要实现了下面的功能。

  • 新用户主要通过registerObserver(StockObserver observe) 方法添加进来,就能收到我们的通知。
  • 股价变动的时候stockPriceChange(float newPricce),只管调用通知就行notifyObserver(),不必关注有哪些人都在订阅我的信息。

那么回到上面找小黄小李买票的例子,好像,似乎,也许,已经变得简单了?

我只管在那里吼一声"帮忙买票,给两倍的米!"。

你说那你不也是for循环搞出来的,你先别管MQ是咋实现的,问题的关键点在于。

对于我们本身,现在不需要管有几个订阅方,只管吼一声就行了。

image-20230713231710816

2.3 主题模型

主题模型 也可以称为 发布-订阅模型

在主题模型中,消息的生产者称为 发布者(Publisher) ,消息的消费者称为 订阅者(Subscriber) ,存放消息的容器称为 主题(Topic)

image-20230713231929065