使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践

发布时间 2023-12-20 17:33:43作者: 阿里云云原生

作者:森元

需求背景

新业务上线前,我们通常需要对系统的不同中间件进行压测,找到当前配置下中间件承受流量的上限,从而确定上游链路的限流规则,保护系统不因突发流量而崩溃。阿里云 PTS 的 JMeter 压测可以支持用户上传自定义的 JMeter 脚本,按照自定义的逻辑,借助 PTS 强大的分布式压测能力,对系统的不同中间件进行压测。下面,将以 JMeter5.5 和 RocketMQ5.0 系列为例,详细介绍如何使用 PTS 的 JMeter 场景压测 RocketMQ。

前置条件

  1. 已在本地安装 JMeter。
  2. 已在阿里云 ECS 上部署 RocketMQ(本文选择的是一台 8C32G 规格的 ECS)。
  3. 已在阿里云上开通 PTS 服务。

压测过程

JMeter 提供了扩展性极强的 JavaSampler,我们可以通过继承 AbstractJavaSamplerClient 类来自定义在 JavaSampler 中执行的逻辑,从而实现对 RocketMQ 进行压测。

步骤一:创建 Maven 项目,并引入依赖

  1. 新建 Maven 工程,并在 pom 文件中引入下面的依赖:
<dependency>
  <groupId>org.apache.jmeter</groupId>
  <artifactId>ApacheJMeter_java</artifactId>
  <version>5.5</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.9.5</version>
</dependency>

ApacheJMeter_java 是 JMeter JavaSampler 的依赖,rocketmq-client 是 RocketMQ 的客户端依赖(此处用 4.x 版本是因为 4.x 版本的客户端可以兼容 5.x 版本的服务端实例,但是 5.x 版本的客户端不能兼容 4.x 版本的服务端实例,可根据自己需求调整)。其中,要注意的是 ApacheJMeter_java 依赖的 scope 定义为  provided,JMeter 的 lib/ext 目录下已有该 JAR 包,因此不必将该依赖一起打包。

  1. 在 pom 文件中引入 maven-assembly-plugin 插件,此处使用 “jar-with-dependencies” 打包方式,将项目所需依赖和项目代码打包到同一个 JAR 包,后续可以只上传该 JAR 包到 PTS 的 JMeter 环境中,不用上传多个依赖 JAR 包:
<build>
  <finalName>jmeter-rocketmq4</finalName>
  <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.4.2</version>
      <configuration>
        <!-- 打包方式 -->
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

步骤二:新建 AbstractJavaSamplerClient 的子类,并重写相关方法

AbstractJavaSamplerClient 类继承了 JavaSamplerClient 接口,该接口包含 setupTest、runTest、teardownTest 和 getDefaultParameters 四个方法:

  • setupTest

    JMeter 将为测试中的每个线程创建一个 JavaSamplerClient 实现实例,测试开始时,将在每个线程的 JavaSamplerClient 实例上调用 setupTest 来初始化客户端,本例中即初始化 RocketMQ 的 producer。

  • runTest

    每个线程每次迭代会调用一次 runTest 方法,本例中,需要在 runTest 方法里面定义消息发送的方法和采样结果的设置逻辑。

  • teardownTest

    迭代完设置的次数或时间后,此方法将会被执行,本例中,需要在此方法关闭 producer。

  • getDefaultParameters

    此方法定义了参数列表,这些参数通过会 JavaSamplerContext 传递给上述方法方法,在此方法内定义的参数,可以在 JMeter JavaRequest Sampler 的 GUI 界面设置值,本例中,需要定义 RocketMQ 的 broker 地址、topic 名称、消息 key、消息内容等参数。

新建子类参考如下:

