通过KIP32,Kafka的每条消息都加进了时间戳,这个KIP在0.10.0.0被加入。
说到“时间”,先贴张图,娱乐一下(如果对星球大战系列电影不熟的话,请自动略过……)
这个KIP的文档在
KIP-32 - Add timestamps to Kafka message
下面贴一下这个KIP的关键部分,俺的注解部分用灰色的字标识。
Motivation 动机
This KIP tries to address the following issues in Kafka.
- Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
- Log rolling might break for a newly created replica as well because of the same reason as (1).
- Some use cases such as streaming processing needs a timestamp in messages.
To solve the above issues, we propose to add a timestamp to Kafka messages.
前两个原因都和replica的重新分配有关,replica重新分配就是把某个分区的副本迁移到另一台机器上,通常是为了调节机器的负载,增加副本数,或者移除机器。在进行replica迁移的时候,Kafka会在迁移的目的地新建一个replica,并且从当前的leader处抓取消息,直到新的副本和leader同步,然后停掉不再保留的replica。相关的文档在这里。 那么,这里就存在一个问题,就是新的replic在获得数据时,从leader的哪个offset开始拉取数据呢?如果直接从最新的数据开始拉,那么这个replica的数据就不足以承担它作为“副本”的任务,因此,肯定是从最旧的offset开始拉。这个操作,在源码里的调用过程挺复杂的……大概会经过LeaderAndIsrRequest 处理 -> becomeFollower -> createReplica -> ReplicaFetcherThreader.handleOffsetOutOfRange,然后新的replica就会从leader的最早的offset开始拉取消息,并且写成文件。这里就迁扯到了Log的rolling和retention的问题。retention的本意是为了消除不再需要的消息或者限制Kafka在本地存储的大小。当为了第一个目的时,应该删除已经保存了很久的消息,这个“很久”是指消息进入到Kafka的时间(或者消息产生的时间)距离当前时间过了很久。roll的本意是为了保持单个文件不要太大,过大的文件不利于retention。但是,由于Kafka的消息中没有时间戳,所以新的replica是不知道消息真正进入Kafka的时间(或产生的时间)的,所以roll和rotention机制就无法正确地工作。“无法正确的工作”表现在log retention是依据于log segment(对应于一个log文件)的最后修改时间,而log rolling是依据于依据于log segment的创建时间。当replica reassign发生时,新的replica里最初的这些log segment的创建时间和修改时间都不能反应这个log segment里边消息的产生或处理时间。
第三个增加timestamp的原因,是由于流处理系统的需要,比如上边那幅图,是StephanEwan在讲Apache Flink的时间举的例子。
Public Interfaces 与外部协议相关的改变
This KIP has the following public interface changes:
- Add a new timestamp field to the message format.
- Use the forth least significant bit to indicate the timestamp type. (0 for CreateTime, 1 for LogAppendTime)
- Add the following two configurations to the broker
- message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
- max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.
- Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord. A consumer will see the message timestamp when it sees the messages.
- Add ProduceRequest/ProduceResponse V2 which uses the new message format.
- Add a timestamp in ProduceResponse V2 for each partition. The timestamp will either be LogAppendTime if the topic is configured to use it or it will be NoTimestamp if create time is used.
- Add FetchRequest/FetchResponse V2 which uses the new message format.
- Add a timestamp variable to RecordMetadata. The timestamp is the timestamp of messages appended to partition log.
最重要的有三点:
1. 由用户来指定这个时间戳的确切含义,可以指定两种含义里一个:1. 创建时间,2. 消息append到log的时间。当用户指定时间戳的含义是create time时,broker会拒绝消息的create time与它进入到broker的时间相差过大的消息。实际上,对于用户而言,这是个艰难的选择。例如,选择create time的话,这个timestamp是由用户指定的,所以就可能在错误或者偏差,从而影响到rolling和retention的正常工作(比如可能根本不retention,从而写满磁盘)。对这种情况,可以通过max.message.time.difference.ms来避免。但是,还有些其它的情况比这种要复杂,在这个KIP的文档里讨论了各种选择的优缺点。
2. 用户可以通过producer指定这个时间以人为戳,consumer可以获得这个时间戳。但是时间戳的含义还是由第一点来确定。
3. 需要更改Kafka协议,所以会导致与旧版本的兼容性问题。在KIP的文档里描述了升级的方案,所以也不必过去担心。
CreateTime和LogAppendTime的优劣(简要)
There are three options proposed before this proposal. The details of option 1, option 2 and Option 3 are in the Rejected Alternatives section.
The key decision we made in this KIP is whether use LogAppendTime(Broker Time) or CreateTime(Application Time)
The good things about LogAppendTime are:
- Broker is more robust.
- Monotonically increasing.
- Deterministic behavior for log rolling and retention.
- If CreateTime is required, it can always be put into the message payload.
LogAppendTime的好处是:
- Broker更加健壮(与create time相比,timestamp完全在broker的掌握之中,所以broker的行为更确定)
- 单调增长
- log rolling和retention的行为是确定的
- 如果需要CreateTime,总是可以把它放在消息的负载中
The good things about CreateTime are:
- More intuitive to users.
- User may want to have log retention based on when the message is created instead of when the message enters the pipeline.
- Immutable after entering the pipeline.
CreateTime的好处在于:
- 对于用户更直观
- 用户可能希望log retention基于消息的创建时间而不是消息进入流水线(指消息的处理流程)的时间。
- 进入流水线以后就不再改变。
Because both LogAppendTime and CreateTime have their own use cases, the proposed change provides users with the flexibility to choose which one they want to use.
For more detail discussion please refer to the mail thread as well as the Rejected Alternatives section.
This KIP is closely related to KIP-33. Some of the contents listed in this KIP are actually part of KIP-33. They are put here because they are important for the design decision. More specifically, KIP-33 will implement the following changes:
- Build a time index for each log segment using the timestamps in messages.
- Enforce log retention and log rolling use time based index.
由于LogAppendTime和CreateTime各自有它们的使用场景,这个KIP的提议是由用户自己选择。
关于更详尽地讨论请参照邮件组的讨论,以及下面的Rejected Alternatives一节。
这个KIP与KIP-33紧密相关,KIP-33将会做下面的变化:
1. 基于消息中的时间戳,为每个log segment创建基于时间的索引。
2. 使用基于时间的索引来加强log retention和log rolling。
加了时间戳以后,Kafka的工作细节
- 允许用户在生产消息时候加入时间戳
- 当一个作为leader的broker收到消息时
- 如果message.timestamp.type=LogAppendTime, broker会用自己的本地时间覆盖消息的时间戳,并且把消息追加到log
- 如果收到的是压缩的消息,那么包装后的消息的TS(timestamp)将会用当前的服务器时间覆盖。Broker将会把包装后消息的timestamp type位 置为1。Broker会忽略内部消息的时间戳。在使用LogAppendTime时,之所以不修改每个内部消息的TS,是为了避免重新压缩带来的性能损失。
- 如果消息没有压缩,那么消息的TS将会被覆盖为服务器的本地时间
- 如果message.timestamp.type=CreateTime
- 如果时间差在max.message.time.difference.ms之内,那么broker将会接收这个消息并且把它追加到log。对于压缩后的消息,broker将会把压缩后消息的TS更新为内部消息的最大的TS。
- 如果时间差超过了max.message.time.difference.ms, broker将会以TimestampExceededThresholdException的形式拒绝整批消息。
- 如果message.timestamp.type=LogAppendTime, broker会用自己的本地时间覆盖消息的时间戳,并且把消息追加到log
- 当一个follwer broker收到一个消息时
- 如果这个消息是压缩后的消息,follower broker会使用压缩后消息的TS来构建索引。也就是说,一个压缩后消息的TS总是它的所有内部消息的TS里最大的一个(译注:之所以这么做,是为了构建索引,这与按TS索引的算法有关)。
- 如果这个消息是一个没有压缩的消息,那么这个消息的TS将会被用来构建索引。
- 当一个consumer收到消息时
- 如果这个消息是一个压缩后的消息
- 如果包装后消息的timestamp属性位是0(CreateTime),那么将会使用内部消息的时间戳
- 如果包装后消息的timestamp属性位是1,那么包装消息的TS将会被用作内部消息的TS
- 如果消息是一个没有压缩的消息,那么这个消息的TS将会被使用。
- 如果这个消息是一个压缩后的消息
- message.timestamp.type和max.message.time.difference.ms将会是可以按topic配置的
- 在ProduceResponseV2中,每个partition都会返回一个TS
- 如果topic使用LogAppendTime,那么返回的TS将会是这个message set的LogAppendTime.
- 如果topic使用CreateTime,那么返回的TS将会是NoTimestamp
- 如果producer为每个消息启用了callback,那么如果produce response不是NoTimestamp,它就会使用produce response中的TS,否则就使用producer记录的的TS。
- 在这种情况下,producer将无法分辨TS是LogAppendTime还是CreateTime。
- 基于使用的索引有以下的保证(请注意基于时间的索引将会在KIP-33中被实现,而不是这个KIP。之所以在这里讨论索引的问题,是因为它和这个KIP的设计紧密相关)
- 如果用户索引一个时间戳:
- 所以在这个时间戳之后的消息都将会被消息
- 用户可能会看到更早的消息
- log retention将会按照时间索引文件的最后一个条目。因为最后一个条目将会是整个log segment里将新的timestamp。如果这个entry过期了,那么整个log segment将会被删除。
- log rolling将会依据所有见到过的消息的最大的timestamp。If no message is appended in log.roll.ms after largest appended timestamp, a new log segment will be rolled out.(????这个不合逻辑呀,明显要用最早的timestamp)
- 如果用户索引一个时间戳:
- 这个提议的不好的方面包括:
- 如果message.timestamp.typ=CreateTime的话,timestamp可能不会是递增的
- log retention可能不是确定的。也就是说,一个应该被删除的消息现在依赖于同一个log segment的其它消息。并且,这些由用户提供的时间戳依赖于被配置的时间差的阀值(即,max.message.time.differnece.ms)。
- 尽管这个提议有这些缺点,但是它给了用户使有时间戳的灵活性。
- 如果message.timestamp.type=CreateTime
- 如果时间差阀值被设为Long.MaxValue,那么消息里的时间戳就等于CreateTime
- 如果时间差阀值在0和Long.MaxValue之间,就能保证消息的时间戳总能在一个确定的范围之内
- 如果message.timestamp.type=LogAppendTime,那么时间戳就会是log append time.
- 如果message.timestamp.type=CreateTime
总结:
关于时间戳的类型的选择:CreateTime还是LogAppendTime,还是得依据于具体的使用场景。比如,如果强烈需要使用event time来进行后续的处理,那就只能选create time。重要的是在选择好一种类型以后,了解它对于Kafka的各种行为的影响。