Kafka-参数详解

时间:2024-10-18 15:20:49

一、上下文

从《Kafka-初识》中可以看到运行kafka-console-producer和 kafka-console-consumer来生产和消费数据时会打印很多参数,这些参数给我们应对多种场景提供了遍历,除了producer和consumer的提供了参数外,Kafka服务器集群中的broker也提供了相应的参数,我们可以在server.properties中找到对应的参数,下面我们就从producer、consumer、broker三方面对Kafka的参数分别做下整理。

二、producer

acks        默认值:cdh中为1 官网为all

producer要求分区所属leader在考虑请求完成之前收到的确认数量。这控制了发送的记录的持久性。允许以下设置:

acks=0如果设置为零,则生产者根本不会等待服务器的任何确认。记录将立即添加到套接字缓冲区并被视为已发送。在这种情况下,无法保证服务器已收到记录,重试配置将不会生效(因为客户端通常不会知道任何故障)。每条记录的偏移量将始终设置为-1。

acks=1这意味着leader将把记录写入其本地日志,但不会等待所有followers的完全确认。在这种情况下,如果leader在确认记录后立即失败,但在followers复制记录之前失败,那么记录就会丢失。

acks=all这意味着leader将等待整套同步副本确认记录。这保证了只要至少有一个同步副本仍然有效,记录就不会丢失。这是最有力的保证。这相当于acks=-1设置。

请注意,启用幂等性要求此配置值为“all”。如果设置了冲突的配置,并且未显式启用幂等性,则禁用幂等性。
batch.size        默认值:16384 16k

每当多条记录被发送到同一分区时,生产者将尝试将记录一起批处理成更少的请求。这有助于提高客户端和服务器的性能。此配置控制默认批大小(以字节为单位)。

将不会尝试批量处理大于此大小的记录。

发送给broker的请求将包含多个批次,每个分区一个批次,其中包含可发送的数据。

小批量将使批处理不那么常见,并可能降低吞吐量(批量大小为零将完全禁用批处理)。非常大的批处理大小可能会更浪费内存,因为我们总是会分配指定批处理大小的缓冲区,以应对额外的记录。

注意:此设置给出了要发送的批大小的上限。如果此分区的累积字节数少于此数,我们将“徘徊”linger.ms时间,等待更多记录出现。此linger.ms设置默认为0,这意味着即使累积的批大小在此batch.size设置下,我们也会立即发送一条记录。
bootstrap.servers        默认值:cdh中为:[cdh1:9092, cdh2:9092, cdh3:9092] ,因为cdh整体为你提供的服务,只能你选了哪些节点安装kafka。 官网为 ""

用于建立与Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管这里指定了哪些服务器进行引导——此列表仅影响用于发现整套服务器的初始主机。此列表的格式应为host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员资格(这可能会动态变化),因此此列表不需要包含完整的服务器集(不过,如果服务器停机,您可能需要多个服务器)。
buffer.memory        默认值:33554432

生产者可以用来缓冲等待发送到服务器的记录的内存总字节数。如果记录的发送速度超过了它们可以传递到服务器的速度,生产者将阻止max.block.ms,之后将抛出异常。

此设置应大致对应于生产者将使用的总内存,但不是硬限制,因为生产者使用的并非所有内存都用于缓冲。一些额外的内存将用于压缩(如果启用了压缩)以及维护正在进行的请求。
client.dns.lookup        默认值:use_all_dns_ips

控制客户端如何使用DNS查找。如果设置为use_all_dns_ips,则按顺序连接到每个返回的IP地址,直到建立成功的连接。断开连接后,将使用下一个IP。一旦所有IP都被使用一次,客户端就会再次从主机名解析IP(但是JVM和操作系统缓存DNS名称查找都是如此)。如果设置为resolve_canonical_boottrap_servers_only,则将每个引导地址解析为规范名称列表。在引导阶段之后,其行为与use_all_dns_ips相同。
client.id        默认值:cdh为:console-producer  官网为 ""

