跳到主要内容

13、SpringBoot 整合 ActiveMQ 延时消息、死信队列

1、引入 ActiveMQ 的依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2、application.yml 文件配置

spring:
  activemq:
    # 连接地址
    broker-url: failover:tcp://192.168.237.23:61616
    user: admin
    password: admin
    queue-name: active.queue.name
    #true表示使用连接池;false时,每发送一条数据创建一个连接
    pool:
      enabled: true
  jms:
    template:
      delivery-mode: persistent

spring.activemq.queue-name 是我自定义的,待会在配置队列名称时可以用到。

3、SpringBoot 的启动类配置

在SpringBoot 的启动类上加上一个 @EnableJms 注解

4、ActiveMQ 连接相关配置

新建一个 BeanConfig 类用来配置 ActiveMQ 连接相关配置。在工厂中设置开启手动消息确认模式。注意:ActiveMQ 默认是开启事务的,且在事务开启的时候消息为自动确认模式,就算是设置了手动确认也无效,因此想要开启手动确认消息模式,还需关掉事务。

代码如下:

package com.caihao.activemqdemo.config;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsMessagingTemplate;

import javax.jms.ConnectionFactory;
import javax.jms.Queue;
import javax.jms.Topic;

@Configuration
public class BeanConfig {
   
     

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.password}")
    private String password;

    @Bean(name = "queue")
    public Queue queue() {
   
     
        return new ActiveMQQueue("queue-test");
    }

    @Bean(name = "delayQueue")
    public Queue delayQueue() {
   
     
        return new ActiveMQQueue("delay-queue-test");
    }

    @Bean
    public Topic topic() {
   
     
        return new ActiveMQTopic("topic-test");
    }

    @Bean
    public ConnectionFactory connectionFactory() {
   
     
        return new ActiveMQConnectionFactory(username, password, brokerUrl);
    }

    @Bean
    public JmsMessagingTemplate jmsMessageTemplate() {
   
     
        return new JmsMessagingTemplate(connectionFactory());
    }

    // 在Queue模式中,对消息的监听需要对containerFactory进行配置
    @Bean("queueListener")
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
   
     
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        // 关闭事务
        factory.setSessionTransacted(false);
        // 设置手动确认,默认配置中Session是开启了事物的,即使我们设置了手动Ack也是无效的
        factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    //在Topic模式中,对消息的监听需要对containerFactory进行配置
    @Bean("topicListener")
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory) {
   
     
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

5、队列消息手动确认实现

5.1、消息生产者

消息生产者代码比较少,主要是调用 jmsMessagingTemplateconvertAndSend() 方法。发送的消息如果是对象的话,可以将对象转成 json 串传输。(注意:在上面配置中关掉事务和设置手动确认)

package com.caihao.activemqdemo.producer;

import com.caihao.activemqdemo.entity.Student;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.jms.Queue;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@RestController
@Slf4j
public class ProducerController {
   
     

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    @Autowired
    @Qualifier("queue")// 因为我配置中还有个延时队列,所以采用通过bean的名称方式注入
    private Queue queue;

    /**
     * 发送普通消息队列
     */
    @GetMapping("/queue")
    public String sendQueue() throws JsonProcessingException {
   
     
        Student student = new Student(1, "张三", new Date());
        ObjectMapper objectMapper = new ObjectMapper();
        String msg = objectMapper.writeValueAsString(student);
        log.info("发送-开始");
        jmsMessagingTemplate.convertAndSend(queue, msg);
        log.info("发送-结束");
        return "send queue success";
    }
}

其中Student 类如下:

package com.caihao.activemqdemo.entity;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.Date;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student implements Serializable {
   
     
    private static final long serialVersionUID = 1;

    private Integer id;
    private String name;
    private Date birthDay;

}

5.2、消息消费者

这里我定义了两个 "queue-test" 的消费者,在消费消息时,这两个消费者默认会轮询消费。在消费完消息之后,调用 message.acknowledge() 进行消息的手动确认。如果在消费者中未进行手动确认的话,由于 ActiveMQ 进行了持久化消息,那么在项目下次启动的时候还会再次发送该消息。

package com.caihao.activemqdemo.consumer;

import com.caihao.activemqdemo.entity.Student;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;

@Slf4j
@Component
public class ConsumerListener {
     

    /**
     * queue-test普通队列:消费者1
     */
    @JmsListener(destination = "queue-test", containerFactory = "queueListener")
    public void receiveQueueTest1(ActiveMQMessage message, Session session) throws JMSException,
            JsonProcessingException {
   
     
        log.info("receiveQueueTest:1");
        String text = null;
        if (message instanceof TextMessage) {
   
     
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            log.info("queue1接收到消息:{}", text);
            ObjectMapper objectMapper = new ObjectMapper();
            Student student = objectMapper.readValue(text, Student.class);
            sleep(5000);
            log.info("queue1接收到student:{}", student);
            // 手动确认
            message.acknowledge();
        }
    }

    /**
     * queue-test普通队列:消费者2
     */
    @JmsListener(destination = "queue-test", containerFactory = "queueListener")
    public void receiveQueueTest2(ActiveMQMessage message, Session session) throws JMSException,
            JsonProcessingException {
   
     
        log.info("receiveQueueTest:2");
        String text = null;
        if (message instanceof TextMessage) {
   
     
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            log.info("queue2接收到消息:{}", text);
            ObjectMapper objectMapper = new ObjectMapper();
            Student student = objectMapper.readValue(text, Student.class);
            sleep(5000);
            log.info("queue2接收到student:{}", student);
            // 手动确认
            message.acknowledge();
        }
    }

    private void sleep(long time) {
   
     
        try {
   
     
            Thread.sleep(time);
        } catch (InterruptedException e) {
   
     
            e.printStackTrace();
        }
    }

}

6、发送延时消息

发送延时消息需要修改 ActiveMQ 安装目录下的 conf/activemq.xml 文件。

打开activemq.xml 文件,找到 <broker> 标签,在 <broker> 标签里面增加一个属性 schedulerSupport="true" 。如下所示:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}"
schedulerSupport="true">

