Kafka设计解析(二十)Apache Flink Kafka consumer

时间:2022-09-11 16:19:26

转载自 huxihx,原文链接 Apache Flink Kafka consumer

Flink提供了Kafka connector用于消费/生产Apache Kafka topic的数据。Flink的Kafka consumer集成了checkpoint机制以提供精确一次的处理语义。在具体的实现过程中,Flink不依赖于Kafka内置的消费组位移管理,而是在内部自行记录和维护consumer的位移。

用户在使用时需要根据Kafka版本来选择相应的connector,如下表所示:

Maven依赖 支持的最低Flink版本 Kafka客户端类名 说明
flink-connector-kafka-0.8_2.10 1.0.0

FlinkKafkaConsumer08

FlinkKafkaProducer08

使用的是Kafka老版本low-level consumer,即SimpleConsumer. Flink在内部会提交位移到Zookeeper
flink-connector-kafka-0.9_2.10 1.0.0

FlinkKafkaConsumer09

FlinkKafkaProducer09

使用Kafka新版本consumer
flink-connector-kafka-0.10_2.10 1.2.0

FlinkKafkaConsumer010

FlinkKafkaProducer010

支持使用Kafka 0.10.0.0版本新引入的内置时间戳信息

然后,将上面对应的connector依赖加入到maven项目中,比如:

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>1.3.2</version>
</dependency>

Kafka Consumer

Flink kafka connector使用的consumer取决于用户使用的是老版本consumer还是新版本consumer,新旧两个版本对应的connector类名是不同的,分别是:FlinkKafkaConsumer09(或FlinkKafkaConsumer010)以及FlinkKafkaConsumer08。它们都支持同时消费多个topic。

该Connector的构造函数包含以下几个字段:

  1. 待消费的topic列表
  2. key/value解序列化器,用于将字节数组形式的Kafka消息解序列化回对象
  3. Kafka consumer的属性对象,常用的consumer属性包括:bootstrap.servers(新版本consumer专用)、zookeeper.connect(旧版本consumer专用)和group.id

下面给出一个实例:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties));

DeserializationSchema

Flink的Kafka consumer需要依靠用户指定的解序列化器来将二进制的数据转换成Java对象。DeserializationSchema接口就是做这件事情的,该接口中的deserialize方法作用于每条Kafka消息上,并把转换的结果发往Flink的下游operator。

通常情况下,用户直接继承AbstractDeserializationSchema来创建新的deserializer,也可以实现DeserializationSchema接口,只不过要自行实现getProducedType方法。

如果要同时解序列化Kafka消息的key和value,则需要实现KeyedDeserializationSchema接口,因为该接口的deserialize方法同时包含了key和value的字节数组。

Flink默认提供了几种deserializer:

  • TypeInformationSerializationSchema(以及TypeInformationKeyValueSerializationSchema):创建一个基于Flink TypeInformation的schema,适用于数据是由Flink读写之时。比起其他序列化方法,这种schema性能更好
  • JsonDeserializationSchema(JSONKeyValueDeserializationSchema):将JSON转换成ObjectNode对象,然后通过ObjectNode.get("fieldName").as(Int/String...)()访问具体的字段。KeyValue

一旦在解序列化过程中出现错误,Flink提供了两个应对方法——1. 在deserialize方法中抛出异常,使得整个作业失败并重启;2. 返回null告诉Flink Kafka connector跳过这条异常消息。值得注意的是,由于consumer是高度容错的,如果采用第一种方式会让consumer再次尝试deserialize这条有问题的消息。因此倘若deserializer再次失败,程序可能陷入一个死循环并不断进行错误重试。

Kafka consumer起始位移配置

Flink的Kafka consumer允许用户配置Kafka consumer的起始读取位移,如下列代码所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer08<String> myConsumer = new FlinkKafkaConsumer08<>(...);
myConsumer.setStartFromEarliest(); // start from the earliest record possible
myConsumer.setStartFromLatest(); // start from the latest record
myConsumer.setStartFromGroupOffsets(); // the default behaviour DataStream<String> stream = env.addSource(myConsumer);
...

所有版本的Flink Kafka consumer都可以使用上面的方法来设定起始位移。

  • setStartFromGroupOffsets:这是默认情况,即从消费者组提交到Kafka broker上的位移开始读取分区数据(对于老版本而言,位移是提交到Zookeeper上)。如果未找到位移,使用auto.offset.reset属性值来决定位移。该属性默认是LATEST,即从最新的消息位移处开始消费
  • setStartFromEarliest() / setStartFromLatest():设置从最早/最新位移处开始消费。使用这两个方法的话,Kafka中提交的位移就将会被忽略而不会被用作起始位移