发出请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含逻辑应用程序名称,能够跟踪除ip/port之外的请求来源。
compression.type        默认值:cdh为:none 官网为:producer

指定给定topic的最终压缩类型。此配置接受标准压缩编解码器('gzip'、'snappy'、'lz4'、'zstd')。它还接受“不压缩”,这相当于没有压缩;以及“producer,这意味着保留生产者设置的原始压缩编解码器。
connections.max.idle.ms        默认值:cdh为:540000 官网为:600000  10min

空闲连接超时:服务器Socket处理器线程关闭空闲时间超过此时间的连接
delivery.timeout.ms        默认值:120000 2min

send() 调用返回后报告成功或失败的时间上限。这限制了记录在发送之前延迟的总时间、等broker确认的时间(如果预期的话)以及允许重试发送失败的时间。如果遇到不可恢复的错误、重试次数已用尽或记录被添加到已达到较早交付截止日期的批中,则生产者可能会报告未能在此配置之前发送记录。此配置的值应大于或等于request.timeout.ms和linger.ms之和
enable.idempotence        默认值:cdh为:false 官网为:true

当设置为“true”时,生产者将确保流中只写入每条消息的一个副本。如果为“false”,由于代理失败等原因,生产者重试,可能会在流中写入重试消息的副本。请注意,启用幂等性要求max.in.flight.requests.per.connection小于或等于5(保留任何允许值的消息顺序),重试次数大于0,并且acks必须为“all”。

如果没有设置冲突的配置,默认情况下会启用Idempotence。如果设置了冲突的配置,并且未显式启用幂等性,则禁用幂等性。如果显式启用了幂等性并且设置了冲突的配置,则会抛出ConfigException。
interceptor.classes        默认值:[]

用作拦截器的类列表。实现org.apache.kafka.clients.producer。ProducerInterceptor接口允许我们在生产者接收到的记录发布到Kafka集群之前对其进行拦截(并可能进行变异)。默认情况下,没有拦截器。
key.serializer        默认值:class org.apache.kafka.common.serialization.ByteArraySerializer

实现org.apache.kafka.common.serialization的键的序列化器类。序列化程序接口。
linger.ms        默认值:cdh为:1000 官网为:0

生产者将请求传输之间到达的任何记录组合成一个批处理请求。通常,只有在记录到达速度快于发送速度时,才会在负载下发生这种情况。然而,在某些情况下,即使在中等负载下,客户端也可能希望减少请求数量。此设置通过添加少量的人为延迟来实现这一点,也就是说,生产者不会立即发送记录,而是等待给定的延迟来允许发送其他记录,以便可以将发送一起批处理。这可以被认为类似于TCP中的Nagle算法。此设置给出了批处理延迟的上限:一旦我们为一个分区获得了batch.size值的记录,无论此设置如何,它都会立即发送,但是如果我们为这个分区累积的字节数少于这个数量,我们将在指定的时间内“徘徊”,等待更多的记录出现。此设置默认为0(即无延迟)。例如,设置linger.ms=5可以减少发送的请求数量,但在没有负载的情况下,会给发送的记录增加5毫秒的延迟。
max.block.ms        默认值:60000 1min

该配置控制KafkaProducer的send()、partitionsFor()、initTransactions()、sendOffsetsToTransaction()、commitTransaction()和abortTransaction()方法将阻塞多长时间。对于send(),此超时限制了等待元数据获取和缓冲区分配的总时间(用户提供的序列化器或分区器中的阻塞不计入此超时)。对于partitionsFor(),如果元数据不可用,则此超时限制了等待元数据的时间。与事务相关的方法总是阻塞,但如果无法发现事务协调器或在超时内没有响应,则可能会超时。
max.in.flight.requests.per.connection        默认值:5

