跳到主要内容

23、RocketMQ 实战 - 应用之消息过滤

消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。
对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。

1 Tag过滤

通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。

DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

2 SQL过滤

SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。

SQL过滤表达式中支持多种常量类型与运算符。

支持的常量类型:

  • 数值:比如:123,3.1415
  • 字符:必须用单引号包裹起来,比如:‘abc’
  • 布尔:TRUE 或 FALSE
  • NULL:特殊的常量,表示空

支持的运算符有:

  • 数值比较:>,>=,<,<=,BETWEEN,=
  • 字符比较:=,<>,IN
  • 逻辑运算 :AND,OR,NOT
  • NULL判断:IS NULL 或者 IS NOT NULL

默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:

enablePropertyFilter 1 = true

在启动Broker时需要指定这个修改过的配置文件。例如对于单机Broker的启动,其修改的配置文件是conf/broker.conf,启动时使用如下命令:

sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &

3 代码举例

定义Tag过滤Producer

public class FilterByTagProducer {
   
     
public static void main(String[] args) throws Exception {
   
     
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
String[] tags = {
   
     "myTagA","myTagB","myTagC"};
for (int i = 0; i < 10; i++) {
   
     
byte[] body = ("Hi," + i).getBytes();
String tag = tags[i%tags.length];
Message msg = new Message("myTopic",tag,body);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
producer.shutdown();
}
}

定义Tag过滤Consumer

public class FilterByTagConsumer {
   
     
public static void main(String[] args) throws Exception {
   
     
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("pg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
consumer.subscribe("myTopic", "myTagA || myTagB");
consumer.registerMessageListener(new
MessageListenerConcurrently() {
   
     
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
   
     
for (MessageExt me:msgs){
   
     
System.out.println(me);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}

定义SQL过滤Producer

public class FilterBySQLProducer {
   
     
public static void main(String[] args) throws Exception {
   
     
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr("rocketmqOS:9876");
producer.start();
for (int i = 0; i < 10; i++) {
   
     
try {
   
     
byte[] body = ("Hi," + i).getBytes();
Message msg = new Message("myTopic", "myTag", body);
msg.putUserProperty("age", i + "");
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
} catch (Exception e) {
   
     
e.printStackTrace();
}
}
producer.shutdown();
}
}

定义SQL过滤Consumer

public class FilterBySQLConsumer {
   
     
public static void main(String[] args) throws Exception {
   
     
DefaultMQPushConsumer consumer = new
DefaultMQPushConsumer("pg");
consumer.setNamesrvAddr("rocketmqOS:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET
);
consumer.subscribe("myTopic", MessageSelector.bySql("age between
0 and 6"));
consumer.registerMessageListener(new
MessageListenerConcurrently() {
   
     
@Override
public ConsumeConcurrentlyStatus
consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext
context) {
   
     
for (MessageExt me:msgs){
   
     
System.out.println(me);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}