自定义无限制来源如何在Google Cloud DataFlow中运行?

时间:2022-09-12 15:44:13

I'm trying to implement custom unbounded source to Google Cloud Dataflow to read from Amazon Kinesis queue. In order to implement checkpointing properly, I'd like to understand how the mechanism works exactly.

我正在尝试为Google Cloud Dataflow实施自定义无限制来源,以便从Amazon Kinesis队列中读取。为了正确实现检查点,我想了解机制是如何工作的。

How DataFlow works

I was trying to understand checkpoints by reading the documentation for DataFlow, but there are some crucial things missing, so I went through a MillWheel paper. Let me first explain how I understood the concept laid out in this paper. I will focus on interaction between source and its consumer in strong production setup in terms of dataflow API:

我试图通过阅读DataFlow的文档来了解检查点,但是缺少一些关键的东西,所以我浏览了一篇MillWheel论文。首先让我解释一下我如何理解本文中提出的概念。在数据流API方面,我将重点介绍源代码与其消费者之间在强大的生产设置中的交互:

  • createReader() is called on source with null value passed as a CheckpointMark
  • 在源上调用createReader(),并将null值作为CheckpointMark传递
  • start() is called on reader instance
  • 在reader实例上调用start()
  • advance() is called X times on reader
  • advance()在读者上被调用X次
  • now the worker decides to make a checkpoint mark. It calls getCheckpointMark() on the reader.
  • 现在工人决定制作一个检查站标记。它在阅读器上调用getCheckpointMark()。
  • checkpoint is persisted by worker
  • 检查点由工人持久化
  • finalizeCheckpoint() is called on checkpoint object
  • 在checkpoint对象上调用finalizeCheckpoint()
  • data read so far is sent to the consumer, who stores the records in cache in order to deduplicate for possible retries
  • 到目前为止读取的数据被发送给消费者,消费者将记录存储在高速缓存中以便对可能的重试进行重复数据删除
  • consumer sends ACK to the source. At this point checkpoint is removed from source and when ACK is accepted, then consumer removes the records from cache (because the source won't retry at this point)
  • 消费者向源发送ACK。此时检查点从源中删除,当接受ACK时,消费者从缓存中删除记录(因为此时源不会重试)
  • if source fails to receive ACK, then it will create new reader instance passing the last checkpoint as the argument and it will retry sending data to the consumer. If the consumer receives retried data, it will try to deduplicate
  • 如果源无法接收ACK,那么它将创建新的读取器实例,将最后一个检查点作为参数传递,它将重试向消费者发送数据。如果消费者收到重试数据,它将尝试进行重复数据删除
  • everything is repeated. How it exactly happens is unclear to me: is the first reader instance used to continue reading from stream? Or the new reader with null checkpoint mark is created in order to do this? Or is the second reader (with checkpoint data) used to continue reading from stream?
  • 一切都重复了。它是如何发生的并不清楚:是第一个用于继续从流中读取的读者实例?或者创建具有空检查点标记的新阅读器以执行此操作?或者是用于继续从流中读取的第二个读取器(带有检查点数据)?

PubSub vs. Kinesis

Now, please let me say a few words about how Kinesis queue operates, because it has significant differences with Pub/Sub (for as much as I understand how Pub/Sub works, I haven't been using it myself).

现在,请让我谈谈Kinesis队列如何运作,因为它与Pub / Sub存在显着差异(因为我了解Pub / Sub的工作方式,我自己并没有使用它)。

Pub/Sub

I see that Pub/Sub pull model heavily relies on ACKs, i.e. the messages received by client are ACKed and then the "internal checkpoint" in Pub/Sub moves forward -> this means that upcoming pull request will receive consecutive records after the previous ACK.

我看到Pub / Sub pull模型严重依赖于ACK,即客户端接收的消息被确认,然后Pub / Sub中的“内部检查点”向前移动 - >这意味着即将到来的pull请求将在前一个ACK之后接收连续记录。

Kinesis

Kinesis pull interface (there's no push here at all) is more similar to how you interact with a file. You can start reading at any location in the stream (with special values TRIM_HORIZON being the oldest record in the stream and LATEST being latest record in the stream) and then move forward record by record using iterator (iterators are stored on server side and have 5 minutes expiry time if unused). There are no ACKs to the server - it's responsibility of the client to keep track the position in the stream and you can always re-read old records (unless they have expired, of course).

Kinesis pull界面(根本没有推送)更类似于你与文件的交互方式。您可以在流中的任何位置开始读取(特殊值TRIM_HORIZON是流中最旧的记录,LATEST是流中的最新记录)然后使用迭代器按记录向前移动(迭代器存储在服务器端并具有5分钟到期时间,如果未使用)。服务器没有ACK - 客户端负责跟踪流中的位置,并且您可以随时重新读取旧记录(当然,除非它们已过期)。

Question / Issues

  • how the checkpoint should look like? Is a reader, given checkpoint, expected to read just part of data it relates to or is it expected to read all data from the checkpoint? In other words should my checkpoint be like: "data between x and y" or "all data after x"?
  • 检查点应该如何?给定检查点的读者是否只读取与其相关的部分数据,或者是否希望从检查点读取所有数据?换句话说,我的检查点应该是:“x和y之间的数据”或“x之后的所有数据”?
  • I know that the first reader gets null as checkpoint mark and that's perfectly fine - it means that I should start reading from point defined by application developer. But can DataFlow create other readers with null like this (for example, I'd imagine the situation when reader jvm dies, then DataFlow creates new one with new reader passing null as checkpoint)? In such situation I don't know what is my starting position as I might have already read some data using previous reader and now the mark of progress is lost.
  • 我知道第一个读取器作为检查点标记变为空,这非常好 - 这意味着我应该从应用程序开发人员定义的点开始阅读。但是,DataFlow可以像这样创建其他读取器吗(例如,我想象读取器jvm死亡的情况,然后DataFlow创建新的读取器传递null作为检查点的新读者)?在这种情况下,我不知道我的起始位置是什么,因为我可能已经使用以前的读者阅读了一些数据,现在进度的标记丢失了。
  • what id is used for deduplication of records on consumer side? Is it value returned by getCurrentRecordId? I'm asking this question, because I thought about using the position in the stream for that, because it's unique for particular stream. But what would happen if I later join few kinesis sources by flattening them -> this would lead to situation where different records may share the same id. Should I rather use (stream name, position) tuple for the id (which is unique in this case).
  • 哪个id用于消费者方面的记录重复数据删除?它是由getCurrentRecordId返回的值吗?我问这个问题,因为我考虑过使用流中的位置,因为它对于特定的流是唯一的。但是,如果我稍后通过展平它们来加入几个kinesis来源会发生什么 - >这会导致不同记录可能共享相同ID的情况。我应该使用(流名称,位置)元组作为id(在这种情况下是唯一的)。

Cheers, Przemek

干杯,Przemek

1 个解决方案

#1


3  

We are excited to see that you’re using Dataflow with Kinesis. We would love a pull request to our GitHub project with a contrib connector for Kinesis. We would also be happy to review your code via GitHub as you develop and give you feedback there.

我们很高兴看到您正在使用Dataflow和Kinesis。我们希望通过Kinesis的contrib连接器向我们的GitHub项目提出拉取请求。我们也很乐意在您开发时通过GitHub审核您的代码并在那里给您反馈。

how the checkpoint should look like? Is a reader, given checkpoint, expected to read just part of data it relates to or is it expected to read all data from the checkpoint? In other words should my checkpoint be like: "data between x and y" or "all data after x"?

检查点应该如何?给定检查点的读者是否只读取与其相关的部分数据,或者是否希望从检查点读取所有数据?换句话说,我的检查点应该是:“x和y之间的数据”或“x之后的所有数据”?

The checkpoint mark should represent “data that has been produced and finalized by this reader”. E.g., if a reader is responsible for a specific shard, the checkpoint mark might consist of the shard identifier and the last sequence number Y within that shard that has been successfully read, indicating “all data up to and including Y has been produced”.

检查点标记应代表“由此读者生成并最终确定的数据”。例如,如果读者负责特定分片,则检查点标记可能包括分片标识符和该分片中已成功读取的最后序列号Y,表示“已生成所有数据,包括Y”。

I know that the first reader gets null as checkpoint mark and that's perfectly fine - it means that I should start reading from point defined by application developer. But can DataFlow create other readers with null like this (for example, I'd imagine the situation when reader jvm dies, then DataFlow creates new one with new reader passing null as checkpoint)? In such situation I don't know what is my starting position as I might have already read some data using previous reader and now the mark of progress is lost.

我知道第一个读取器作为检查点标记变为空,这非常好 - 这意味着我应该从应用程序开发人员定义的点开始阅读。但是,DataFlow可以像这样创建其他读取器吗(例如,我想象读取器jvm死亡的情况,然后DataFlow创建新的读取器传递null作为检查点的新读者)?在这种情况下,我不知道我的起始位置是什么,因为我可能已经使用以前的读者阅读了一些数据,现在进度的标记丢失了。

Finalized checkpoints are persisted, even across JVM failure. In other words, when a JVM dies, the reader will be constructed with the last checkpoint that has been finalized. You should not see readers created with null checkpoints unless they are intended to read from the beginning of a source, or in your scenario when the JVM died before the first successful call to finalizeCheckpoint(). You can use the checkpoint mark at the new reader to construct a new iterator for the same shard that starts from the next record to be read, and you can continue without data loss.

即使在JVM失败的情况下,也会保留最终的检查点。换句话说,当JVM死亡时,读者将使用最终确定的最后一个检查点构建。您不应该看到使用空检查点创建的读者,除非它们打算从源的开头读取,或者在JVM在第一次成功调用finalizeCheckpoint()之前死亡的情况下读取。您可以使用新读取器上的检查点标记为从要读取的下一条记录开始的相同分片构造新的迭代器,并且可以继续而不会丢失数据。

what id is used for deduplication of records on consumer side? Is it value returned by getCurrentRecordId? I'm asking this question, because I thought about using the position in the stream for that, because it's unique for particular stream. But what would happen if I later join few kinesis sources by flattening them -> this would lead to situation where different records may share the same id. Should I rather use (stream name, position) tuple for the id (which is unique in this case).

哪个id用于消费者方面的记录重复数据删除?它是由getCurrentRecordId返回的值吗?我问这个问题,因为我考虑过使用流中的位置,因为它对于特定的流是唯一的。但是,如果我稍后通过展平它们来加入几个kinesis来源会发生什么 - >这会导致不同记录可能共享相同ID的情况。我应该使用(流名称,位置)元组作为id(在这种情况下是唯一的)。

In Dataflow, each UnboundedSource (that implements getCurrentRecordId and overrides requiresDeduping to return true) is de-duped on its own. Thus, record IDs are only required to be unique for the same source instance. Records from two different sources can use the same record IDs, and they will not be treated as “duplicates” during flattening. So if Amazon Kinesis guarantees that all records have IDs that are globally unique (across all shards within a stream) and persistent (across resharding operations, for example), then these should be suitable for use as the record ID.

在Dataflow中,每个UnboundedSource(实现getCurrentRecordId和覆盖requireDeduping返回true)都是自己去掉的。因此,记录ID仅对同一源实例是唯一的。来自两个不同来源的记录可以使用相同的记录ID,并且在展平期间不会将它们视为“重复”。因此,如果Amazon Kinesis保证所有记录都具有全局唯一的ID(跨流中的所有分片)和持久性(例如,通过重新分片操作),那么这些ID应该适合用作记录ID。

Note that getCurrentRecordId is an optional method for UnboundedReader-- you do not need to implement it if your checkpointing scheme uniquely identifies each record. Kinesis lets you read records in sequence number order, and it looks like sequence numbers are globally unique. Thus you might be able to assign each shard to a different worker in generateInitialSplits, and each worker may not ever produce duplicate data -- in this case, you may not need to worry about record IDs at all.

请注意,getCurrentRecordId是UnboundedReader的可选方法 - 如果检查点方案唯一标识每条记录,则无需实现它。 Kinesis允许您按顺序编号顺序读取记录,看起来序列号是全局唯一的。因此,您可以将每个分片分配给generateInitialSplits中的不同工作程序,并且每个工作程序可能不会生成重复数据 - 在这种情况下,您可能根本不需要担心记录ID。

Most of this answer has assumed the simple case where your Kinesis streams do not ever change their shards. On the other hand, if the sharding on the stream changes then your solution will become more complex. E.g., each worker could be responsible for more than 1 shard, so the checkpoint mark would be a map of shard -> sequence number instead of sequence number. And split and merged shards may move around between different Dataflow workers to balance load, and it may be hard to guarantee that no Kinesis record is ever read twice by two different workers. In this case, using Kinesis record IDs with the semantics you described should suffice.

大多数答案都假设了一个简单的情况,即你的Kinesis流不会改变它们的碎片。另一方面,如果流上的分片发生变化,那么您的解决方案将变得更加复杂。例如,每个工人可能负责超过1个碎片,因此检查点标记将是碎片 - >序列号而不是序列号的映射。拆分和合并的分片可以在不同的数据流工作者之间移动以平衡负载,并且可能很难保证两个不同的工作人员不会读取两次Kinesis记录。在这种情况下,使用带有您描述的语义的Kinesis记录ID就足够了。

#1


3  

We are excited to see that you’re using Dataflow with Kinesis. We would love a pull request to our GitHub project with a contrib connector for Kinesis. We would also be happy to review your code via GitHub as you develop and give you feedback there.

我们很高兴看到您正在使用Dataflow和Kinesis。我们希望通过Kinesis的contrib连接器向我们的GitHub项目提出拉取请求。我们也很乐意在您开发时通过GitHub审核您的代码并在那里给您反馈。

how the checkpoint should look like? Is a reader, given checkpoint, expected to read just part of data it relates to or is it expected to read all data from the checkpoint? In other words should my checkpoint be like: "data between x and y" or "all data after x"?

检查点应该如何?给定检查点的读者是否只读取与其相关的部分数据,或者是否希望从检查点读取所有数据?换句话说,我的检查点应该是:“x和y之间的数据”或“x之后的所有数据”?

The checkpoint mark should represent “data that has been produced and finalized by this reader”. E.g., if a reader is responsible for a specific shard, the checkpoint mark might consist of the shard identifier and the last sequence number Y within that shard that has been successfully read, indicating “all data up to and including Y has been produced”.

检查点标记应代表“由此读者生成并最终确定的数据”。例如,如果读者负责特定分片,则检查点标记可能包括分片标识符和该分片中已成功读取的最后序列号Y,表示“已生成所有数据,包括Y”。

I know that the first reader gets null as checkpoint mark and that's perfectly fine - it means that I should start reading from point defined by application developer. But can DataFlow create other readers with null like this (for example, I'd imagine the situation when reader jvm dies, then DataFlow creates new one with new reader passing null as checkpoint)? In such situation I don't know what is my starting position as I might have already read some data using previous reader and now the mark of progress is lost.

