1.引入依赖
由于我们直接使用Spring Cloud Stream 集成Kafka,官方也已经有现成的starter。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>2.1.0.RELEASE</version>
</dependency>
2.kafka配置
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
auto-add-partitions: false
auto-create-topics: true
min-partition-count: 1
configuration:
max:
request:
size: 83886080
bindings:
#消费者
input_cruise:
destination: cruise_third_client_t2
group: s3${SERVER_NUM:node2}
consumer:
autoCommitOffset: false
concurrency: 1
partitioned: false
resetOffsets: true
startOffset: latest
#生产者
output_cruise:
destination: cruise_third_client_t2
content-type: application/json
producer:
partitionCount: 1
sync: true
bufferSize: 83886080
properties:
max:
request:
# 10M
size: 83886080
3.生产者示例
首先需要定义生产者MessageChannel,这里会用到Output注解
public interface CruiseSource {
String OUTPUT = "output_cruise";
@Output(OUTPUT)
MessageChannel output();
}
使用MessageChannel 发送消息。
@Slf4j
@EnableBinding(CruiseSource.class)
public class CruiseMsgSource {
@Resource private CruiseSource cruiseSource;
/**
* <发送消息到队列 此方法为通用>
*
* @param msg
* @param manufacturer
* @param msgType
* @author qius
* @updator qius 消费消息
* @see com.airlook.cruise.server.msg.thirdclient.CruiseMsgSink#accessRecordProcess(Message)
* @since 2020/8/6 16:40
*/
@Async
public void sendMsg(String msg, Integer manufacturer, Integer msgType) {
// 传输对象
cruiseSource
.output()
.send(
MessageBuilder.withPayload(msg)
.setHeader(CommonConstants.KAFKA_THIRD_PARTY_SERVER_FLAG_HEADER, manufacturer)
.setHeader(CommonConstants.KAFKA_AIR_LOOK_MSG_TYPE_FLAG_HEADER, msgType)
.build());
}
}
4.消费者示例
首先需要定义SubscribableChannel 接口方法使用Input注解。
public interface CruiseSink {
String INPUT = "input_cruise";
@Input(INPUT)
SubscribableChannel input();
}
通过@StreamListener监听通道的消息
@Slf4j
@EnableBinding(CruiseSink.class)
public class MessageSinkHandler {
@StreamListener(target = CruiseSink.INPUT)
public void handler(Message<String> msg){
System.out.println(" received message : "+msg);
}
}
参考文章 https://blog.csdn.net/lt_xiaodou/article/details/126527224