在阻塞之前,客户端将在单个连接上发送的未确认请求的最大数量。请注意,如果此配置设置为大于1并且enable.idempotence设置为false,则由于重试(即如果启用了重试)导致发送失败后,存在消息重新排序的风险;如果禁用重试或enable.idempotence设置为true,则将保留顺序。此外,启用幂等性要求此配置的值小于或等于5。如果设置了冲突的配置,并且未显式启用幂等性,则禁用幂等性。
max.request.size        默认值:1048576

请求的最大大小(字节)。此设置将限制生产者在单个请求中发送的记录批数,以避免发送大量请求。这也有效地限制了最大未压缩记录批大小。请注意,服务器对记录批大小有自己的上限(如果启用了压缩,则在压缩后),这可能与此不同。
metadata.max.age.ms        默认值:300000  5min

一段时间(毫秒),在此之后,即使我们没有看到任何分区领导层发生变化,我们也会强制刷新元数据,以主动发现任何新的broker或分区。
metric.reporters        默认值:[]

用作指标报告器的类列表。实现org.apache.kafka.common.metrics.MetricsReporter接口允许插入将收到新度量创建通知的类。始终包含JmxReporter来注册JMX统计信息
metrics.num.samples        默认值:2

为计算度量而维护的样本数量
metrics.recording.level        默认值:INFO

指标的最高记录级别
metrics.sample.window.ms        默认值:30000  30s

计算度量样本的时间窗口。
partitioner.class        默认值:class  cdh为: org.apache.kafka.clients.producer.internals.DefaultPartitioner  官网为:null

确定生成记录时将记录发送到哪个分区。可用选项包括:

1、如果未设置,则使用默认分区逻辑。此策略将记录发送到分区,直到该分区至少产生batch.size字节。它与以下策略协同工作:

        a、如果没有指定分区,但存在key,请根据key的哈希值选择分区。

        b、如果没有分区或key,请选择在分区产生至少batch.size字节时发生变化的粘性分区。

2、org.apache.kafka.clients.producer.RoundRobinPartitioner:一种分区策略,其中一系列连续记录中的每条记录都被发送到不同的分区,无论是否提供了“key”,直到分区用完,过程重新开始。注意:在创建新批次时,有一个已知的问题会导致分布不均。更多详细信息请参见KAFKA-9965。

实现org.apache.kafka.clients.producer.Partitioner接口允许您插入自定义分区器。
receive.buffer.bytes        默认值:32768  32k

读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果该值为-1,则将使用操作系统默认值。
reconnect.backoff.max.ms        默认值:1000  1s

重新连接到反复连接失败的代理时等待的最长时间(毫秒)。如果提供,每台主机的回退将随着每次连续连接失败呈指数级增长,直至达到此最大值。在计算退避增加后,添加20%的随机抖动以避免连接风暴。
reconnect.backoff.ms        默认值:50

尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧环中重复连接到主机。此回退适用于客户端向代理的所有连接尝试。此值是初始回退值,每次连续连接失败时都会呈指数级增长,最高可达reconnect.backoff.max.ms值。
request.timeout.ms        默认值:cdh为:1500 官网为:40000  40s

配置控制客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。
retries        默认值:cdh为:3  官网为:0

设置大于零的值将导致客户端重新发送任何失败的请求,并可能出现暂时性错误。建议将该值设置为零或“MAX_VALUE”,并使用相应的超时参数来控制客户端应重试请求的时间。
retry.backoff.ms        默认值:100

在尝试重试对给定topic分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧循环中重复发送请求。此值是初始退避值,对于每个失败的请求,它将呈指数级增加,直到retry.backff.max.ms值。
transaction.timeout.ms        默认值:60000  1min

事务在协调器主动中止之前保持打开状态的最长时间(毫秒)。事务的开始时间是在向其添加第一个分区时设置的。如果此值大于代理中的transaction.max.timeout.ms设置,则请求将失败,并出现InvalidTxnTimeoutException错误。
transactional.id        默认值:null