Flink也支持用户自行指定位移,方法如下:

ap<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L); myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

上面的例子中,consumer将从用户指定的位移处开始读取消息。这里的位移记录的是下一条待消费消息的位移,而不是最新的已消费消息的位移。值得注意的是,如果待消费分区的位移不在保存的位移映射中,Flink Kafka connector会使用默认的组位移策略(即setStartFromGroupOffsets())。

另外,当任务自动地从失败中恢复或手动地从savepoint中恢复时,上述这些设置位移的方法是不生效的。在恢复时,每个Kafka分区的起始位移都是由保存在savepoint或checkpoint中的位移来决定的。

Kafka consumer容错性

一旦启用了Flink的检查点机制(checkpointing),Flink Kafka消费者会定期地对其消费的topic做checkpoint以保存它消费的位移以及其他操作的状态。一旦出现失败,Flink将会恢复streaming程序到最新的checkpoint状态,然后重新从Kafka消费数据,重新读取的位置就是保存在checkpoint中的位移。

checkpoint的间隔决定了程序容错性的程度,它直接确定了在程序崩溃时,程序回溯到的最久状态。

如果要使用启动容错性的Kafka消费者,定期对拓扑进行checkpoint就是非常必要的,实现方法如下面代码所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // 每5秒做一次checkpoint  

需要注意的是,只有槽位(slot)充足Flink才会重启拓扑,因此一旦拓扑因无法连接TaskManager而崩溃,仍然需要有足够的slot才能重启拓扑。如果使用YARN的话,Flink能够自动地重启丢失的YARN容器。

如果没有启用checkpoint,那么Kafka consumer会定期地向Zookeeper提交位移。

Kafka consumer位移提交

Flink Kafka consumer可以自行设置位移提交的行为。当然,它不依赖于这些已提交的位移来实现容错性。这些提交位移只是供监控使用。

配置位移提交的方法各异,主要依赖于是否启用了checkpointing机制:

  • 未启用checkpointing:Flink Kafka consumer依赖于Kafka提供的自动提交位移功能。设置方法是在Properties对象中配置Kafka参数enable.auto.commit(新版本Kafka consumer)或auto.commit.enable(老版本Kafka consumer)
  • 启用checkpointing:Flink Kafka consumer会提交位移到checkpoint状态中。这就保证了Kafka中提交的位移与checkpoint状态中的位移是一致的。用户可以调用setCommitOffsetsCheckpoints(boolean)方法来禁用/开启位移提交——默认是true,即开启了位移提交。注意,这种情况下,Flink会忽略上一种情况中提及的Kafka参数

Kafka consumer时间戳提取/水位生成

通常,事件或记录的时间戳信息是封装在消息体中。至于水位,用户可以选择定期地发生水位,也可以基于某些特定的Kafka消息来生成水位——这分别就是AssignerWithPeriodicWatermaks以及AssignerWithPunctuatedWatermarks接口的使用场景。

用户也能够自定义时间戳提取器/水位生成器,具体方法参见这里,然后按照下面的方式传递给consumer:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test"); FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter()); DataStream<String> stream = env
.addSource(myConsumer)
.print();

在内部,Flink会为每个Kafka分区都执行一个对应的assigner实例。一旦指定了这样的assigner,对于每条Kafka中的消息,extractTimestamp(T element, long previousElementTimestamp)方法会被调用来给消息分配时间戳,而getCurrentWatermark()方法(定时生成水位)或checkAndGetNextWatermark(T lastElement, long extractedTimestamp)方法(基于特定条件)会被调用以确定是否发送新的水位值。