我知道第一个读取器作为检查点标记变为空,这非常好 - 这意味着我应该从应用程序开发人员定义的点开始阅读。但是,DataFlow可以像这样创建其他读取器吗(例如,我想象读取器jvm死亡的情况,然后DataFlow创建新的读取器传递null作为检查点的新读者)?在这种情况下,我不知道我的起始位置是什么,因为我可能已经使用以前的读者阅读了一些数据,现在进度的标记丢失了。

Finalized checkpoints are persisted, even across JVM failure. In other words, when a JVM dies, the reader will be constructed with the last checkpoint that has been finalized. You should not see readers created with null checkpoints unless they are intended to read from the beginning of a source, or in your scenario when the JVM died before the first successful call to finalizeCheckpoint(). You can use the checkpoint mark at the new reader to construct a new iterator for the same shard that starts from the next record to be read, and you can continue without data loss.

即使在JVM失败的情况下,也会保留最终的检查点。换句话说,当JVM死亡时,读者将使用最终确定的最后一个检查点构建。您不应该看到使用空检查点创建的读者,除非它们打算从源的开头读取,或者在JVM在第一次成功调用finalizeCheckpoint()之前死亡的情况下读取。您可以使用新读取器上的检查点标记为从要读取的下一条记录开始的相同分片构造新的迭代器,并且可以继续而不会丢失数据。

