当先锋百科网

首页 1 2 3 4 5 6 7

1 导入maven依赖jar包

 <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2 配置application.properties


#mq
spring.rabbitmq.host=192.168.43.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
spring.rabbitmq.virtual-host=my_vhost

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirm-type=correlated
# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true

3 配置普通交换机,队列,消息生成者,消费者

配置交换机和队列进行绑定

package com.example.springbootdemo.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;



@Component
@Slf4j
@Configuration
public class RabbitmqTest {

    public static final String STATION_EXCHANGE = "station-exchange";

    public static final String STATION_QUEUE_11 = "station-queue-11";

    public static final String STATION_QUEUE_197 = "station-queue-197";


    @Bean
    public Binding testBinging(TopicExchange exchange,Queue testQueue){
     return BindingBuilder.bind(testQueue).to(exchange).with("#");
    }


    @Bean
    public TopicExchange stationExchange(){
        return new TopicExchange(STATION_EXCHANGE,true,false,null);
    }

    @Bean
    public Queue stationQueue(){
        return new Queue(STATION_QUEUE_11,true,false,false,null);
    }

    @Bean
    public Queue stationQueue197(){
        return new Queue(STATION_QUEUE_197,true,false,false,null);
    }


    @Bean
    public Binding stationBinding(TopicExchange stationExchange,Queue stationQueue){
        return BindingBuilder.bind(stationQueue).to(stationExchange).with("11");
    }

    @Bean
    public Binding stationBinding2(TopicExchange stationExchange,Queue stationQueue197){
        return BindingBuilder.bind(stationQueue197).to(stationExchange).with("197");
    }
}

配置消费者

package com.example.springbootdemo.listener;

import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class StationListener {

    @RabbitListener(queues = RabbitmqTest.STATION_QUEUE_11)
    public void station11(Message message){
        log.info("11监听到消息: {}",new String(message.getBody()));
    }



    @RabbitListener(queues = RabbitmqTest.STATION_QUEUE_197)
    public void station197(Message message){
        log.info("197监听到消息: {}",new String(message.getBody()));
    }
}

消息生产者

package com.example.springbootdemo.rabbitmq.production;

import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class TestMessageProduction {

    @Resource
    private RabbitmqTemplateConfirm rabbitmqTemplateConfirm;

    public void testMessage(String key){
        log.info("消息发送确认");
        RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();
        rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE,key,"消息发送者确认");
    }
}

4 配置延时交换机和队列,生产者,消费者

配置延时交换机和队列

package com.example.springbootdemo.rabbitmq.config;

