Springboot集成rabbitmq之mandatory和备份交换机
mandatory
之前编写的消息队列代码中,通过重写ConfirmCallback中的confirm方法实现了消息送达的确认以及出错的处理,但是,该方法无法判断消息投递到不存在的队列中导致失败的问题。 mandatory是channel.basicPublish方法中的参数,当mandatory设置为true时,交换机无法根据自身的类型和路由键找到一个符合条件的队列,那么就会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形,则消息直接丢弃。
springboot中使用mandatory
如果不使用amqp框架,那么可以调用channel.addReturnListener来添加ReturnListener监听器实现。
在springboot框架中,可以在rabbitmqTemplate中设置setReturnsCallback()方法,并且把rabbitmqTemplate的Mandatory设置为true.具体示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| RabbitConfig.java @Bean public RabbitTemplate createRabbitMq() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnsCallback(returnedMessage -> log.info("消息发送出错:" + returnedMessage.getReplyText())); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info(correlationData.getId() + "发送成功"); } else { log.info("消息发送失败,{}",cause); } }); return rabbitTemplate; }
|
这里在setReturnCallback方法中,添加了log,打印了出错原因;此外还可以打印发送的消息,这样可以更精准的定位问题。
这样,如果消息被发送到不存在的routingKey,就会触发ReturnCallBack方法,打印出对应问题。
1 2 3 4 5 6 7 8
| 2022-10-01 22:36:06.271 INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig : 0发送成功 2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory2] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息1' MessageProperties [headers={spring_returned_message_correlation=1}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE 2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息2' MessageProperties [headers={spring_returned_message_correlation=2}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE 2022-10-01 22:36:06.290 INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig : 1发送成功 2022-10-01 22:36:06.291 INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig : 2发送成功 2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory4] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息5' MessageProperties [headers={spring_returned_message_correlation=5}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE 2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory3] c.t.s.config.RabbitMQConfig : 消息发送出错:(Body:'发送消息4' MessageProperties [headers={spring_returned_message_correlation=4}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),NO_ROUTE 2022-10-01 22:36:06.313 INFO 17968 --- [nectionFactory1] c.t.s.config.RabbitMQConfig : 5发送成功
|
备份交换机
上述的逻辑中提到了,将Mandatory参数设置为True,并且设置了对应的ReturnBack 回调方法,如果不想使代码变得繁琐,不编写ReturnBack方法,没有设置Mandatory参数,并且,又想要消息不丢失,那么可以使用备份交换机,这样,未被路由的信息,就会存储在备用交换机中,可以以后再去处理;
springboot配置备份交换机
声明备用交换机,队列,和绑定关系
注意,备用交换机必须是Fanout类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| RabbitConfig.java public Queue backupQueue() { return new Queue("backup", true,false,false); }
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("backup", true, false); }
@Bean public Binding backupBinding(){ return BindingBuilder.bind(backupQueue()).to(fanoutExchange()); }
|
上述代码配置了对应的交换机和队列,并且进行了绑定;
给交换机添加备用交换机
这里有个很大的坑,就是已经创建的交换机是不能修改的,所以需要删除原来的交换机或者创建一个新的交换机
所以,这里新创建了一个交换机
1 2 3 4 5 6 7 8
| @Bean public TopicExchange createExchange() { Map<String, Object> arguments = new HashMap<>(); arguments.put("alternate-exchange", "backup"); TopicExchange topicExchange = new TopicExchange("config_exchange2", true, false,arguments); return topicExchange; }
|
这个交换机也需要绑定队列等等,略。
此时,给这个交换机发送消息到不存在的路由键,则会将这些消息转到对应的备份交换机及队列中。

可以看到,如果交换机配置了备份交换机,会有AE的标签。
总结
介绍了消息在发送给不存在交换机时可能造成的丢失问题,以及如何检测该问题。个人感觉配置备份交换机是一个比较稳妥的方式,但是备用交换机的创建还是有一些坑存在的。
1、备用交换机是Fanout模式
2、已经创建的交换机,不能修改添加备用交换机,会报PRECONDITION_FAILED - inequivalent arg 的错误。