logo头像

From zero to HERO

Spring Boot 使用 rabbitmq 操作死信队列

1. 前言

之前探讨了 Spring Boot 集成 rabbitmq 以及开启 ack 模式——传送门,今天我们搞一下 死信队列

2. 概念

死信队列 听上去像 消息“死”了,其实也有点这个意思,死信队列 是当消息在一个队列由于下列原因:

  1. 消息被拒绝(basic.reject/basic.nack)并且不再重新投递 requeue=false
  2. 消息超期 (rabbitmq Time-To-Live -> messageProperties.setExpiration())
  3. 队列超载

当消息变成了 “死信” 后会被重新投递(publish)到另一个 Exchange。 该 Exchange 就是一个 DLX 属性的交换机。然后该 Exchange 根据绑定规则转发到对应的队列上监听该队列,于是 死信消息 就可以重新被消费。

说白了就是没有被消费的消息,换个地方重新被消费。

         ** 生产者   -->  消息 -->  交换机 --\> 队列  --> 变成死信  --\> DLX交换机  -->队列 --\> 消费者**

3.springboot rabbitmq 死信队列实践

下面我们模拟一个死信队列的应用场景——消息延时处理。还是以这个项目为基础: https://gitee.com/felord/springboot-message

3.1 Rabbitmq 中死信队列的配置

项目中 RabbitConfig 死信相关片段:

   /**
     * 死信队列跟交换机类型没有关系 不一定为directExchange  不影响该类型交换机的特性.
     *
     * @return the exchange
     */
    @Bean("deadLetterExchange")
    public Exchange deadLetterExchange() {
        return ExchangeBuilder.directExchange("DL_EXCHANGE").durable(true).build();
    }

    /**
     * 声明一个死信队列.
     * x-dead-letter-exchange   对应  死信交换机
     * x-dead-letter-routing-key  对应 死信队列
     *
     * @return the queue
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    声明  死信交换机
        args.put(DEAD_LETTER_QUEUE_KEY, "DL_EXCHANGE");
//       x-dead-letter-routing-key    声明 死信路由键
        args.put(DEAD_LETTER_ROUTING_KEY, "KEY_R");
        return QueueBuilder.durable("DL_QUEUE").withArguments(args).build();
    }

    /**
     * 定义死信队列转发队列.
     *
     * @return the queue
     */
    @Bean("redirectQueue")
    public Queue redirectQueue() {
        return QueueBuilder.durable("REDIRECT_QUEUE").build();
    }

    /**
     * 死信路由通过 DL_KEY 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding deadLetterBinding() {
        return new Binding("DL_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "DL_KEY", null);

    }

    /**
     * 死信路由通过 KEY_R 绑定键绑定到死信队列上.
     *
     * @return the binding
     */
    @Bean
    public Binding redirectBinding() {
        return new Binding("REDIRECT_QUEUE", Binding.DestinationType.QUEUE, "DL_EXCHANGE", "KEY_R", null);
    }

说明:

deadLetterExchange() 声明了一个 Direct 类型的 Exchange (死信队列跟交换机没有关系)。

deadLetterQueue() 声明了一个队列。 这个队列跟前面我们声明的队列不一样,它注入了 Map<String,Object> 参数,下面的概念非常重要:

x-dead-letter-exchange 来标识一个交换机; x-dead-letter-routing-key 来标识一个绑定键(RoutingKey), 这个绑定键是标识的交换机的, 如果没有特殊指定声明队列的原 routingkey , 有队列通过此绑定键绑定到该交换机,那么死信会被该交换机转发到该队列上通过监听可对消息进行消费。**
可以打个比方,这个是一个球队为主力队员设置了一个替补,如果主力 “死”了,他的活替补接手,这样更好理解一点。

deadLetterBinding() 对这个带参队列进行了和交换机的规则绑定,等下消费者先把消息通过交换机投递到该队列中去,然后制造条件发生“死信” 。redirectBinding() 我们需要给标识的交换机 ,以及对其指定的 routingkey 来绑定一个所谓的“替补”队列用来监听。

流程具体是消息投递到 DL_QUEUE 10秒后消息过期生成死信, 然后转发到 REDIRECT_QUEUE 通过对其的监听来消费消息。

3.2 消费死信消息

控制器 SendController 增加消费发送接口

    /**
     * 测试死信队列.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/dead")
    public ResponseEntity deadLetter(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//        声明消息处理器  这个对消息进行处理  可以设置一些参数   对消息进行一些定制化处理   我们这里  来设置消息的编码  以及消息的过期时间  因为在.net 以及其他版本过期时间不一致   这里的时间毫秒值 为字符串
        MessagePostProcessor messagePostProcessor = message -> {
            MessageProperties messageProperties = message.getMessageProperties();
//            设置编码
            messageProperties.setContentEncoding("utf-8");
//            设置过期时间10*1000毫秒
            messageProperties.setExpiration("10000");
            return message;
        };
//         向DL_QUEUE 发送消息  10*1000毫秒后过期 形成死信
        rabbitTemplate.convertAndSend("DL_EXCHANGE", "DL_KEY", p, messagePostProcessor, correlationData);
        return ResponseEntity.ok();
    }

监听替补队列 REDIRECT_QUEUE

    /**
     * 监听替补队列 来验证死信.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  这里异常需要处理
     */
    @RabbitListener(queues = {"REDIRECT_QUEUE"})
    public void redirect(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        log.debug("dead message  10s 后 消费消息 {}",new String (message.getBody()));
    }

3.3 测试死信队列接口

不出意外 消息会在发出10秒后 才被消费 一下信息证实了这一猜测

4. 总结

今天我们对 Rabbitmq 中的死信队列进行了介绍和运用,相信能够在实际开发中帮助你,多多关注公众号:Felordcn 获取更多原创编程技术。
相关源码:https://gitee.com/felord/springboot-message

评论系统未开启,无法评论!