当先锋百科网

首页 1 2 3 4 5 6 7

1.kafka connector版本选取

Flink有多个Kafka connector:universal,0.10和0.11。 Flink 1.7 开始就有这个universal的Kafka connector通用版本,跟Kafka client端的尽量保持最新版本。这个版本的Kafka客户端向后兼容代理版本0.10.0或更高版本。对于大多数用户而言,universal的Kafka连接器是最合适的。对于Kafka版本0.11.x和0.10.x,我们建议分别使用专用的0.11和0.10连接器。

Kafka 版本universal (>= 1.0.0)0.11.x0.10.x
Maven 依赖flink-connector-kafka_2.11flink-connector-kafka-011_2.11flink-connector-kafka-010_2.11
消费者生产者的类名称FlinkKafkaConsumer FlinkKafkaProducerFlinkKafkaConsumer011 FlinkKafkaProducer011FlinkKafkaConsumer010 FlinkKafkaProducer010

2.kafka consumer

Flink的Kafka 消费者 FlinkKafkaConsumer(或FlinkKafkaConsumer011Kafka 0.11.x或FlinkKafkaConsumer010Kafka 0.10.x)

如果只需要kafka消息的value话,可以使用SimpleStringSchema来new FlinkKafkaConsumer

需要输入以下参数:

  1. topic name / list of topic names 一个或者多个topic name
  2. DeserializationSchema / KafkaDeserializationSchema用于反序列化来自Kafka的数据
  3. Properties for the Kafka consumer。需要以下Properties :
    • bootstrap.servers”(Kafka broker )
    • group.id” 消费者组的ID
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

如果需要获得Kafka的消息的key、value 和元数据,就需要通过实现KafkaDeserializationSchema接口方法deserialize 来实现

代码

import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.flink.streaming.api.scala._

//读取kafka中数据  key value全部读出来
object ReadKafka {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //设置连接kafka的配置信息
    val props = new Properties()
    props.setProperty("bootstrap.servers","node06:9092,node07:9092,node08:9092")
    props.setProperty("group.id","flink-kafka-001")
    props.setProperty("key.deserializer",classOf[StringSerializer].getName)
    props.setProperty("value.deserializer",classOf[StringSerializer].getName)

    //第一个参数 : 消费的topic名
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("kafkaTest", new KafkaDeserializationSchema[(String, String)] {
      //什么时候停止,停止条件是什么
      override def isEndOfStream(t: (String, String)): Boolean = false

      //要进行序列化的字节流
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val key = new String(consumerRecord.key(), "UTF-8")
        val value = new String(consumerRecord.value(), "UTF-8")
        (key, value)
      }

      //指定一下返回的数据类型  Flink提供的类型
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))
    stream.print()
    env.execute()
  }
}

##2.1 Kafka Consumer offset 配置
Flink Kafka Consumer可以配置如何确定Kafka分区的起始位置。

val env = StreamExecutionEnvironment.getExecutionEnvironment()

val myConsumer = new FlinkKafkaConsumer[String](...)
myConsumer.setStartFromEarliest()      // start from the earliest record possible
myConsumer.setStartFromLatest()        // start from the latest record
myConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)
myConsumer.setStartFromGroupOffsets()  // the default behaviour

val stream = env.addSource(myConsumer)
...

Flink Kafka Consumer的所有版本都具有上述用于offset 消费显式配置方法。

  • setStartFromGroupOffsets(默认行为):开始从消费者组的(group.id在消费者属性中的设置)在Kafka代理中提交的偏移中读取分区。如果找不到分区的偏移量,auto.offset.reset则将使用属性中的设置。
  • setStartFromEarliest()setStartFromLatest():从最早/最新记录开始。在这些模式下,Kafka中已提交的偏移将被忽略,不会用作起始位置。
  • setStartFromTimestamp(long):从指定的时间戳开始。对于每个分区,其时间戳大于或等于指定时间戳的记录将用作开始位置。如果分区的最新记录早于时间戳,则仅从最新记录中读取分区。在这种模式下,Kafka中已提交的偏移将被忽略,并且不会用作起始位置。

还可以为每个分区指定使用者应从其开始的确切偏移量:

val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L)
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L)

myConsumer.setStartFromSpecificOffsets(specificStartOffsets)

上面的示例将配置从分区0、1和2的指定offset开始消费 myTopic。offset应该是为每个分区读取的下一条记录。请注意,如果指定分区的偏移量没有在这个tropic中对应上,那么它将默认使用setStartFromGroupOffsets()来消费topic

请注意,当作业从故障中自动还原或使用savepoint手动还原时,这些起始位置配置方法不会影响起始位置。还原时,每个Kafka分区的开始位置由savepoint或checkpoint中存储的offset来确定

2.2 Kafka Consumers容错

开启Flink的checkpointing后,Flink Kafka Consumer 会记录topic中的offset,并定期把offset以及其他operations 状态一起 checkpoint。如果作业失败,Flink 会恢复到最新checkpoint状态,并根据checkpoint中的offset开始重新消费Kafka。


3. Kafka Producer