用于事务传递的TransactionalId。这实现了跨越多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前,使用相同TransactionalId的事务已经完成。如果没有提供TransactionalId,则生产者仅限于幂等传递。如果配置了TransactionalId,则暗示了enable.idempotence。默认情况下,未配置TransactionId,这意味着无法使用事务。请注意,默认情况下,交易需要至少三个broker的集群,这是生产的推荐设置;对于开发,您可以通过调整代理设置transaction.state.log.replication.factor来更改此设置。
value.serializer        默认值:class cdh为: org.apache.kafka.common.serialization.ByteArraySerializer 官网为:null

实现org.apache.kafka.common.serialization的值的序列化器类

三、consumer

auto.commit.interval.ms        默认值:5000  5s

如果enable.auto.commit设置为true,则消费者偏移量自动提交到Kafka的频率(毫秒)
auto.offset.reset        默认值:cdh为:earliest 官网为:latest

当Kafka中没有初始偏移量时,或者如果服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:

earliest:自动将偏移重置为最早的偏移

latest:自动将偏移量重置为最新偏移量

none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常

anything else:向消费者抛出例外。

请注意,在将此配置设置为最新时更改分区号可能会导致消息传递丢失,因为生产者可能会在消费者重置其偏移量之前开始向新添加的分区发送消息(即还不存在初始偏移量)。
bootstrap.servers        默认值:因为通过cm界面安装kafka时已经指定了机器,因此可以给出具体的列表:[cdh1:9092, cdh2:9092, cdh3:9092] 官网:localhost:9092      

用于建立与Kafka集群的初始连接的主机/端口对列表。客户端将使用所有服务器,而不管这里指定了哪些服务器进行引导——此列表仅影响用于发现整套服务器的初始主机。此列表的格式应为host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员资格(这可能会动态变化),因此此列表不需要包含完整的服务器集(不过,如果服务器停机,您可能需要多个服务器)。
check.crcs        默认值:true

自动检查所消耗记录的CRC32。这确保了消息不会在线路或磁盘上发生损坏。此检查会增加一些开销,因此在寻求极致性能的情况下可能会将其禁用。
client.dns.lookup        默认值:cdh为:default 官网为:use_all_dns_ips

控制客户端如何使用DNS查找。如果设置为use_all_dns_ips,则按顺序连接到每个返回的IP地址,直到建立成功的连接。断开连接后,将使用下一个IP。一旦所有IP都被使用一次,客户端就会再次从主机名解析IP(但是JVM和操作系统缓存DNS名称查找都是如此)。如果设置为resolve_canonical_bootstrap_servers_only,则将每个引导地址解析为规范名称列表。在引导阶段之后,其行为与use_all_dns_ips相同。
client.id = 

发出请求时传递给服务器的id字符串。这样做的目的是通过允许在服务器端请求日志中包含逻辑应用程序名称,能够跟踪除ip/port之外的请求来源。
connections.max.idle.ms        默认值:cdh为:540000 9min  官网为:300000 5min

在此配置指定的毫秒数后关闭空闲连接
default.api.timeout.ms        默认值:60000 1min

指定客户端API的超时时间(以毫秒为单位)。此配置用作所有未指定超时参数的客户端操作的默认超时。
enable.auto.commit        默认值:cdh中为:false  官网为:true

如果为真,则消费者的偏移将定期在后台提交。
exclude.internal.topics        默认值:true

是否应从订阅中排除与订阅模式匹配的内部topic。始终可以明确地订阅内部topic。
fetch.max.bytes        默认值:52428800  50m

服务器应为获取请求返回的最大数据量。记录由消费者分批提取,如果提取的第一个非空分区中的第一个记录批次大于此值,则仍将返回记录批次,以确保消费者能够取得进展。因此,这不是一个绝对的最大值。broker接受的最大记录批大小是通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义的。请注意,消费者并行执行多个提取。
fetch.max.wait.ms        默认值:500

服务器在响应获取请求之前将阻止的最长时间——没有足够的数据来立即满足fetch.min.bytes给出的要求。此配置仅用于本地日志获取。要调整远程获取的最大等待时间,请参阅“remote.reache.max.wait.ms”broker 配置
fetch.min.bytes        默认值:1

