RabbitMQ–集成Springboot–07–死信交换机和死信队列配置
1、死信交换机和队列配置
1.1、配置
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @描述
*/
@Configuration
@Slf4j
public class TestDLXConfiguration {
/**
* 延时队列
* 发送到该队列的message会在一段时间后过期进入到delay_process_queue
* 队列里所有的message都有统一的失效时间
*/
public static String DELAY_QUEUE = "delay.queue";
/**
* 业务交换机
*/
public static String DELAY_EXCHANGE = "delay.queue.exchange";
/**
* 实际消费队列
* message失效后进入的队列,也就是实际的消费队列
*/
public static final String PROCESS_QUEUE = "process.queue";
/**
* 处理的交换器
*/
public static String PROCESS_EXCHANGE = "process.queue.exchange";
/**
* 超时时间
*/
public static Long QUEUE_EXPIRATION = 4000L;
/**
* 配置处理交换(死信交换机)
* @return
*/
@Bean
DirectExchange processExchange() {
return new DirectExchange(PROCESS_EXCHANGE);
}
/**
* 设置处理队列(死信队列)
* @return
*/
@Bean
public Queue processQueue() {
return QueueBuilder
.durable(PROCESS_QUEUE)
.build();
}
/**
* 绑定死信队列到死信交换机
* @return
*/
@Bean
Binding processBinding() {
return BindingBuilder
//绑定死信队列
.bind(processQueue())
//绑定死信交换机
.to(processExchange())
//路由键
.with(PROCESS_QUEUE);
}
/**
* 配置业务队列
* @return
*/
@Bean
public Queue delayQueue() {
//Map<String,Object> arguments = new HashMap<>(2);
//arguments.put("x-dead-letter-exchange",DIRCET_EXCHANGE_NAME);
// arguments.put("x-dead-letter-routing-key",TEST_QUEUE1_NAME);
// arguments.put("x-message-ttl", 4000L);
// return new Queue(orderQueue,true,false,false,arguments);
//构造者模式
return QueueBuilder.durable(DELAY_QUEUE)
// DLX,dead letter发送到的exchange ,设置死信队列交换器到处理交换器
.withArgument("x-dead-letter-exchange", PROCESS_EXCHANGE)
// dead letter携带的routing key,配置处理队列的路由key
.withArgument("x-dead-letter-routing-key", PROCESS_QUEUE)
// 设置过期时间 当配置此事件则为延迟死信队列
.withArgument("x-message-ttl", QUEUE_EXPIRATION)
.build();
}
/**
* 配置业务交换机
* @return
*/
@Bean
DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE);
}
/**
* 将delayQueue2绑定延时交换机中,routingKey为队列名称
* @return
*/
@Bean
Binding delayBinding() {
return BindingBuilder
.bind(delayQueue())
.to(delayExchange())
.with(DELAY_QUEUE);
}
}
//发送
rabbitTemplate.convertAndSend(
DelayConfig.DELAY_EXCHANGE,
// routingKey
DelayConfig.DELAY_QUEUE,
msg);
1.2、发送消息
//发送
rabbitTemplate.convertAndSend(
DelayConfig.DELAY_EXCHANGE,
// routingKey
DelayConfig.DELAY_QUEUE,
msg);
2、延迟死信队列两种配置方式
2.1、方式1
rabbitTemplate.convertAndSend("Direct.exchange.name", "Direct.queue1.name",
msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
//此处设置延迟时间 单位毫秒
message.getMessageProperties().setExpiration("10000");
return message;
});
2.2、方式2
/**
* 声明业务队列
* @return Queue
* 在声明业务队列时,创建了一个Map,并且put了两个值,这两个值就是死信队列的声明。
* x-dead-letter-exchange:死信交换机的名称
* x-dead-letter-routing-key:死信交换机的路由键,因为demo中两个交换机的类型都是direct的,因此路由键必须相同。
*/
@Bean
public Queue orderQueue() {
Map<String,Object> arguments = new HashMap<>(2);
// 绑定该队列到死信交换机
arguments.put("x-dead-letter-exchange",DIRCET_EXCHANGE_NAME);
arguments.put("x-dead-letter-routing-key",TEST_QUEUE1_NAME);
// 设置过期时间
arguments.put("x-message-ttl", 4000L);
return new Queue(orderQueue,true,false,false,arguments);
}