Flink的Kafka 生产者者 FlinkKafkaConsumer(或FlinkKafkaConsumer011Kafka 0.11.x或FlinkKafkaConsumer010Kafka 0.10.x)

如果只需要kafka消息的value话,可以使用SimpleStringSchema来new FlinkKafkaProducer

需要输入以下参数:
需要输入以下参数:

  1. topic name 要输出的kafka topic name
  2. SerializationSchema / KafkaSerializationSchema 用于反序列化来自Kafka的数据
  3. Properties for the Kafka Producer。需要以下Properties :
    • bootstrap.servers”(Kafka broker )
  4. 容错的语义
val stream: DataStream[String] = ...

Properties properties = new Properties
properties.setProperty("bootstrap.servers", "localhost:9092")

val myProducer = new FlinkKafkaProducer[String](
        "my-topic",                  // target topic
        new SimpleStringSchema(),    // serialization schema
        properties,                  // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // fault-tolerance

stream.addSink(myProducer)

3.1 Kafka Producers 容错

启用Flink的checkpoint后,FlinkKafkaProducer011(FlinkKafkaProducer 对应Kafka> = 1.0.0版本)可以提供exactly-once的保证

除了启用Flink的checkpoint,可以通过设置适当的semantic参数传递给FlinkKafkaProducer011(FlinkKafkaProducer对应Kafka> = 1.0.0版本)
选择三种不同的语义:

  • Semantic.NONE:Flink不能保证任何事情。产生的记录可能会丢失或可以重复。
  • Semantic.AT_LEAST_ONCE (默认设置):至少一次(可能重复写入)。
  • Semantic.EXACTLY_ONCE:Kafka事务将用于提供精确一次的语义。每当您使用事务写入Kafka时,请不要忘记为使用Kafka记录的任何应用程序设置所需的设置isolation.level(read_committed 或read_uncommitted-后者是默认值)。 这是因为Kafka的事务支持是给写入的数据分为committed和uncomitted,如果使用默认配置的consumer,读取的时候依然会读取所有数据而不是根据事务隔离。

注意事项
####1. Semantic.EXACTLY_ONCE模式 transaction timeout 事务超时问题
Semantic.EXACTLY_ONCE模式下, 如果Flink application crash到完成重启之间的时间大于Kafka的事务超时时间,则将丢失数据,因为kafka会自动中止超过超时时间的事务。所以两个重要的设置需要配置一下。
- Kafka brokers: transaction.max.timeout.ms 默认值是15分钟
- FlinkKafkaProducer011 : transaction.timeout.ms 默认值是1小时
因此 使用Semantic.EXACTLY_ONCE 模式时,必须要把 transaction.max.timeout.ms大于一小时

####2. read_committed 事件阻塞问题

Kafka的事务工作流程如下:

  1. 开启一个事务,将所有属于此事务内的消息(写入)追加到partition的末尾,并标注这些消息为uncommitted
  2. 在一个事务committed后,这些标记变为committed
  3. 从Kafka topic消费消息的consumer,可以配置为一个isolation级别(通过isolation.level属性进行配置),申明是否它可以读uncommitted消息,可读参数为read_uncommitted,也是默认配置。不可读的参数为read_committed。如果consumer被配置为read_committed,则它会在遇到一个uncommitted消息后,停止从一个partition消费数据,并在消息变为committed后,恢复消费数据。

在事务隔离级别为read_committed ,任何未完成(未中止或未完成)的事务会把这个topic的所有read操作阻塞掉。下面是提交两个transaction 的流程例子:
1) User started transaction1 and written some records using it
2) User started transaction2 and written some further records using it
3) User committed transaction2
尽管transaction2的记录已经提交,在transaction1提交或中止之前,transaction2是不可以被read。

3. KafkaProducers pool

Semantic.EXACTLY_ONCE模式每个FlinkKafkaProducer011实例使用一个固定大小的KafkaProducers池。每个检查点使用这些生产者中的每一个。如果并发检查点的数量超过池大小,FlinkKafkaProducer011 将引发异常,并使整个应用程序失败。请相应地配置最大池大小和最大并发检查点数

4. Troubleshooting

如果在使用Flink时Kafka遇到问题,问题可能与Flink无关,Flink也是通过包装 KafkaConsumer或 KafkaProducer来读写kafka。
有时可以通过在Flink中升级Kafka brokers,重新配置Kafkabrokers 或重新配置KafkaConsumer或KafkaProducer来解决。

下面列出了一些常见问题的示例。

4.1 Data loss

Kafka集群的默认配置可能会导致数据丢失(即使一个写操作已经被ack)。我们需要特别注意一下Kafka启动参数:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*
    建议查阅一下Kakfa 官方文档,对这些配置信息有更进一步的了解。

4.2 UnknownTopicOrPartitionException

在重启Kafka brokers 之后或期间,新的领导者选举也会导致这个Exception。
这是一个可重试的异常,因此Flink作业应该能够重新启动并恢复正常操作。也可以通过retries在生产者设置中更改属性来规避。但是,这可能会导致消息重新排序,如果不希望出现的话,可以通过设置max.in.flight.requests.per.connection为1 来避免。

参考 zackstang博客
参考flink 官网 Apache Kafka Connector