服务器应为获取请求返回的最小数据量。如果可用数据不足,请求将等待积累那么多数据后再回复请求。1字节的默认设置意味着,一旦有那么多字节的数据可用,或者获取请求在等待数据到达时超时,就会得到响应。将其设置为更大的值将导致服务器等待大量数据累积,这可以以一些额外的延迟为代价提高服务器吞吐量。
group.id        默认值:console-consumer-26289 默认时没有值的,需要用户指定,如果没有指定,kafka给你默认分配了一个

标识此工作程序所属的Connect群集组的唯一字符串
heartbeat.interval.ms        默认值:3000 3s

使用Kafka的组管理工具时,组协调员心跳之间的预期时间。心跳用于确保工人的会话保持活跃,并在新成员加入或离开组时促进重新平衡。该值必须设置为低于session.timeout.ms,但通常不应设置为高于该值的1/3。它可以调整得更低,以控制正常再平衡的预期时间。
interceptor.classes        默认值:[]

用作拦截器的类列表。实现org.apache.kafka.clients.consumer.ConsumerInterceptor接口允许您拦截(并可能修改)消费者收到的记录。默认情况下,没有拦截器。
isolation.level        默认值:read_uncommitted

控制如何读取以事务方式编写的消息。如果设置为read_committed,consumer.poll() 将只返回已提交的事务消息。如果设置为read_uncommitted(默认值),consumer.poll()将返回所有消息,甚至是已中止的事务消息。在任何一种模式下,非事务性消息都将无条件返回。

消息将始终按偏移顺序返回。因此,在read_committed模式下,consumer.poll()将只返回直到最后一个稳定偏移量(LSO)的消息,该偏移量小于第一个打开事务的偏移量。特别是,在属于正在进行的交易的消息之后出现的任何消息都将被扣留,直到相关交易完成。因此,当有正在进行的交易时,read_committed消费者将无法读取高水位。

此外,当处于read_committed状态时,seekToEnd方法将返回LSO
key.deserializer        默认值:class cdh中为: org.apache.kafka.common.serialization.ByteArrayDeserializer  官方中没有默认值

实现org.apache.kafka.common.serialization的key的反序列化器类
max.partition.fetch.bytes        默认值:1048576

服务器将返回的每个分区的最大数据量。记录由消费者分批提取。如果获取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批以确保消费者可以取得进展。代理接受的最大记录批大小是通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义的。有关限制消费者请求大小的信息,请参阅fetch.max.bytes。
max.poll.interval.ms        默认值:300000  5min

使用消费者组管理时调用poll()之间的最大延迟。这为消费者在获取更多记录之前可以空闲的时间设置了上限。如果在此超时到期之前未调用poll(),则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。对于使用非空group.instance.id达到此超时的消费者,分区不会立即重新分配。相反,消费者将停止发送心跳,分区将在session.timeout.ms过期后重新分配。这反映了已关闭的静态消费者的行为。
max.poll.records        默认值:500

单次调用poll()时返回的最大记录数。请注意,max.poll.records不会影响底层的获取行为。消费者将缓存每个获取请求的记录,并从每次轮询中递增地返回这些记录。
metadata.max.age.ms        默认值:300000  5min

一段时间(毫秒),在此之后,即使我们没有看到任何分区leader发生变化,我们也会强制刷新元数据,以主动发现任何新的代理或分区。
partition.assignment.strategy        默认值:[class cdh为: org.apache.kafka.clients.consumer.RangeAssignor]  官网为:

class org.apache.kafka.clients.consumer.RangeAssignor,

class org.apache.kafka.clients.consumer.CooperativeStickyAssignor

按优先级排序的类名或类类型列表,其中包含客户端在使用组管理时将用于在消费者实例之间分配分区所有权的支持分区分配策略。可用选项包括:

org.apache.kafka.clients.consumer.RangeAssignor:按主题分配分区。

org.apache.kafka.clients.consumer.RoundRobinAssignor:以循环方式为消费者分配分区。

