1. 查看kafka都有那些topic
a. list
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper test1.hadoop.puhuifinance.com:2181
b. describe
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test
c. delete, 这里要设置 delete.topic.enable=true
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test
d. 查看topic中的数据
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test
# --from-beginning 从头开始读取
# --property print.key=true 打印key
e. 查看针对topic每个group 消费的程度, 是不是特定的zookeeper path?
/usr/hdp/current/kafka-broker/bin/kafka-consumer-offset-checker.sh --zookeeper=test1.hadoop.puhuifinance.com:2181 --group=test
f. 生产者:
/usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list test1.hadoop.puhuifinance.com:6667 --topic test < flume-spark.log
2. topic的创建与修改.
a. create
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper test1.hadoop.puhuifinance.com:2181 --replication-factor 2 --partitions 4 --topic test
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper test1.hadoop.puhuifinance.com:2181 --replication-factor 3 --partitions 3 --topic test
b. alter partitions
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --alter --zookeeper test1.hadoop.puhuifinance.com:2181 --partitions 3 --topic test
3. 更改replication
a. 首先查看topic当前的partition 及replication状态.
cat topics-info.json
{"topics": [{"topic": "TaoBaoTopic"}],
"version":1
}
/usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --topics-to-move-json-file topics-info.json --broker-list "1,2" --generate
b. 会得到类似下面的结果:
Current partition replica assignment
{
"version": 1,
"partitions": [
{
"topic": "TaoBaoTopic",
"partition": 2,
"replicas": [
1
]
},
{
"topic": "TaoBaoTopic",
"partition": 0,
"replicas": [
2
]
},
{
"topic": "TaoBaoTopic",
"partition": 1,
"replicas": [
0
]
}
]
}
c. 根据要改动的replication策略, 例如:
cat TaoBao_Topic_alter_replication.json
{
"version": 1,
"partitions": [
{
"topic": "TaoBaoTopic",
"partition": 2,
"replicas": [
1,2,0
]
},
{
"topic": "TaoBaoTopic",
"partition": 0,
"replicas": [
2,0,1
]
},
{
"topic": "TaoBaoTopic",
"partition": 1,
"replicas": [
0,1,2
]
}
]
}
d. 将更改的策略进行实施, TaoBao_Topic_alter_replication.json
/usr/hdp/current/kafka-broker/bin//kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --reassignment-json-file TaoBao_Topic_alter_replication.json --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"TaoBaoTopic","partition":2,"replicas":[1]},{"topic":"TaoBaoTopic","partition":0,"replicas":[2]},{"topic":"TaoBaoTopic","partition":1,"replicas":[0]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"TaoBaoTopic","partition":2,"replicas":[1,2,0]},{"topic":"TaoBaoTopic","partition":0,"replicas":[2,0,1]},{"topic":"TaoBaoTopic","partition":1,"replicas":[0,1,2]}]}
e. 查看replication的过程,或者是服务迁移的过程, 例如将原有的topic分配到新增加的服务器上.
/usr/hdp/current/kafka-broker/bin//kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --reassignment-json-file TaoBao_Topic_alter_replication.json --verify
Status of partition reassignment:
Reassignment of partition [TaoBaoTopic,2] completed successfully
Reassignment of partition [TaoBaoTopic,0] completed successfully
Reassignment of partition [TaoBaoTopic,1] completed successfully
f. 通过之前的命令, 可以再查看下当前的状态.
/usr/hdp/current/kafka-broker/bin/kafka-reassign-partitions.sh --zookeeper test1.hadoop.puhuifinance.com:2181 --topics-to-move-json-file topics-info.json --broker-list "0,1,2" --generate
或者是describe
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --describe --zookeeper test1.hadoop.puhuifinance.com:2181 --topic test
4. 一些参数设置
a. producer http://kafka.apache.org/documentation.html#producerconfigs
request.required.acks=1
producer.type=sync
compression.codec=snappy