Springboot集成rabbitmq之mandatory和备份交换机

Springboot集成rabbitmq之mandatory和备份交换机

mandatory

之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认以及出错的处理,但是,该方法无法判断消息投递到不存在的队列中导致失败的问题。 mandatory是channel.basicPublish方法中的参数,当mandatory设置为true时,交换机无法根据自身的类型和路由键找到一个符合条件的队列,那么就会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接丢弃。

springboot中使用mandatory

如果不使用amqp框架,那么可以调用channel.addReturnListener来添加ReturnListener监听器实现。
在springboot框架中,可以在rabbitmqTemplate中设置setReturnsCallback()方法,并且把rabbitmqTemplate的Mandatory设置为true.具体示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
RabbitConfig.java
@Bean
public RabbitTemplate createRabbitMq() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("消息发送出错:" + returnedMessage.getReplyText()));
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info(correlationData.getId() + "发送成功");
} else {
log.info("消息发送失败,{}",cause);
}
});
return rabbitTemplate;
}

这里在setReturnCallback方法中,添加了log,打印了出错原因;此外还可以打印发送的消息,这样可以更精准的定位问题。
这样,如果消息被发送到不存在的routingKey,就会触发ReturnCallBack方法,打印出对应问题。

1
2
3
4
5
6
7
8
2022-10-01 22:36:06.271  INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig              : 0发送成功
2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息1' MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息2' MessageProperties [headers={spring_returned_message_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig : 1发送成功
2022-10-01 22:36:06.291 INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig : 2发送成功
2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息5' MessageProperties [headers={spring_returned_message_correlation=5}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息4' MessageProperties [headers={spring_returned_message_correlation=4}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE
2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig : 5发送成功

备份交换机

上述的逻辑中提到了,将Mandatory参数设置为True,并且设置了对应的ReturnBack 回调方法,如果不想使代码变得繁琐,不编写ReturnBack方法,没有设置Mandatory参数,并且,又想要消息不丢失,那么可以使用备份交换机,这样,未被路由的信息,就会存储在备用交换机中,可以以后再去处理;

springboot配置备份交换机

声明备用交换机,队列,和绑定关系

注意,备用交换机必须是Fanout类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
RabbitConfig.java
public Queue backupQueue() {
return new Queue("backup", true,false,false);
}

@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("backup", true, false);
}

@Bean
public Binding backupBinding(){
return BindingBuilder.bind(backupQueue()).to(fanoutExchange());
}

上述代码配置了对应的交换机和队列,并且进行了绑定;

给交换机添加备用交换机

这里有个很大的坑,就是已经创建的交换机是不能修改的,所以需要删除原来的交换机或者创建一个新的交换机
所以,这里新创建了一个交换机

1
2
3
4
5
6
7
8
@Bean
public TopicExchange createExchange() {
//这三个参数,分别是 交换机名称,是否持久化,以及是否自动删除
Map<String, Object> arguments = new HashMap<>();
arguments.put("alternate-exchange", "backup");
TopicExchange topicExchange = new TopicExchange("config_exchange2", true, false,arguments);
return topicExchange;
}

这个交换机也需要绑定队列等等,略。
此时,给这个交换机发送消息到不存在的路由键,则会将这些消息转到对应的备份交换机及队列中。

可以看到,如果交换机配置了备份交换机,会有AE的标签。

总结

介绍了消息在发送给不存在交换机时可能造成的丢失问题,以及如何检测该问题。个人感觉配置备份交换机是一个比较稳妥的方式,但是备用交换机的创建还是有一些坑存在的。
1、备用交换机是Fanout模式
2、已经创建的交换机,不能修改添加备用交换机,会报PRECONDITION_FAILED - inequivalent arg 的错误。


Springboot集成rabbitmq之mandatory和备份交换机
http://yoursite.com/2022/10/01/Springboot集成rabbitmq之mandatory和备份交换机/
作者
yangchen
发布于
2022年10月1日
许可协议