Kafka设计解析(二十)Apache Flink Kafka consumer的更多相关文章

  1. Kafka设计解析(十八)Kafka与Flink集成

    转载自 huxihx,原文链接 Kafka与Flink集成 Apache Flink是新一代的分布式流式数据处理框架,它统一的处理引擎既可以处理批数据(batch data)也可以处理流式数据(str ...

  2. Kafka设计解析(十六)Kafka 0&period;11消息设计

    转载自 huxihx,原文链接 [原创]Kafka 0.11消息设计 目录 一.Kafka消息层次设计 1. v1格式 2. v2格式 二.v1消息格式 三.v2消息格式 四.测试对比 Kafka 0 ...

  3. Kafka设计解析(十五)Kafka controller重设计

    转载自 huxihx,原文链接 Kafka controller重设计 目录 一.Controller是做什么的 二.Controller当前设计 三.Controller组成 四.Controlle ...

  4. Kafka设计解析(十四)Kafka producer介绍

    转载自 huxihx,原文链接 Kafka producer介绍 Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer.本文着重讨论新版本produce ...

  5. Kafka设计解析(十九)Kafka consumer group位移重设

    转载自 huxihx,原文链接 Kafka consumer group位移重设 本文阐述如何使用Kafka自带的kafka-consumer-groups.sh脚本随意设置消费者组(consumer ...

  6. &lbrack;Big Data - Kafka&rsqb; Kafka设计解析(五):Kafka Benchmark

    性能测试及集群监控工具 Kafka提供了非常多有用的工具,如Kafka设计解析(三)- Kafka High Availability (下)中提到的运维类工具——Partition Reassign ...

  7. Kafka设计解析(十)Kafka如何创建topic

    转载自 huxihx,原文链接 Kafka如何创建topic? 目录 一.命令行部分 二.后台逻辑部分 Kafka创建topic命令很简单,一条命令足矣: bin/kafka-topics. --re ...

  8. &lbrack;Big Data - Kafka&rsqb; Kafka设计解析(四):Kafka Consumer解析

    High Level Consumer 很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理.同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被 ...

  9. Kafka设计解析(十二)Kafka 如何读取offset topic内容 &lpar;&lowbar;&lowbar;consumer&lowbar;offsets&rpar;

    转载自 huxihx,原文链接 Kafka 如何读取offset topic内容 (__consumer_offsets) 众所周知,由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka ...

  10. &lbrack;Big Data - Kafka&rsqb; Kafka设计解析(三):Kafka High Availability (下)

    Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用.目前越来越多的开源分布式处理系统如Cloudera.Apache Storm.Spa ...

随机推荐

  1. Integer&period;valueof&lpar;String s&rpar;和Integer&period;parseInt&lpar;String s&rpar;的具体区别是什么?

    Integer.valueof(String s)和Integer.parseInt(String s)的具体区别是什么? Integer.valueof(String s)是将一个包装类是将一个实际 ...

  2. Fragment全解析系列(二):正确的使用姿势

    作为一个稳定的app,从后台且回到前台,一定会在任何情况都能恢复到离开前的页面,并且保证数据的完整性. 如果你没看过本系列的第一篇,为了方便后面文章的介绍,先规定一个"术语",安卓 ...

  3. spring&period;net

    Spring.Net有两个很重要的感念就是IoC(控制反转)和DI(依赖注入). IoC.英文全称Inversion of Control.控制反转.DI.英文全称Dependency Injecti ...

  4. supervisor安装配置与使用

    supervisor:C/S架构的进程控制系统,可使用户在类UNIX系统中监控.管理进程.常用于管理与某个用户或项目相关的进程. 组成部分supervisord:服务守护进程supervisorctl ...

  5. JavaBean规范

    JavaBean规范 (1)JavaBean 类必须是一个公共类,并将其访问属性设置为 public  ,如: public class user{ …}(2)JavaBean 类必须有一个空的构造函 ...

  6. C&num; web项目利用docx文档作为模板~为打印专做的解决方案

    还是那句话:十年河东,十年河西,莫欺少年穷. 目前,web端打印技术有很多,有收费的专业web打印控件,大家可以参考我的上一篇博客.当然,很多公司不愿意出钱,那么今天咱们就探讨下怎么做免费的打印. w ...

  7. EventKey为last&lowbar;trade&lowbar;no的subscribe关注事件

    如果用户曾经在该公众号有支付行为,关注的时候EventKey中将包含上次交易订单号,如 last_trade_no_4002752001201704258347703919 <xml> & ...

  8. SQL中的float类型的数据

    问题1.  如何在SQL中默认的使用float类型的数据 SQL中想要通过计算的方式最快的得到一个float类型的数据,只需要运算的其中一个值后面加上小数点就ok. 比如 :9/2=4 但是 :9/2 ...

  9. 牛客网 桂林电子科技大学第三届ACM程序设计竞赛 C&period;二元-K个二元组最小值和最大-优先队列&plus;贪心&lpar;思维&rpar;

    链接:https://ac.nowcoder.com/acm/contest/558/C来源:牛客网 小猫在研究二元组. 小猫在研究最大值. 给定N个二元组(a1,b1),(a2,b2),…,(aN, ...

  10. 【【模板】严格次小生成树&lbrack;BJWC2010&rsqb;】

    树上的路径怎么能没有树剖 显然,次小生成树和最小生成树只在一条边上有差距,于是我们就可以枚举这一条边,将所有边加入最小生成树,之后再来从这些并不是那么小的生成树中找到那个最小的 我们往最小生成树里加入 ...