flink重温笔记(十二): flink 高级特性和新特性(1)——End-to-End Exactly-Once(端到端精确一致性语义)

时间:2024-03-13 21:14:55

Flink学习笔记

前言:今天是学习 flink 的第 12 天啦!学习了 flink 高级特性和新特性之 End-to-End Exactly-Once(端到端精确一致性语义),主要是解决大数据领域数据从数据源到数据落点的一致性,不会容易造成数据丢失的问题,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:端到端的一致性语义,说明每一步都算术,每一天的努力都不会白费,明天也要继续努力!


文章目录

  • Flink学习笔记
    • 四、Flink 高级特性和新特性
      • 1. End-to-End Exactly-Once
        • 1.1 流处理的数据语义
          • 1.1.1 At most once(最多一次)
          • 1.1.2 At least once(至少一次)
          • 1.1.3 Exactly once(精确一次)
          • 1.1.4 End to End Exactly once(端到端精确一次)
          • 1.1.5 流计算系统如何支持一次性语义
            • (1) At least once + 去重
            • (2) At least once + 幂等
            • (3) 分布式快照
            • (4) 方法汇总
        • 1.2 End-to-End Exactly-Once 实现
          • 1.2.1 Source
          • 1.2.2 Transformation
          • 1.2.3 sink
        • 1.3 Flink + Kafka 的 End-to-End Exactly Once
          • 1.3.1 版本声明
          • 1.3.2 两阶段提交-API
          • 1.3.3 两阶段提交-流程
        • 1.4 案例演示
          • 1.4.1 Flink + Kafka 实现 End-to-End Exactly Once
          • 1.4.2 Flink + MySQL 实现 End-to-End Exactly Once

四、Flink 高级特性和新特性

1. End-to-End Exactly-Once

1.1 流处理的数据语义

顺序:At most once(最多一次)< At least once(至少一次)< Exactly once(精确一次)< End to End Exactly once(端到端一次)


1.1.1 At most once(最多一次)

最简单的恢复方式,直接从失败的下个数据恢复程序,丢失刚刚失败的数据。


1.1.2 At least once(至少一次)

由于事件是可以重传的,可能造成数据重复。


1.1.3 Exactly once(精确一次)

依赖 checkpoint 机制,回滚恢复数据,保持所有记录仅影响内部状态一次,即不考虑部分数据泄露到下游。


1.1.4 End to End Exactly once(端到端精确一次)

Flink 应用从 Source 端开始到 Sink 端结束,保持所有记录影响内部和外部状态一次,即考虑部分数据泄露到下游。


1.1.5 流计算系统如何支持一次性语义
(1) At least once + 去重


(2) At least once + 幂等


(3) 分布式快照


(4) 方法汇总
Exactly Once 实现方式 优点 缺点
At least once + 去重 1. 故障对性能的影响是局部的;
2. 故障的影响不一定随着拓扑大小而增加
1. 可能需要大量的存储和基础设施来支持;
2. 每个算子的每个事件都有资源开销
At least once + 幂等 1. 实现简单,开销较低 1. 依赖存储特性和数据特征
分布式快照 1. 较小的性能和资源开销 1. barrier 同步;
2. 任何算子发生故障都需要全局暂停和状态回滚;
3. 拓扑越大,对性能的潜在影响越大

1.2 End-to-End Exactly-Once 实现
1.2.1 Source

发生故障时需要支持重设数据的读取位置,如Kafka可以通过offset来实现(其他的没有offset系统,可以自己实现累加器计数)


1.2.2 Transformation
  • 分布式快照机制

    • 同 Spark 相比,Spark 仅仅是针对 Driver 的故障恢复 Checkpoint,
    • 而 Flink 的快照可以到算子级别,并且对全局数据也可以做快照,
    • Flink 的分布式快照受到 Chandy-Lamport 分布式快照算法启发,同时进行了量身定做。
  • Barrier

    • 数据栅栏是一个标记,不会干扰正常数据处理,
    • 一个数据源可以有多个 barrier,
    • 多个数据源,快流等慢流。
  • 异步和增量

    • 异步快照不会阻塞任务,
    • 增量快照,每次进行的全量快照是根据上一次更新的。

1.2.3 sink
  • 幂等写入

    • 任意多次向一个系统写入数据,只对目标系统产生一次结果影响。
    • key,和 value 可以控制不重复
  • 事务写入

    • 借鉴数据库的事务机制,结合自身 checkpoint 机制,

    • 分阶段快照,先保存数据不向外部系统提交,checkpoint 确认过上下游一致后,才向外部系统 commit。

    • 实现方式:

      • 1- 预写日志(Write-Ahead-Log,WAL)

        通用性强,但不能保证百分比,因为要写入内存这个易失介质。

      • 2- 两阶段提交(Two-Phase-Commit,2PC)

        如果外部系统自身支持事务(比如MySQL、Kafka),可以使用2PC方式,百分百端到端的Exactly-Once。

    • 缺点:

      • 牺牲了延迟
      • 输出不是实时写入,而是分批写入

1.3 Flink + Kafka 的 End-to-End Exactly Once
1.3.1 版本声明

Flink 1.4 版本之前,支持 Exactly Once 语义,仅限于应用内部。

Flink 1.4 版本之后,通过两阶段提交 (TwoPhaseCommitSinkFunction) 支持 End-To-End Exactly Once,而且要求 Kafka 0.11+。