import java.nio.charset.StandardCharsets;
import org.apache.jmeter.config.Arguments;
import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
import org.apache.jmeter.samplers.SampleResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class JavaSamplerForRocketMQ extends AbstractJavaSamplerClient {
    private DefaultMQProducer producer;
    private static final String NAME_SRV_ADDRESS = "nameSrvAddress";
    private static final String TOPIC = "topic";
    private static final String PRODUCER_GROUP = "producer group";
    private static final String MSG_BODY = "messageBody";
    private static final String MSG_KEY = "messageKey";
    private static final String MSG_TAG = "messageTag";
    private static final String ERROR_CODE = "500";

    @Override
    public void setupTest(JavaSamplerContext javaSamplerContext) {

        try {
            // 初始化producer
            producer = new DefaultMQProducer(javaSamplerContext.getParameter(PRODUCER_GROUP));
            producer.setNamesrvAddr(javaSamplerContext.getParameter(NAME_SRV_ADDRESS));
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

    }

    @Override
    public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
        SampleResult sampleResult = new SampleResult();
        sampleResult.setSampleLabel("rocketmq-producer");
        // 请求开始
        sampleResult.sampleStart();
        // 普通消息发送
        Message message = new Message(
            javaSamplerContext.getParameter(TOPIC),
            javaSamplerContext.getParameter(MSG_TAG),
            javaSamplerContext.getParameter(MSG_BODY).getBytes()
        );
        try {
            // 发送消息,需要关注发送结果,并捕获失败等异常。
            SendResult sendResult = producer.send(message);
            // 设置发送请求的字节数
            sampleResult.setSentBytes(message.toString().getBytes(StandardCharsets.UTF_8).length);
            sampleResult.setDataType(SampleResult.TEXT);
            // 设置请求内容
            sampleResult.setSamplerData(message.toString());
            // 设置响应内容
            sampleResult.setResponseData(String.format("Msg Id:%s", sendResult.getMsgId()).getBytes());
            sampleResult.setSuccessful(true);
            sampleResult.setResponseCodeOK();
        } catch (MQBrokerException | InterruptedException | RemotingException | MQClientException e) {
            sampleResult.setSuccessful(false);
            sampleResult.setResponseCode(ERROR_CODE);
            sampleResult.setResponseData(String.format("Error Msg:%s", e).getBytes());
            return sampleResult;
        } finally {
            // 请求结束
            sampleResult.sampleEnd();
        }
        return sampleResult;
    }

    @Override
    public void teardownTest(JavaSamplerContext javaSamplerContext) {
        producer.shutdown();
    }

    @Override
    public Arguments getDefaultParameters() {
        Arguments arguments = new Arguments();
        arguments.addArgument(NAME_SRV_ADDRESS, "");
        arguments.addArgument(PRODUCER_GROUP, "");
        arguments.addArgument(TOPIC, "");
        arguments.addArgument(MSG_KEY, "");
        arguments.addArgument(MSG_TAG, "");
        arguments.addArgument(MSG_BODY, "");
        return arguments;
    }
}

步骤三:打包项目成 JAR 文件

通过 mvn clean package 将项目打包,在 target 目录中可见 jmeter-rocketmq4.jar 和 jmeter-rocketmq4-jar-with-dependencies.jar 两个 JAR 包,其中 jmeter-rocketmq4-jar-with-dependencies.jar 包括了所需的依赖,在后续步骤中使用此 JAR 包。

.
├── pom.xml
├── src
│   ├── main
│   │   ├── java
│   │   │   └── JavaSamplerForRocketMQ4.java
│   │   └── resources
│   └── test
│       └── java
└── target
    ├── jmeter-rocketmq4-jar-with-dependencies.jar
    ├── jmeter-rocketmq4.jar

步骤四:使用 JMeter GUI 进行脚本编写和调试

  1. 将打包好的 JAR 包和依赖的 JAR 包复制到 JMETER_HOME/lib/ext 目录下,然后执行命令 JMETER_HOME/bin/jmeter 打开 JMeter GUI。

  2. 新建线程组后添加 Java 请求取样器。

  1. 在下拉框中选择步骤二中新增的类(不一定和图片中的完全一致,按照实际的类全限定名选择),并填写下方相关参数。

  1. 为线程组添加“查看结果树”和“汇总报告”监听器,然后启动测试计划,在结果树和汇总报告中验证测试的结果是否符合预期。

  2. 保存该测试计划为 JMX 文件。

步骤五:在 PTS 创建 JMeter 场景进行压测

  1. 在 PTS 控制台创建 JMeter 环境,将步骤三中打包的 JAR 包上传到该 JMeter 环境中(更多细节请参考 JMeter 环境管理的查看、修改及创建_性能测试-阿里云帮助中心 [ 1] ):

a. 进入 PTS 控制台,选择“JMeter 环境”;

b. 输入自定义的环境名;

c. 点击上传文件,选择步骤三中打包的 JAR 包;

d. 点击保存。

  1. 在 PTS 控制台创建场景中选择“JMeter 压测”场景:

  1. 编辑“场景配置”:

a. 自定义场景名;

b. 点击上传文件,选择步骤四中保存的 JMX 文件;

c. 在“使用依赖环境?”下拉框中选择“是,使用依赖环境”;

d. 在“选择依赖环境”下拉框选择刚刚创建的 JMeter 环境。

  1. 施压配置:

小建议:由于我们是想通过压测找到 RocektMQ 能承受的最大并发请求数,因此建议选择 RPS 模式,这样可以直接衡量 RocektMQ 的承压能力。同时,考虑到公网带宽限制,应该选择阿里云 VPC 内网压测。

a. 选择压力来源为阿里云 VPC 内网,同时选择部署被压测 RocketMQ 的 ECS 所在区域;

b. 设置 ECS 的 VPC、安全组和交换机,注意 VPC 和安全组一定要和 ECS 相同,安全组中要打开响应的端口(在 ECS 控制台设置);