what id is used for deduplication of records on consumer side? Is it value returned by getCurrentRecordId? I'm asking this question, because I thought about using the position in the stream for that, because it's unique for particular stream. But what would happen if I later join few kinesis sources by flattening them -> this would lead to situation where different records may share the same id. Should I rather use (stream name, position) tuple for the id (which is unique in this case).

哪个id用于消费者方面的记录重复数据删除?它是由getCurrentRecordId返回的值吗?我问这个问题,因为我考虑过使用流中的位置,因为它对于特定的流是唯一的。但是,如果我稍后通过展平它们来加入几个kinesis来源会发生什么 - >这会导致不同记录可能共享相同ID的情况。我应该使用(流名称,位置)元组作为id(在这种情况下是唯一的)。

In Dataflow, each UnboundedSource (that implements getCurrentRecordId and overrides requiresDeduping to return true) is de-duped on its own. Thus, record IDs are only required to be unique for the same source instance. Records from two different sources can use the same record IDs, and they will not be treated as “duplicates” during flattening. So if Amazon Kinesis guarantees that all records have IDs that are globally unique (across all shards within a stream) and persistent (across resharding operations, for example), then these should be suitable for use as the record ID.

在Dataflow中,每个UnboundedSource(实现getCurrentRecordId和覆盖requireDeduping返回true)都是自己去掉的。因此,记录ID仅对同一源实例是唯一的。来自两个不同来源的记录可以使用相同的记录ID,并且在展平期间不会将它们视为“重复”。因此,如果Amazon Kinesis保证所有记录都具有全局唯一的ID(跨流中的所有分片)和持久性(例如,通过重新分片操作),那么这些ID应该适合用作记录ID。