发送延迟消息队列的时候还是调用 jmsMessagingTemplate.convertAndSend 方法,只不过相比于普通队列,我们需要再多传一个 Map 类型的参数,为该参数添加一个键值对,其中 key 为 ScheduledMessage.AMQ_SCHEDULED_DELAY ,value 为需要延迟的时间(毫秒)。

生产者代码如下:

@Autowired
@Qualifier("delayQueue")
private Queue delayQueue;

/**
 * 发送延迟消息队列
 */
@GetMapping("/delay-queue")
public String sendDelayQueue() throws JsonProcessingException {
   
     
    Student student = new Student(2, "李四", new Date());
    ObjectMapper objectMapper = new ObjectMapper();
    String msg = objectMapper.writeValueAsString(student);
    Map<String, Object> headers = new HashMap<>();
    // 延迟5秒
    headers.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, 5000);
    log.info("延迟发送-开始");
    jmsMessagingTemplate.convertAndSend(delayQueue, msg, headers);
    log.info("延迟发送-结束");
    return "send delay queue success";
}

消费者代码和普通队列方式无异,如下:

@JmsListener(destination = "delay-queue-test", containerFactory = "queueListener")
public void receiveDelayQueueTest(ActiveMQMessage message, Session session) throws JMSException,
JsonProcessingException {
   
     
    log.info("receiveDelayQueueTest");
    String text = null;
    if (message instanceof TextMessage) {
   
     
        TextMessage textMessage = (TextMessage) message;
        text = textMessage.getText();
        log.info("delayQueue接收到消息:{}", text);
        ObjectMapper objectMapper = new ObjectMapper();
        Student student = objectMapper.readValue(text, Student.class);
        sleep(2000);
        log.info("delayQueue接收到student:{}", student);
        // 手动确认
        message.acknowledge();
    }
}

7、死信队列

7.1、默认死信队列 ActiveMQ.DLQ