import com.google.common.collect.Maps;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class PayMessageMq {

    public static final String ONLINE_PAY_EXCHANGE = "online-pay-exchange";

    public static final String ONLINE_PAY_QUEUE = "online-pay-queue";

    public static final String REFUND_EXCHANGE = "refund-exchange";

    public static final String REFUND_QUEUE = "refund-queue";

    @Bean
    public CustomExchange onlinePayExchange(){

        Map<String, Object> args = Maps.newHashMap();
        //自定义交换机的类型,指定分发方式
        args.put("x-delayed-type", "topic");
        //此处type指定为延迟交换机
        return new CustomExchange(ONLINE_PAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue onlinePayQueue(){
        return new Queue(ONLINE_PAY_QUEUE,true,false,false);
    }


    @Bean
    public Binding onlineBinding(Queue onlinePayQueue, CustomExchange onlinePayExchange){
        return BindingBuilder.bind(onlinePayQueue).to(onlinePayExchange).with("#").noargs();
    }

    @Bean
    public CustomExchange refundExchange(){
        Map<String,Object> args = Maps.newHashMap();
        args.put("x-delayed-type", "topic");
        return new CustomExchange(REFUND_EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue refundQueue(){
        return new Queue(REFUND_QUEUE, true,false,false);
    }

    @Bean
    public Binding refundBinding(CustomExchange refundExchange,Queue refundQueue){
        return BindingBuilder.bind(refundQueue).to(refundExchange).with("#").noargs();
    }
}

配置消费者

package com.example.springbootdemo.listener;

import com.example.springbootdemo.application.pay.OnlinePay;
import com.example.springbootdemo.application.pay.ali.response.QueryAliPayResponse;
import com.example.springbootdemo.entity.Order;
import com.example.springbootdemo.entity.PaymentRecord;
import com.example.springbootdemo.entity.dict.PaymentRecordEnumStatus;
import com.example.springbootdemo.rabbitmq.config.PayMessageMq;

import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import com.example.springbootdemo.rabbitmq.production.MessageProduction;
import com.example.springbootdemo.service.PaymentRecordService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.util.Objects;

@Slf4j
@Component
public class OnlinePayListener {

    @Resource
    private OnlinePay onlinePay;

    @Resource
    private ObjectMapper objectMapper;

    @Resource
    private PaymentRecordService paymentRecordService;

    @Resource
    private MessageProduction messageProduction;


    @RabbitListener(queues = PayMessageMq.ONLINE_PAY_QUEUE)
    public void queryPay(@Payload QueryPayMessageEvent event) throws JsonProcessingException {

        if(Objects.nonNull(event) && event.getQueryNum() <= 16) {

            log.info("查询支付结果orderCod: {}", event.getOrderCode());

            String payStatus = onlinePay.queryPayStatus(Order.builder().code(event.getOrderCode()).build());

            QueryAliPayResponse queryAliPayResponse = objectMapper.readValue(payStatus, QueryAliPayResponse.class);
            QueryAliPayResponse.AlipayTradeQueryResponse alipayTradeQueryResponse = queryAliPayResponse.getAlipay_trade_query_response();
            if(Objects.nonNull(alipayTradeQueryResponse) && alipayTradeQueryResponse.getCode().equals("10000") && alipayTradeQueryResponse.getTrade_status().equals("TRADE_SUCCESS")){
                PaymentRecord paymentRecord = paymentRecordService.lambdaQuery().eq(PaymentRecord::getOrderCode, event.getOrderCode()).one();
                paymentRecord.setTradeNo(alipayTradeQueryResponse.getTrade_no());
                paymentRecord.setStatus(PaymentRecordEnumStatus.SUCCESS);
                paymentRecordService.updateById(paymentRecord);
            }else {
                event.setQueryNum(event.getQueryNum()+1);
                messageProduction.queryPayRecord(event,59*1000);
            }

            log.info("支付结果: {}", payStatus);
        }
    }

    public void queryRefund(@Payload RefundMessageEvent event){


        if(Objects.nonNull(event) && event.getQueryNum() <= 16) {
            log.info("查询退款结果orderCode: {}", event.getOrderCode());

            String queryRefund = onlinePay.queryRefund(event.getOrderCode(), event.getOutRequestNo(),event.getTradeNo());

            System.out.println(queryRefund);
        }else {

        }
    }
}

生产者

package com.example.springbootdemo.rabbitmq.production;

import com.example.springbootdemo.rabbitmq.config.PayMessageMq;
import com.example.springbootdemo.rabbitmq.entity.QueryPayMessageEvent;
import com.example.springbootdemo.rabbitmq.entity.RefundMessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Slf4j
@Component
public class MessageProduction {

    @Resource
    private RabbitTemplate rabbitTemplate;



    public void queryPayRecord(QueryPayMessageEvent event,Integer delayTime){
        log.info("发送查询支付结果事件:订单号: {} ",event.getOrderCode());
        rabbitTemplate.convertAndSend(PayMessageMq.ONLINE_PAY_EXCHANGE,"pay",event,correlationData->{
            correlationData.getMessageProperties().setDelay(delayTime);
            return correlationData;
        });
        log.info("发送查询支付结果事件完成");
    }


    public void queryRefund(RefundMessageEvent event,Integer delayTime){
        log.info("发送查询退款事件: {}",event.getOrderCode());

        rabbitTemplate.convertAndSend(PayMessageMq.REFUND_EXCHANGE,"hello",event,corn->{
            corn.getMessageProperties().setDelay(delayTime);
            return corn;
        });
        log.info("发送查询退款事件完成");
    }
}

5 配置某些交换机,队列进行消息发布的确认

  • 在application.properties配置:

发送者开启 confirm 确认机制

spring.rabbitmq.publisher-confirm-type=correlated

发送者开启 return 确认机制

spring.rabbitmq.publisher-returns=true

  • 代码配置如下
package com.example.springbootdemo.config.rabbitmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Objects;

/***
 *发送者开启 confirm 确认机制
 */
@Slf4j
@Component
public class RabbitmqTemplateConfirm implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.error("confirm==>发送到broker失败\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
        } else {
            log.info("confirm==>发送到broker成功\r\n" +
                            "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
                    correlationData, ack, cause);
        }
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText,
                                String exchange, String routingKey) {
        log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
                        "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
                message, replyCode, replyText, exchange, routingKey);
    }

    private RabbitTemplate rabbitTemplate;

    /* 连接工厂 */
    @Resource
    private ConnectionFactory connectionFactory;

    public RabbitTemplate getRabbitTemplate(){
        if(Objects.isNull(rabbitTemplate)){
            rabbitTemplate=new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
        }
        return rabbitTemplate;
    }
}

package com.example.springbootdemo.rabbitmq.production;

import com.example.springbootdemo.config.rabbitmq.RabbitmqTemplateConfirm;
import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class TestMessageProduction {

    @Resource
    private RabbitmqTemplateConfirm rabbitmqTemplateConfirm;

    public void testMessage(String key){
        log.info("消息发送确认");
        RabbitTemplate rabbitTemplate = rabbitmqTemplateConfirm.getRabbitTemplate();
        rabbitTemplate.convertAndSend(RabbitmqTest.STATION_EXCHANGE,key,"消息发送者确认");
    }
}

6 对某些队列进行手动ack

设置消费端手动 ack none不确认 auto自动确认 manual手动确认

spring.rabbitmq.listener.simple.acknowledge-mode=manual
该配置是全局的,如果只对某些队列进行手动需要进行如下配置

package com.example.springbootdemo.config.rabbitmq;

import com.example.springbootdemo.rabbitmq.config.RabbitmqTest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/***
 * rabbitmq对单个队列消费这设置手动ack
 */
@Slf4j
@Configuration
public class RabbitmqConsumerAck {

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(RabbitmqTest.STATION_QUEUE_197);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            log.info(RabbitmqTest.STATION_QUEUE_197 + "get msg:" +new String(message.getBody()));
            if(message.getMessageProperties().getHeaders().get("error") == null){
                // 消息手动ack
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                log.info("消息确认");
            }else {
                // 消息重新回到队列
                //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                // 拒绝消息(删除)
                channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                log.info("消息拒绝");
            }

        });
        return container;
    }
}

7 rabbitmq其他配置


##发送重试配置
#启用发送重试
#spring.rabbitmq.template.retry.enabled=true
#最大重试次数
#spring.rabbitmq.template.retry.max-attempts=5
#第一次和第二次尝试发布或传递消息之间的间隔
#spring.rabbitmq.template.retry.initial-interval=1000ms
#应用于上一重试间隔的乘数 步长
#spring.rabbitmq.template.retry.multiplier=2
#最大重试时间间隔
#spring.rabbitmq.template.retry.max-interval=10000ms