org.apache.kafka.clients.consumer.StickyAssignor:保证分配达到最大平衡,同时保留尽可能多的现有分区分配。

org.apache.kafka.clients.consumer.CooperativeStickyAssignor:遵循相同的StickyAassignor逻辑,但允许合作再平衡。

默认的分配器是[RangeAssignor,Cooperative StickyAssignor],默认情况下将使用RangeAssignor,但允许升级到Cooperative StickeyAssignor,只需一次滚动反弹即可从列表中删除RangeAssignor。

实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口允许您插入自定义分配策略。
receive.buffer.bytes        默认值:65536 64k

读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果该值为-1,则将使用操作系统默认值。
reconnect.backoff.max.ms        默认值:1000 1s

重新连接到反复连接失败的代理时等待的最长时间(毫秒)。如果提供,每台主机的回退将随着每次连续连接失败呈指数级增长,直至达到此最大值。在计算退避增加后,添加20%的随机抖动以避免连接风暴。
reconnect.backoff.ms        默认值:50

尝试重新连接到给定主机之前等待的基本时间量。这避免了在紧环中重复连接到主机。此回退适用于客户端向代理的所有连接尝试。此值是初始回退值,每次连续连接失败时都会呈指数级增长,最高可达reconnect.backoff.max.ms值。
request.timeout.ms        默认值:cdh为:30000 30s 官网为:40000 40s

配置控制客户端等待请求响应的最长时间。如果在超时时间过去之前未收到响应,则客户端将在必要时重新发送请求,或者在重试次数用尽时使请求失败。
retry.backoff.ms        默认值:100

在尝试重试对给定主题分区的失败请求之前等待的时间量。这避免了在某些故障情况下在紧循环中重复发送请求。此值是初始退避值,对于每个失败的请求,它将呈指数级增加,直到retry.backff.max.ms值。
security.protocol        默认值:PLAINTEXT

用于与broker通信的协议。有效值为:PLAINTEXT、SSL、SASL_PLAINTEXT和SASL_SSL。
send.buffer.bytes        默认值:131072

发送数据时使用的TCP发送缓冲区(SO_SNDBUF)的大小。如果该值为-1,则将使用操作系统默认值。
session.timeout.ms        默认值:10000  10s

用于检测工作程序故障的超时时间。worker定期向代理发送心跳,以指示其活动状态。如果代理在此会话超时到期之前没有收到心跳,则代理将从组中删除该worker并启动重新平衡。请注意,该值必须在group.min.session.timeout.ms和group.max.session.ttimeout.ms在代理配置中配置的允许范围内。
value.deserializer        默认值:class cdh中为

org.apache.kafka.common.serialization.ByteArrayDeserializer 官网为空

实现org.apache.kafka.common.serialization的value的反序列化器类

四、broker

服务器基础配置:

broker.id        默认值:0

必须为每一个broker设置一个唯一的整数

此服务器的唯一id。如果未设置,将生成一个唯一的整数作为id。为了避免ZooKeeper生成的id和用户配置的id之间的冲突,生成的id从reserved.broker.max.id+1开始。

Socket配置:

listeners        默认值:PLAINTEXT://:9092

Socket服务器监听的地址,r如果没有配置,将从java.net.InetAddress.getCanonicalHostName()获取
advertised.listeners        默认值:PLAINTEXT://your.host.name:9092

listener.security.protocol.map        默认值:PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

broker 向生产者和消费者展示的主机名和端口,如果未设置,则使用listeners的值(如果已配置)。否则,它将使用java.net.InetAddress.getCanonicalHostName()。

num.network.threads        默认值:3

服务器用于从网络接收请求并向网络发送响应的线程数

num.io.threads        默认值:8

服务器用于处理请求的线程数,其中可能包括磁盘I/O

socket.send.buffer.bytes        默认值:102400 100K

Socket的发送缓冲区(SO_SNDBUF)

socket.receive.buffer.bytes        默认值:102400 100K