DLQ-死信队列 (Dead Letter Queue) 用来保存处理失败或者过期的消息。默认的死信队列为 ActiveMQ.DLQ ,为了演示死信队列,我在消费者监听类中增加一个监听 queue-test 队列的方法 receiveQueueTest3(),当轮到该方法消费消息的时候,调用 session.recover() 方法让消息重发,默认重发 6 次后,消息就会进入死信队列中。消息生产者方法见 5.1 中的消息生产者。receiveQueueTest3() 方法代码如下:

/**
 * queue-test普通队列:消费者3,专门用来让消息重发,从而进入死信队列
 */
@JmsListener(destination = "queue-test", containerFactory = "queueListener")
public void receiveQueueTest3(ActiveMQMessage message, Session session) throws JMSException,
JsonProcessingException {
   
     
    log.info("receiveQueueTest:3");
    String text = null;
    if (message instanceof TextMessage) {
   
     
        TextMessage textMessage = (TextMessage) message;
        text = textMessage.getText();
        log.info("queue3接收到消息:{}", text);
        ObjectMapper objectMapper = new ObjectMapper();
        Student student = objectMapper.readValue(text, Student.class);
        // 该消费者用来让消息重发,从而进入死信队列
        session.recover();
        if (true) {
   
     
            return;
        }
        sleep(5000);
        log.info("queue3接收到student:{}", student);
        // 手动确认
        message.acknowledge();
    }
}

当轮到该消费者消费消息的时候,由于调用了 session.recover() ,消息会进行重发 6 次之后就不再重发。这个时候观察 ActiveMQ 的管理界面 http://192.168.237.23:8161/admin/queues.jsp 的 ActiveMQ.DLQ 队列,由于没有消费者监听 ActiveMQ.DLQ ,因此该队列会显示有 1 条待处理信息,1 条入队,0 条出队。

7.2、自定义死信队列名称

项目中通常会让自定义死信队列名称,且每个队列不共用一个死信队列,因此我们可以修改 ActiveMQ 安装目录下的 conf/activemq.xml 文件自定义名称。

打开activemq.xml 文件,找到 <policyEntries> 标签,在 <policyEntries> 里面增加一个 <policyEntry> ,增加代码如下所示:配置死信队列前缀为 DLQ. (自定义)

<policyEntry queue=">">
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix="DLQ."
                                      useQueueForQueueMessages="true"/>
    </deadLetterStrategy>
</policyEntry>

具体如下图片所示:

 

增加一个消费者监听之前的 queue-test 队列产生的死信。

/**
 * queue-test 产生的死信队列
 */
@JmsListener(destination = "DLQ.queue-test")
public void receiveDLQQueueTest(ActiveMQMessage message, Session session) throws JMSException,
JsonProcessingException {
   
     
    log.info("DLQ.queue-test");
    if (message instanceof TextMessage) {
   
     
        TextMessage textMessage = (TextMessage) message;
        String text = textMessage.getText();
        ObjectMapper objectMapper = new ObjectMapper();
        Student student = objectMapper.readValue(text, Student.class);
        log.info("DLQ.queue-test接收到消息:{}", student);
    }
}

再次测试发送 queue-test 队列,当轮到 7.1 中的消费者收到消息的时候,会进行消息重发 6 次,之后消息会进入我们自定义的 DLQ.queue-test 队列中,然后消息就会被我们写的 receiveDLQQueueTest 方法所收到。

7.3、丢弃某个死信队列

丢弃某个指定死信队列,即让处理失败或者过期的消息直接丢弃掉,不要进入死信队列。还是需要配置 ActiveMQ 安装目录下的 conf/activemq.xml 文件。打开 activemq.xml 文件,找到 <broker> 标签,在 <broker> 下面添加如下代码:

<plugins>
    <!-- 丢弃指定死信队列 -->
    <discardingDLQBrokerPlugin dropOnly="DLQ.queue-test3" reportInterval="1000" />
</plugins>

图片示例如下:

 

再次测试发送 queue-test 队列,当轮到 7.1 中的消费者收到消息的时候,会进行消息重发 6 次,之后消息就丢弃了,没有进入 DLQ.queue-test 或者默认的 ActiveMQ.DLQ 死信队列中。

完整代码:https://gitee.com/caiworld/note-demo/tree/master/activemq-demo

版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有