当先锋百科网

首页 1 2 3 4 5 6 7

配置文件

首先需要在配置文件添加:

#确认消息已发送到交换机
    publisher-confirm-type: correlated

配置文件整体为:
在这里插入图片描述

配置类代码

/**
 * 配置类 发布确认
 */
@Configuration
public class ConfirmConfig {
    //普通交换机
    public static final String CONFIRM_EXCHANGE_NAME="confirm_exchange";
    //普通队列
    public static final String CONFIRM_QUEUE_NAME="confirm_queue";
    //RoutingKey
    public static final String CONFIRM_EXCHANGE_ROUTING_KEY="key1";
    //声明普通交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    //声明普通队列
    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
	//绑定普通交换机和普通队列
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
                                        @Qualifier("confirmExchange")DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_EXCHANGE_ROUTING_KEY);
    }

回调接口配置

/**
 *回调接口
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //内部接口注入类中
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *交换机确定回调方法
     * 1.发消息 交换机接收到消息 回调
     *  1.1 correlationData 保存回调消息的ID及相关信息
     *  1.2 交换机收到消息 ack=true
     *  1.3 cause null
     * 2.发消息 交换机接受失败 回调
     *  2.1 correlationData 保存回调消息的ID及相关信息
     *  2.2 交换机收到消息 ack=false
     *  2.3 cause 失败原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if(ack){
            log.info("交换机已经收到id为:{}的消息",id);
        }
        else {
            log.info("交换机未经收到id为:{}的消息,原因为:{}",id,cause);
        }

    }

生产者代码

@Slf4j
@RestController
@RequestMapping("confirm")
public class ProduceController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData1 = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
                ConfirmConfig.CONFIRM_EXCHANGE_ROUTING_KEY,message,correlationData1);
        log.info("发送的消息内容为:{}",message);
 }

消费者代码

/**
 * 接收消息
 */
@Slf4j
@Component
public class Consumer {
    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
    public void receiveConfirmMessage(Message message){
       String msg = new String(message.getBody());
        log.info("接受到的队列confirm.queue消息:{}",msg);
    }
}

测试

先测试一下正确的情况
在这里插入图片描述
消息已发出,交换机正常接收转发消息,消费者正常接收到消息
在这里插入图片描述

现在将发消息时的交换机名改错,测试一下
在这里插入图片描述
在发消息选择的交换机名字后面拼接1,这样交换机名字就变成了CONFIRM_EXCHANGE_NAME1,我们之前没有声明过,现在测试发消息
在这里插入图片描述

在这里插入图片描述
交换机未收到消息。
测试结束

但是到现在为止只能确认交换机是否收到信息,如队列出现问题,如不可路由等情况,消息会被直接丢弃,且生产者不知情,就会出现问题
所以可以继续学习:消息回报(队列确认),备份交换机

同系列文章

原理部分

MQ(消息队列)简介
RabbitMQ简介
RabbitMQ 四大核心概念及工作原理

操作部分

Windows版Docker安装RabbitMq
Maven整合RabbitMQ实现生产消费消息
SpringBoot整合RabbitMQ实现生产消费消息
RabbitMQ延迟队列及实战
RabbitMQ-消息回报(队列确认)
RabbitMQ-备份交换机
RabbitMQ-优先级队列