跳到主要内容

12、RocketMQ 实战 - 消费者类型

消费者概览

Apache RocketMQ 4.x 支持 PushConsumer 、 PullConsumer 这两种类型的消费者。DefaultMQPushConsumer只需要设置MessageListener,获取消息,消息并发等都有SDK处理。DefaultMQPullConsumer需要用户自己拉取消息,并维护消费进度,同时并发消费消息都由用户控制,比较灵活。

RocketMQ 4.x还提供了另一种PullConsumer,是Lite Pull Consumer,它提供了Subscribe和Assign两种方式,使用起来更加方便。

Subscribe模式示例如下:

@Configuration
@Slf4j
public class ConsumerConfig {

    @Value("${rocketmq.namesrv}")
    private String namesrv;

    String topic = "MyTopic";

    @Bean
    public DefaultLitePullConsumer litePullConsumer() throws MQClientException, InterruptedException {
        DefaultLitePullConsumer defaultLitePullConsumer = new DefaultLitePullConsumer("my-lite-consumer");

        defaultLitePullConsumer.setNamesrvAddr(namesrv);

        defaultLitePullConsumer.subscribe(topic,"*");

        defaultLitePullConsumer.setPullBatchSize(20);

        defaultLitePullConsumer.start();

        while (true) {
            List<MessageExt> messageExts = defaultLitePullConsumer.poll(300);
            if (!CollectionUtils.isEmpty(messageExts)) {
                log. info("消费消息:{}", messageExts.stream().map(msg -> new String(msg.getBody(), StandardCharsets.UTF_8)).collect(Collectors.toList()));
            } else {
                TimeUnit.MILLISECONDS.sleep(300);
            }
        }

    }
}

首先还是初始化DefaultLitePullConsumer并设置ConsumerGroupName,调用subscribe方法订阅topic并启动。与Push Consumer不同的是,LitePullConsumer拉取消息调用的是轮询poll接口,如果能拉取到消息则返回对应的消息列表,否则返回null。通过setPullBatchSize可以设置每一次拉取的最大消息数量,此外如果不额外设置,LitePullConsumer默认是自动提交位点在subscribe模式下,同一个消费组下的多个LitePullConsumer会负载均衡消费,与PushConsumer一致

再来看Assign模式:

@Bean
public DefaultLitePullConsumer litePullConsumer() throws MQClientException, InterruptedException {
        DefaultLitePullConsumer defaultLitePullConsumer = new DefaultLitePullConsumer("my-lite-consumer");

        defaultLitePullConsumer.setNamesrvAddr(namesrv);

        defaultLitePullConsumer.setAutoCommit(false);
        defaultLitePullConsumer.start();
        Collection<MessageQueue> messageQueues = defaultLitePullConsumer.fetchMessageQueues(topic);
        List<MessageQueue> list = new ArrayList<>(messageQueues);
        List<MessageQueue> messageQueueList = new ArrayList<>(messageQueues.size() / 2);

        for (int i = 0; i < list.size() / 2; i++) {
            messageQueueList.add(list.get(i));
        }

        defaultLitePullConsumer.assign(messageQueueList);

        defaultLitePullConsumer.seek(messageQueueList.get(0), 10);
        

        while (true) {
            List<MessageExt> messageExts = defaultLitePullConsumer.poll(300);
            if (!CollectionUtils.isEmpty(messageExts)) {
                log. info("消费消息:{}", messageExts.stream().map(msg -> new String(msg.getBody(), StandardCharsets.UTF_8)).collect(Collectors.toList()));
                defaultLitePullConsumer.commitSync();
            } else {
                TimeUnit.MILLISECONDS.sleep(300);
            }
        }

    }
}

Assign模式一开始仍然是初始化DefaultLitePullConsumer,这里我们采用手动提交位点的方式,因此设置AutoCommit为false,然后启动consumer。与Subscribe模式不同的是,Assign模式下没有自动的负载均衡机制,需要用户自行指定需要拉取的队列,因此在例子中,先用fetchMessageQueues获取了Topic下的队列,再取前面的一半队列进行拉取,示例中还调用了seek方法,将第一个队列拉取的位点设置从10开始。紧接着进入循环不停地调用poll方法拉取消息,拉取到消息后调用commitSync方法手动提交位点。

接下来看Apache RocketMQ 5.0版本的消费者,并做比较。

Apache RocketMQ 支持 PushConsumer 、 SimpleConsumer 以及 PullConsumer 这三种类型的消费者。

对比项 PushConsumer SimpleConsumer PullConsumer
接口方式 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 业务方自行实现消息处理,并主动调用接口返回消费结果。 业务方自行按队列拉取消息,并可选择性地提交消费结果
消费并发度管理 由SDK管理消费并发度。 由业务方消费逻辑自行管理消费线程。 由业务方消费逻辑自行管理消费线程。
负载均衡粒度 5.0 SDK是消息粒度,更均衡,早期版本是队列维度 消息粒度,更均衡 队列粒度,吞吐攒批性能更好,但容易不均衡
接口灵活度 高度封装,不够灵活。 原子接口,可灵活自定义。 原子接口,可灵活自定义。
适用场景 适用于无自定义流程的业务消息开发场景。 适用于需要高度自定义业务流程的业务开发场景。 仅推荐在流处理框架场景下集成使用

PushConsumer内部原理

在PushConsumer类型中,消息的实时处理能力是基于SDK内部的典型Reactor线程模型实现的。如下图所示,SDK内置了一个长轮询线程,先将消息异步拉取到SDK内置的缓存队列中,再分别提交到消费线程中,触发监听器执行本地消费逻辑。

 

PushConsumer 消费者类型中,客户端SDK和消费逻辑的唯一边界是消费监听器接口。客户端SDK严格按照监听器的返回结果判断消息是否消费成功,并做可靠性重试。所有消息必须以同步方式进行消费处理,并在监听器接口结束时返回调用结果,不允许再做异步化分发。如果消费者分组设置了顺序消费模式,则PushConsumer在触发消费监听器时,严格遵循消息的先后顺序。业务处理逻辑无感知即可保证消息的消费顺序。如果业务逻辑自定义实现了异步分发,则Apache RocketMQ 无法保证消息的顺序性。