c. 设置压力模式为 RPS 模式;

d. 设置起始 RPS、最大 RPS 和压测时长,本文设置起始 RPS 为 90000,最大 RPS 为 110000,持续 2 分钟。

e. 指定循环一般设置为否,表示执行一次就结束,指定 IP 数会根据设置的 RPS 自动生成。

  1. 其余设置请根据需求参考 JMeter 压测_性能测试-阿里云帮助中心 [ 2]

  2. 保存配置并调试场景,确认和 RocketMQ 的连通,之后可以开始进行压测。

步骤六:查看压测报告

JMeter 的压测报告通用解读可以参考如何查看 JMeter 压测数据、采样日志及施压机性能_性能测试-阿里云帮助中心 [ 3] ,下一节将介绍如何使用 PTS 的压测报告来找到 RocketMQ 的承压能力。

报告解读

  1. 首先,查看整个压测的概览信息和指标趋势。如下图所示,报告第一栏展示了整个压测过程的请求成功率、平均 RT、平均 TPS 等指标,这些指标可以在官方文档中找到具体解释。同时,根据成功率的趋势图所示,从 18:54:05 开始,成功率逐渐波动下降,此时的 TPS 值为 9.55W,代表 18:54:05 计算的前 5 秒平均 TPS 约为 9.55W。

  1. 其次,使用压测报告中的 Prometheus 监控数据对结果进一步分析。借助阿里云 ARMS 的 Prometheus 和 Grafana 产品,PTS 的压测报告可以提供包括吞吐量、成功率和响应时长的时序图,同时,支持用户使用 PromQL 语句对数据面板进行编辑操作,灵活查询所需的数据,在本文中,我们可以将成功率和吞吐量放在一个 panel,来进一步分析。

a. 首先点击“成功率(时序)”,然后点击“Edit”,可进入成功率大盘的编辑界面,复制成功率的查询 PromQL:

sum(rate(pts_api_response_total{task_id="$task_id", code=~"200|302"}[5s]))/sum(rate(pts_api_response_total{task_id="$task_id"}[5s]))

b. 然后进入吞吐量大盘的编辑界面,使用成功率的 PromQL 替换虚拟用户数的 PromQL,并更改 Grafana 的相关配置(下图中红框),便可得到展示吞吐量和成功率的面板。

该面板展示的数据统计精度为 1 秒,可得到更精确的数据,在 18:54:05 秒时,成功率开始下降,此时 TPS 为 96561.9。

c. 为了更好的评估 RocketMQ 的性能,我们还可以统计出成功率保持 100% 的时间范围内的平均 TPS,首先找到成功率为 100% 的持续时间,下图中为 47 秒,然后将计算 TPS 的指标的时间范围改成 47s,这样每个点都代表前 47s 的平均 TPS,将鼠标移动到成功率为 100% 的最后一个时间,当前时间的 TPS 值即为成功率为 100% 时间范围内的平均 TPS,即 89357.5。

  1. 最后,为了对比不同参数的设置对 RocketMQ 性能的影响,同时验证 PTS 在 RocketMQ 压测上的可用性,我们做了一个简单的对比实验,并通过 jstat 命令来观察不同参数对垃圾回收的影响。

实验结果显示,对于当前 ECS 配置部署的 RocketMQ,适当调大堆内存可以有效提高 RocketMQ 的性能,当堆内存提高到 24g 时(此事 ECS 内存使用率达到 85.39%),性能没有显著提高;适当提高 sendMessageThreadPoolNums 的值可以提高 RocketMQ 的性能,当 sendMessageThreadPoolNums 超过 16 后,性能没有显著提高,甚至略有下降。用户可以根据实际情况,进行更详细的对比实验,来充分评估所部署的 RocketMQ 承压能力。

结束语

本文介绍了使用阿里云 PTS 的 JMeter 场景压测 RocketMQ 的详细步骤,对各环节逐一进行了说明,最后,通过对压测报告的自定义分析,展现了 PTS 强大的压测结果分析能力,借助 JMeter 和 PTS,用户可以对各类中间件进行灵活多维的分析,助力其构建起稳定健壮的系统。

最新活动&免费试用

相关链接:

[1] JMeter 环境管理的查看、修改及创建_性能测试-阿里云帮助中心

https://help.aliyun.com/document_detail/170857.html?spm=a2c4g.103173.0.0.292c20f8wnWyCV

[2] JMeter 压测_性能测试-阿里云帮助中心

https://help.aliyun.com/document_detail/97876.html?spm=a2c4g.91788.0.0.2fde6f338aHIDI

[3] 如何查看 JMeter 压测数据、采样日志及施压机性能_性能测试-阿里云帮助中心

https://help.aliyun.com/document_detail/127454.html?spm=a2c4g.94066.0.0.4a5164bepHmzWD