当先锋百科网

首页 1 2 3 4 5 6 7

RabbitMQ–集成Springboot–07–死信交换机和死信队列配置


1、死信交换机和队列配置

1.1、配置

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @描述
 */
@Configuration
@Slf4j
public class TestDLXConfiguration {
    /**
     * 延时队列
     * 发送到该队列的message会在一段时间后过期进入到delay_process_queue
     * 队列里所有的message都有统一的失效时间
     */
    public static String DELAY_QUEUE   = "delay.queue";

    /**
     * 业务交换机
     */
    public static String DELAY_EXCHANGE = "delay.queue.exchange";

    /**
     * 实际消费队列
     * message失效后进入的队列,也就是实际的消费队列
     */
    public static final String PROCESS_QUEUE = "process.queue";

    /**
     * 处理的交换器
     */
    public static String PROCESS_EXCHANGE = "process.queue.exchange";

    /**
     * 超时时间
     */
    public static Long QUEUE_EXPIRATION = 4000L;

    /**
     * 配置处理交换(死信交换机)
     * @return
     */
    @Bean
    DirectExchange processExchange() {
        return new DirectExchange(PROCESS_EXCHANGE);
    }
    /**
     * 设置处理队列(死信队列)
     * @return
     */
    @Bean
    public Queue processQueue() {
        return QueueBuilder
                .durable(PROCESS_QUEUE)
                .build();
    }

    /**
     * 绑定死信队列到死信交换机
     * @return
     */
    @Bean
    Binding processBinding() {
        return BindingBuilder
				//绑定死信队列
                .bind(processQueue())
				//绑定死信交换机
                .to(processExchange())
				//路由键
                .with(PROCESS_QUEUE);
    }
    /**
     * 配置业务队列
     * @return
     */
    @Bean
    public Queue delayQueue() {
       //Map<String,Object> arguments = new HashMap<>(2);
       //arguments.put("x-dead-letter-exchange",DIRCET_EXCHANGE_NAME);
       // arguments.put("x-dead-letter-routing-key",TEST_QUEUE1_NAME);
       // arguments.put("x-message-ttl", 4000L);
       // return new Queue(orderQueue,true,false,false,arguments);
        //构造者模式
        return QueueBuilder.durable(DELAY_QUEUE)
                // DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
                .withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE)
                // dead letter携带的routing key,配置处理队列的路由key
                .withArgument("x-dead-letter-routing-key", PROCESS_QUEUE)
                // 设置过期时间 当配置此事件则为延迟死信队列
                .withArgument("x-message-ttl", QUEUE_EXPIRATION)
                .build();
        
    }





    /**
     * 配置业务交换机
     * @return
     */
    @Bean
    DirectExchange delayExchange() {
        return new DirectExchange(DELAY_EXCHANGE);
    }


    /**
     * 将delayQueue2绑定延时交换机中,routingKey为队列名称
     * @return
     */
    @Bean
    Binding delayBinding() {
        return BindingBuilder
                .bind(delayQueue())
                .to(delayExchange())
                .with(DELAY_QUEUE);
    }

}
    //发送
   rabbitTemplate.convertAndSend(
                DelayConfig.DELAY_EXCHANGE,
                // routingKey
                DelayConfig.DELAY_QUEUE,
                msg);


1.2、发送消息

    //发送
   rabbitTemplate.convertAndSend(
                DelayConfig.DELAY_EXCHANGE,
                // routingKey
                DelayConfig.DELAY_QUEUE,
                msg);

2、延迟死信队列两种配置方式

2.1、方式1

rabbitTemplate.convertAndSend("Direct.exchange.name", "Direct.queue1.name",
                msg, message -> {                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                      //此处设置延迟时间 单位毫秒
                    message.getMessageProperties().setExpiration("10000");
                    return message;
                });

2.2、方式2


   /**
     * 声明业务队列
     * @return Queue
     * 在声明业务队列时,创建了一个Map,并且put了两个值,这两个值就是死信队列的声明。
     * x-dead-letter-exchange:死信交换机的名称
     * x-dead-letter-routing-key:死信交换机的路由键,因为demo中两个交换机的类型都是direct的,因此路由键必须相同。
     */
    @Bean
    public Queue orderQueue() {
        Map<String,Object> arguments = new HashMap<>(2);
        // 绑定该队列到死信交换机
        arguments.put("x-dead-letter-exchange",DIRCET_EXCHANGE_NAME);
        arguments.put("x-dead-letter-routing-key",TEST_QUEUE1_NAME);
        // 设置过期时间
        arguments.put("x-message-ttl", 4000L);
        return new Queue(orderQueue,true,false,false,arguments);
    }