Socket的接收缓冲区(SO_RCVBUF)

socket.request.max.bytes        默认值:104857600 100M

Socket接受请求的最大大小(防止OOM)

日志设置:

log.dirs        默认值:/tmp/kafka-logs

以逗号分隔的存储日志文件的目录列表

num.partitions        默认值:1

每个topic的默认日志分区数。更多的分区允许更大的并行性用于消费,但这也会导致跨broker的文件更多。

num.recovery.threads.per.data.dir        默认值:1

启动时用于日志恢复和关闭时用于刷新的每个数据目录的线程数。对于RAID阵列中有数据驱动器的安装,建议增加此值。

内部Topic设置:

offsets.topic.replication.factor        默认值:cdh中是1,官网是3
transaction.state.log.replication.factor        默认值:cdh中是1,官网是3

对于内部的topic,例如“__consumer_offsets”和“__transaction_state”的副本数建议使用大于1的值以确保可用性,例如3
transaction.state.log.min.isr        默认值:cdh中是1,官网是2

生产者在向topci打数据时需要确认写入的最小副本数量。

日志写入策略:

消息会立即写入文件系统,但默认情况下,我们只使用fsync()来延迟同步操作系统缓存。以下配置控制数据到磁盘的刷新。这里有一些重要的权衡(trade-offs):

        1、持久性:如果不使用副本,未持久化的数据可能会丢失。

        2、延迟:当确实发生写入时,非常大的写入间隔可能会导致延迟峰值,因为将有大量数据需要写入磁盘。

        3、吞吐量:持久化磁盘通常是最昂贵的操作,较小的写入磁盘间隔可能会导致过度磁盘寻道。

以下设置允许用户配置写入磁盘策略,以便在一段时间后或每N条消息(或两者兼而有之)刷新数据。这可以在全局范围内完成,并在每个topic的基础上覆盖。

log.flush.interval.messages        默认值:cdh中是10000 源码中是9223372036854775807

当接受的消息数达到该值后,强制将数据刷新到磁盘

log.flush.interval.ms        默认值:cdh中是1000,官网是null

消息可以在日志中停留的最长时间,当达到该阈值,强制将数据刷新到磁盘

日志保留策略:

以下配置控制日志段的处置。该策略可以设置为在一段时间后或累积给定大小后进行删除。只要满足这些条件中的任何一个,就会删除一个片段。删除总是从日志末尾开始。

log.retention.hours        默认值:168

日志保留的小时数,当达到这个阈值时,就要对数据进行删除

log.retention.bytes        默认值:cdh中为1073741824   1G 官网为-1

该删除策略独立于log.retenation.hours

日志保留的最大大小,当达到这个阈值时,就要对数据进行删除

log.segment.bytes        默认值:1073741824 1G

日志段文件的最大大小。当达到此大小时,将创建一个新的日志段。

log.retention.check.interval.ms        默认值:300000  5min

根据保留策略检查日志段是否可以删除的间隔

Zookeeper配置:

zookeeper.connect        默认值:localhost:2181

Zookeeper连接字符串。这是一个逗号分隔的主机:端口对,每个对应一个zk服务器。例如“127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002”。我们还可以在url后附加一个可选的chroot字符串,以指定所有kafka-znode的根目录。例如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/kafka

zookeeper.connection.timeout.ms        默认值:6000

连接到Zookeeper的超时时间(毫秒)

组协调设置

以下配置指定了GroupCoordinator将延迟初始消费者重新平衡的时间(以毫秒为单位)。

随着新成员加入组,group.initial.rebalance.delay.ms的值将进一步延迟重新平衡,最大值为max.poll.interval.ms。

默认值为3秒。

我们在这里将其重写为0,因为它为开发和测试提供了更好的开箱即用体验。

然而,在生产环境中,默认值3秒更合适,因为这将有助于避免在应用程序启动期间进行不必要且可能昂贵的重新平衡

group.initial.rebalance.delay.ms        默认值:chd中是0 官网是3