1.3.2 两阶段提交-API

实现方法封装在抽象类:TwoPhaseCommitSinkFunction ,重写方法:

  • beginTransaction:

    开启事务前,在目标文件系统的临时目录中创建一个临时文件,处理数据时将数据写入此文件;

  • preCommit:

    在预提交阶段,刷写(flush)文件,然后关闭文件,之后就不能写入到文件了,将为下一检查点的任何后续写入启动新事务;

  • commit:

    在提交阶段,将预提交的文件原子性移动到真正的目标目录中,注意,会增加输出数据可见性的延迟;

  • abort:

    在中止阶段,删除临时文件。


1.3.3 两阶段提交-流程
  • 1- 数据源阶段

    对接数据源系统

  • 2- 预提交阶段(pre-commit)-内部状态

    Flink 开始 checkpoint,就会进入 pre-commit 阶段,同时 JobManager 的 Coordinator 会将 Barrier 注入数据流中

  • 3- 预提交阶段(pre-commit)-外部状态

    当所有的 barrier 在算子中成功进行一遍传递(就是 Checkpoint 完成),并完成快照后,则“预提交”阶段完成;

  • 4- commit 阶段

    所有算子完成“预提交”,就会发起一个commit“提交”动作,任何一个“预提交”失败都会回滚到最近的 checkpoint;


1.4 案例演示
1.4.1 Flink + Kafka 实现 End-to-End Exactly Once

例子1:普通方式——内部一致性语义,重点在生产者 API 设置上,只是简单序列化为字节流 SimpleStringSchema

package cn.itcast.day12.endtoend;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;


import java.util.Properties;

/**
 * @author lql
 * @time 2024-03-07 14:51:04
 * @description TODO:topic:test3 终端生产生数据,控制台打印 topic:test4数据
 */
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce {
    public static void main(String[] args) throws Exception {

        //todo 1)初始化flink流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        
        //todo 2)判断当前的环境
        env.setStateBackend(new HashMapStateBackend());
        if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){
            env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");
        }else{
            env.getCheckpointConfig().setCheckpointStorage(args[0]);
        }

        //todo 3)设置checkpoint的其他参数
        //设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(2000L);
        //同一个时间只能有一个栅栏在运行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //设置checkpoint的执行模式。仅执行一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);


        //todo 4)接入数据源
        //指定topic的名称
        String topicName = "test";
        //实例化kafkaConsumer对象
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offset
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);
        //在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
        kafkaSource.setCommitOffsetsOnCheckpoints(true);

        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //todo 5)单词计数操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });
        //todo 6)单词分组操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> result_1 = wordAndOne
                .keyBy(t -> t.f0)
                .sum(1);

        //todo 7)打印计算结果
        result_1.print();

        SingleOutputStreamOperator<String> result = result_1.map(new MapFunction<Tuple2<String, Integer>, String>() {
            @Override
            public String map(Tuple2<String, Integer> value) throws Exception {
                return value.f0 + "_" + value.f1;
            }
        });

        result.printToErr();

        //todo 8)创建kafka的生产者实例
        //指定topic的名称
        String distTopicName = "test1";
        //实例化FlinkKafkaProducer对象
        Properties distProps = new Properties();
        distProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
                distTopicName,
                new SimpleStringSchema(),
                distProps
        );// 容错
        //todo 4)将数据写入到kafka
        result.addSink(myProducer);

        //todo 8)启动作业
        env.execute();
    }
}

结果:在 node1 的 kafka 生产者模式终端输入数据到 test,词频统计结果写入到 topic:test1,但不保证外部一致性语义


例子2:超级方式——内部外部一致性语义

package cn.itcast.day12.endtoend;

import org.apache.commons.lang3.SystemUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Properties;

/**
 * @author lql
 * @time 2024-03-07 14:51:04
 * @description TODO:topic:test3 终端生产生数据,控制台打印 topic:test4数据
 */
public class Kafka_Flink_Kafka_EndToEnd_ExactlyOnce_pro {
    public static void main(String[] args) throws Exception {

        //todo 1)初始化flink流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 在这里就不能开启了,因为 kafka
        //env.enableCheckpointing(5000);
        //todo 2)判断当前的环境
        env.setStateBackend(new HashMapStateBackend());
        if(SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC){
            env.getCheckpointConfig().setCheckpointStorage("file:///D:\\checkpoint");
        }else{
            env.getCheckpointConfig().setCheckpointStorage(args[0]);
        }

        //todo 3)设置checkpoint的其他参数
        //设置checkpoint的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(2000L);
        //同一个时间只能有一个栅栏在运行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //设置checkpoint的执行模式。仅执行一次
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //设置checkpoint最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);


        //todo 4)接入数据源
        //指定topic的名称
        String topicName = "test";
        //实例化kafkaConsumer对象
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 消费最新的数据
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); // 自动提交偏移量offset
        props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "2000"); // 提交偏移量的时间间隔
        props.setProperty("flink.partition-discovery.interval-millis", "5000");//开启一个后台线程每隔5s检测一次kafka的分区情况

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), props);
        //在开启checkpoint以后,offset的递交会随着checkpoint的成功而递交,从而实现一致性语义,默认就是true
        kafkaSource.setCommitOffsetsOnCheckpoints(true);

        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
        //todo 5)单词计数操作
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = kafkaDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });