当先锋百科网

首页 1 2 3 4 5 6 7

1.引入依赖
由于我们直接使用Spring Cloud Stream 集成Kafka,官方也已经有现成的starter。

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-kafka</artifactId>
   <version>2.1.0.RELEASE</version>
</dependency>

2.kafka配置

  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          auto-add-partitions: false
          auto-create-topics: true
          min-partition-count: 1
          configuration:
            max:
              request:
                size: 83886080
      bindings:
        #消费者
        input_cruise:
          destination: cruise_third_client_t2
          group: s3${SERVER_NUM:node2}
          consumer:
            autoCommitOffset: false
            concurrency: 1
            partitioned: false
            resetOffsets: true
            startOffset: latest
        #生产者
        output_cruise:
          destination: cruise_third_client_t2
          content-type: application/json
          producer:
            partitionCount: 1
            sync: true
            bufferSize: 83886080
            properties:
              max:
                request:
                  # 10M
                  size: 83886080

3.生产者示例
首先需要定义生产者MessageChannel,这里会用到Output注解

public interface CruiseSource {
    String OUTPUT = "output_cruise";

    @Output(OUTPUT)
    MessageChannel output();
}

使用MessageChannel 发送消息。

@Slf4j
@EnableBinding(CruiseSource.class)
public class CruiseMsgSource {

  @Resource private CruiseSource cruiseSource;

  /**
   * <发送消息到队列 此方法为通用>
   *
   * @param msg
   * @param manufacturer
   * @param msgType
   * @author qius
   * @updator qius 消费消息
   * @see com.airlook.cruise.server.msg.thirdclient.CruiseMsgSink#accessRecordProcess(Message)
   * @since 2020/8/6 16:40
   */
  @Async
  public void sendMsg(String msg, Integer manufacturer, Integer msgType) {
    // 传输对象
    cruiseSource
        .output()
        .send(
            MessageBuilder.withPayload(msg)
                .setHeader(CommonConstants.KAFKA_THIRD_PARTY_SERVER_FLAG_HEADER, manufacturer)
                .setHeader(CommonConstants.KAFKA_AIR_LOOK_MSG_TYPE_FLAG_HEADER, msgType)
                .build());
  }
  
}

4.消费者示例
首先需要定义SubscribableChannel 接口方法使用Input注解。

public interface CruiseSink {

    String INPUT = "input_cruise";

    @Input(INPUT)
    SubscribableChannel input();

}

通过@StreamListener监听通道的消息

@Slf4j
@EnableBinding(CruiseSink.class)
public class MessageSinkHandler {
 
    @StreamListener(target = CruiseSink.INPUT)
    public void handler(Message<String> msg){
        System.out.println(" received message : "+msg);

    }
}

参考文章 https://blog.csdn.net/lt_xiaodou/article/details/126527224