RocketMq发送消息之过滤消息

发布时间 2023-09-24 14:27:51作者: 自学Java笔记本

过滤消息概述

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

// 定义一个group1的消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 订阅 TOPIC 主题中的 三个TAG 类型的消息
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
image

基本语法

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

生产者代码

在生产者中通过putUserProperty设置一些属性

// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = ProducerUtils.getProducer("group1");
        producer.start();
        for (int i = 0; i < 3; i++) {
            // 创建消息,并指定Topic(主题),Tag(消息Tag)和消息体(消息内容)
            Message msg = new Message("FilterTagTopic",
                    "Tag1",
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            // 设置一些属性,消费者可以通过consumer.subscribe去或者这个消息
            msg.putUserProperty("a", String.valueOf(i));
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            TimeUnit.SECONDS.sleep(1);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}

消费者代码

消费者只有通过consumer.subscribe("TopicTest", MessageSelector.bySql("消息的匹配规则");才能获取到消息,并且要匹配sql的规则
例如:生产者通过msg.putUserProperty("a", String.valueOf(i)); 设置了消息的属性,那么a为标识,value为值,此时要获取,可以这样写MessageSelector.bySql("a between 0 and 3")这样获取的就是 值为 0-3 的消息

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        // 2.指定NameServer地址
        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        // 3. 只有订阅的消息有这个属性a, a >=0 and a <= 3
        MessageSelector ms = MessageSelector.bySql("a between 0 3");
        // 订阅主题
        consumer.subscribe("FilterTagTopic",ms);

        // 开启消费offset
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息是:"+new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

看官网的教程就是这样,但是发现启动后报错,抛出
CODE: 1 DESC: The broker does not support consumer to filter message by SQL92
查看原因是因为要在broker.conf文件中加入:enablePropertyFilter = true,由于我们采用的是2主2从,所以指定的文件在/usr/local/rocketmq/conf/2m-2s-sync/ broker-a.properties 中去修改。注意是修改多个哦。

消息过滤不仅仅只有上述方法,我们可以查看subscribe的重载方法:
image

  • subscribe(String topic, String subExpression) 就是直接加标签的这种:
    例如,生产者发送消息时,指明了多个TAG,那么消费者可以通过subExpression去获取
  // 订阅多个Tag
  consumer.subscribe("FilterTagTopic","TAG1 || TAG2 || TAG3");
  • subscribe(String topic, String fullClassName, String filterClassSource)
    这种,是自定义一个过滤器实现类的。
    image
    fullClassName:是指类的全路径 filterClassSource:是指类的java文件路径
    • 1,Broker机器启动多个FilterServer过滤进程
    • 2,Consumer启动后,会想Broker传递一个Java类
    • 3,Consumer从FilterServer拉取消息,FilterServer从Broker拉取消息,按照上传的java类进行过滤,过滤后返回给Consumer

  • 最后一种就是通过sql语句去过滤的啦