在实际生产开发中经常会有这样的场景,因为某些场景需要暂时关闭kafka的监听,比如重刷缓存等,等刷好之后再度开启kafka监听,这里记录一下。
一、首先在监听的地方,给监听加一个id。
public static final String KAFKA_LISTENER_ID = "KAFKA_ID";
@KafkaListener(id = KAFKA_LISTENER_ID,topics = "XXXX")
public void processMessage(TagRequest request) {
//处理逻辑
}
二、添加KafkaListenerEndpointRegistry
@Autowired
private KafkaListenerEndpointRegistry registry;
三、添加关闭监听和开启监听方法
public void stop() {
log.info("Kafkalistener stop...");
registry.getListenerContainer(KAFKA_LISTENER_ID).stop();
}
public void start() {
log.info("Kafkalistener start...");
registry.getListenerContainer(KAFKA_LISTENER_ID).start();
}
这样这两个方法就实现了kafka的动态开启和关闭