Note that getCurrentRecordId is an optional method for UnboundedReader-- you do not need to implement it if your checkpointing scheme uniquely identifies each record. Kinesis lets you read records in sequence number order, and it looks like sequence numbers are globally unique. Thus you might be able to assign each shard to a different worker in generateInitialSplits, and each worker may not ever produce duplicate data -- in this case, you may not need to worry about record IDs at all.

请注意,getCurrentRecordId是UnboundedReader的可选方法 - 如果检查点方案唯一标识每条记录,则无需实现它。 Kinesis允许您按顺序编号顺序读取记录,看起来序列号是全局唯一的。因此,您可以将每个分片分配给generateInitialSplits中的不同工作程序,并且每个工作程序可能不会生成重复数据 - 在这种情况下,您可能根本不需要担心记录ID。

Most of this answer has assumed the simple case where your Kinesis streams do not ever change their shards. On the other hand, if the sharding on the stream changes then your solution will become more complex. E.g., each worker could be responsible for more than 1 shard, so the checkpoint mark would be a map of shard -> sequence number instead of sequence number. And split and merged shards may move around between different Dataflow workers to balance load, and it may be hard to guarantee that no Kinesis record is ever read twice by two different workers. In this case, using Kinesis record IDs with the semantics you described should suffice.

大多数答案都假设了一个简单的情况,即你的Kinesis流不会改变它们的碎片。另一方面,如果流上的分片发生变化,那么您的解决方案将变得更加复杂。例如,每个工人可能负责超过1个碎片,因此检查点标记将是碎片 - >序列号而不是序列号的映射。拆分和合并的分片可以在不同的数据流工作者之间移动以平衡负载,并且可能很难保证两个不同的工作人员不会读取两次Kinesis记录。在这种情况下,使用带有您描述的语义的Kinesis记录ID就足够了。