以下内容,仅为文档Kafka Documentation的阅读笔记。阅读时的版本为0.9.0.x。
1 开始
1.1 引言
kafka提供提交日志的服务,具有分布式、分块、可复制的特点。
1.2 使用场景
- 传送消息。解构数据生产和数据消费。
- 网站活动追踪。用作管道,不同类别信息使用不同topic。
- 度量。分布式数据聚合的中心点。
- 日志累积。持久化store的中转站。
- 流处理。流处理框架的中转站。
- 事件源。
- 提交日志。
1.3 快速开始
1.4 相关技术生态
1.5 升级
2 API
2.1 Producer
依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
2.2 Consumer
2.2.1 高级消费者API(旧)
2.2.2 低级消费者API(旧)
2.2.3 消费者API(新)
新的消费者API不再区分高级/低级。
依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.0</version>
</dependency>
3 配置
4 设计
4.1 动机
高吞吐量,支持大容量事件流;
高抗压,支持离线系统周期性的数据压力;
低延迟;
动态分布式分区;
容错;
4.2 持久化
磁盘线性读写要比随机读写快很多;
常量时间。基于文件的读和追加来构建持久化队列;
4.3 效率
打包发送消息。
producer、broker和consumer同时使用标准二进制消息格式。
zero-copy。
数据压缩
4.4 producer
producer只向leader partition发送消息
通过load balance,将消息发送到不同的partition。
批量发送
4.5 consumer
kafka采用pull模式。
- push VS pull
push是一种推送行为,及时性好,对consumer要求较高,consumer的消费速率能够跟得上producer的生产速率。
pull是一种拉取行为,可以在consumer端控制消费速率,可使消费速率最大化。但若producer没有数据生产时,consumer会陷入轮询。
- 消费点
通过offset指示消费的位置。
4.6 消息传递
消息传递保证措施一般有以下几种:
- 最多一次。 消息不成功则不重传,可能丢消息。
- 最少一次。消息不成功则重传。
- 只传一次。消息只传递一次且成功。
kafka的做法:
4.7 Replication
哪些follower会备份
和大部分分布式系统一样,Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,节点存活满足两个条件:
- 节点必须维持它和zookeeper的session。
- Follower必须能够及时将Leader的消息复制过来,且不能落后太多。
Leader会跟踪与其保持同步的follower列表,该列表称为ISR(即in-sync Replica)。只有ISR中的follower执行复制。一旦leader发现follower未存活,则将其从ISR中删除,follower又存活,则将其添加到ISR中。Kafka只解决这种fail/recover,不处理“Byzantine”(“拜占庭”)问题。
一条消息只有被ISR里的所有replica都从Leader复制过去才会被认为“committed”。这种机制确保了只要ISR有一个或以上的replica,被“committed”的消息就一定不会丢失。
如何选举leader
Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。
在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
- 等待ISR中的任一个Replica“活”过来,并且选它作为Leader
- 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.9.*使用了第二种方式。
可用性和耐用性保证
两种方案:
- 禁用leader选举
- 控制最小ISR大小。ISR的大小达到一定的数量后,才可以进行写入。提高了耐用性,但这样降低了可用性。
replica管理
分配Replica的算法如下:
- (1) 将n个Broker和待分配的Partition排序
- (2) 第i个Partition分配到第i%n个Broker
- (3) 第i个Partition的第j个Replica分配到第(i + j) %n个Broker
Leader Election方案:
在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式通知给需要为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。
controller不可用时,剩余的可用broker之一会成为新的controller。
4.8 日志压缩
4.9 Quotas
0.9版本提供。
Quotas(限额)以client-id为单位。
Quotas的出现是为了防止不良client独占broker资源,造成网络饱和、拒绝服务。
quota.producer.default, quota.consumer.default默认值可认为是无穷大。
违反quotas的client会被降速。
5 实现
6 操作
6.1 常用操作
6.1.1 topic操作
新增topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
修改topic
-
修改partitions
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40
-
增加configs
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
-
删除configs
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
删除topic
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
默认情况下topic是不能删除的,删除操作需要delete.topic.enable=true支持。
6.1.2 关闭集群
优雅的stop需要满足两个条件:
- 同步所有的log
- leader关闭前需要将数据移动replica
6.1.3 平衡leader
手动:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
自动:
auto.leader.rebalance.enable=true
6.1.4 集群间镜像数据
示例:
bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer-1.properties --consumer.config consumer-2.properties --producer.config producer.properties --whitelist my-topic
从2个集群(consumer-1.properties和consumer-2.properties)镜像my-topic的数据到新集群(producer.properties)
需要以下配置配合:
auto.create.topics.enable=true
6.1.5 查看consumer位置
示例:
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
6.1.6 集群扩展
集群扩展涉及到数据重分配。
自动移动数据
示例移动foo1和foo2所有的分区到5,6上
topics-to-move.json:
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
生成移动方案。:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
执行:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
检查移动结果:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
自定义移动数据
定义分配计划custom-reassignment.json:
{"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
执行:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
检查:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
6.1.7 退役中的broker
6.1.8 增加replication因子
increase-replication-factor.json:
{"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
执行:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
验证:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
查看topic配置:
bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
6.1.9 设置阈值
设置默认阈值:
quota.producer.default=10485760
quota.consumer.default=10485760
设置特定用户的阈值:
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-name clientA --entity-type clients
查看:
./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-name clientA --entity-type clients
6.2 数据中心
6.3 kafka配置
producer相关的比较重要的配置:
- 压缩率
- 同步或异步
- 批量大小
6.4 jdk
建议jdk版本1.8
6.5 硬盘和操作系统
- memory:吞吐率*时间
- os:推荐unix系统
- disk:建议使用多块磁盘,可提高吞吐率。
linux清除策略
file -> pagecache -> disk
Pdflush控制pagecache -> disk的flush,如果flush速率跟不上写入pagecache的速率,会阻塞写入过程。
Ext4文件系统
6.4 硬盘和操作系统
6.6 监控
6.7 zookeeper
7 安全
7.1 安全概述
7.2 使用SSL进行加密和认证
示例:
#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
broker配置:
listeners=PLAINTEXT://host.name:port,SSL://host.name:port
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234
security.inter.broker.protocol=SSL
检查:
openssl s_client -debug -connect localhost:9093 -tls1
配置client:
# 只支持0.9
security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
# Examples using console-producer and console-consumer:
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --new-consumer --consumer.config client-ssl.properties