Kafka Installation and Configuration
We use five machines to build the Kafka cluster:
1. cluster-1-namenode-1-001 172.16.0.147
2. cluster-1-datanode-1-001 172.16.0.144
3. cluster-1-datanode-1-003 172.16.0.145
4. cluster-1-datanode-1-002 172.16.0.146
5. cluster-1-datanode-1-004 172.16.0.148
Since ZooKeeper is already installed on these machines, we will not use the ZooKeeper bundled with Kafka.
First, download and unpack the Kafka archive on the namenode:
cd /opt
wget http://www-eu.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
tar -xvzf kafka_2.11-0.10.0.0.tgz
After extracting, edit the configuration file /opt/kafka_2.11-0.10.0.0/config/server.properties. The original settings are:
broker.id=0
zookeeper.connect=localhost:2181
Change them to:
broker.id=0
listeners=PLAINTEXT://cluster-1-namenode-1-001:9092
advertised.listeners=PLAINTEXT://cluster-1-namenode-1-001:9092
log.dirs=/opt/kafka_2.11-0.10.0.0/logs/kafka-logs
zookeeper.connect=cluster-1-namenode-1-001:2181,cluster-1-datanode-1-001:2181,cluster-1-datanode-1-003:2181,cluster-1-datanode-1-002:2181,cluster-1-datanode-1-004:2181
Note that by default Kafka stores its metadata under ZooKeeper's root path /, so Kafka's znodes end up scattered at the top level. If other applications share the same ZooKeeper cluster, browsing its data becomes confusing, so it is strongly recommended to specify a chroot path, appended directly to the zookeeper.connect value.
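For example, with an illustrative chroot named /kafka (any path works, but every broker must use the same one; depending on the Kafka version you may need to create the path in ZooKeeper first):

```properties
# Append the chroot to the END of the connection string, after the last host
zookeeper.connect=cluster-1-namenode-1-001:2181,cluster-1-datanode-1-001:2181,cluster-1-datanode-1-003:2181,cluster-1-datanode-1-002:2181,cluster-1-datanode-1-004:2181/kafka
```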
Since the kafka-logs directory does not exist yet, create it (with -p so the parent logs directory is created as well) and grant permissions:
cd /opt/kafka_2.11-0.10.0.0
mkdir -p logs/kafka-logs
chmod -R 777 logs/kafka-logs
Then sync the configured directory to the datanode machines:
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-001:/opt/
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-003:/opt/
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-002:/opt/
scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-004:/opt/
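The four scp commands can also be expressed as a loop. A minimal sketch, printed with echo as a dry run so the commands can be reviewed first (remove the echo to actually copy; the function name sync_kafka is illustrative):

```shell
#!/bin/sh
# Dry run: print the scp command for each datanode.
# Remove the leading "echo" inside the loop to perform the copies for real.
sync_kafka() {
  for node in cluster-1-datanode-1-001 cluster-1-datanode-1-003 \
              cluster-1-datanode-1-002 cluster-1-datanode-1-004; do
    echo scp -r /opt/kafka_2.11-0.10.0.0/ "$node":/opt/
  done
}
sync_kafka
```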
and edit /opt/kafka_2.11-0.10.0.0/config/server.properties on each node as follows:
# On cluster-1-datanode-1-001
broker.id=1
listeners=PLAINTEXT://cluster-1-datanode-1-001:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-001:9092
# On cluster-1-datanode-1-002
broker.id=2
listeners=PLAINTEXT://cluster-1-datanode-1-002:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-002:9092
# On cluster-1-datanode-1-003
broker.id=3
listeners=PLAINTEXT://cluster-1-datanode-1-003:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-003:9092
# On cluster-1-datanode-1-004
broker.id=4
listeners=PLAINTEXT://cluster-1-datanode-1-004:9092
advertised.listeners=PLAINTEXT://cluster-1-datanode-1-004:9092
Each broker's id must be unique across the whole cluster, which is why this value has to be adjusted on every node.
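The per-node edits lend themselves to a small helper. A sketch (hostnames and id mapping as above; the function name broker_overrides is illustrative) that prints the three lines to change for a given host:

```shell
#!/bin/sh
# Print the per-node server.properties overrides, deriving broker.id
# from the hostname. The namenode keeps broker.id=0.
broker_overrides() {
  host="$1"
  case "$host" in
    cluster-1-namenode-1-001) id=0 ;;
    cluster-1-datanode-1-001) id=1 ;;
    cluster-1-datanode-1-002) id=2 ;;
    cluster-1-datanode-1-003) id=3 ;;
    cluster-1-datanode-1-004) id=4 ;;
    *) echo "unknown host: $host" >&2; return 1 ;;
  esac
  echo "broker.id=$id"
  echo "listeners=PLAINTEXT://$host:9092"
  echo "advertised.listeners=PLAINTEXT://$host:9092"
}
# Example: overrides for the second datanode
broker_overrides cluster-1-datanode-1-002
```

On each node this could be run as broker_overrides "$(hostname)" and the corresponding lines in server.properties replaced with its output.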
Start Kafka in daemon mode so the broker keeps running in the background (the -daemon flag already detaches the process, so no trailing & is needed):
cd /opt/kafka_2.11-0.10.0.0/bin
./kafka-server-start.sh -daemon /opt/kafka_2.11-0.10.0.0/config/server.properties
Start Kafka on every node in turn, then check the process status (e.g. with jps) to confirm each broker is running.
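Starting every broker from the namenode can likewise be looped over ssh. A dry-run sketch, assuming passwordless ssh from the namenode (remove the echo to actually issue the commands; the function name start_all is illustrative):

```shell
#!/bin/sh
# Dry run: print the remote start command for each of the five nodes.
# Remove the leading "echo" to start the brokers over ssh for real.
start_all() {
  for node in cluster-1-namenode-1-001 cluster-1-datanode-1-001 \
              cluster-1-datanode-1-002 cluster-1-datanode-1-003 \
              cluster-1-datanode-1-004; do
    echo ssh "$node" "/opt/kafka_2.11-0.10.0.0/bin/kafka-server-start.sh -daemon /opt/kafka_2.11-0.10.0.0/config/server.properties"
  done
}
start_all
```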
Testing the Kafka cluster
- Create the mytest topic on the namenode (set --replication-factor to the number of brokers, five in this cluster):
[root@cluster-1-namenode-1-001 bin]# ./kafka-topics.sh --create --topic mytest --replication-factor 5 --partitions 2 --zookeeper cluster-1-namenode-1-001:2181
Created topic "mytest".
[root@cluster-1-namenode-1-001 bin]#
- List topics on the namenode to verify that mytest was created:
[root@cluster-1-namenode-1-001 bin]# ./kafka-topics.sh --list --zookeeper cluster-1-namenode-1-001:2181
idoall
idoall_testTopic
my_test
mytest
test
[root@cluster-1-namenode-1-001 bin]#
- On datanode1, produce the message "this is for test" to Kafka:
[root@cluster-1-datanode-1-001 bin]# ./kafka-console-producer.sh --broker-list cluster-1-namenode-1-001:9092 --sync --topic mytest
this is for test
- On datanode2, start a console consumer to simulate a consumer; the message sent above appears:
[root@cluster-1-datanode-1-002 bin]# ./kafka-console-consumer.sh --zookeeper cluster-1-namenode-1-001:2181 --topic mytest --from-beginning
this is for test
^CProcessed a total of 1 messages
[root@cluster-1-datanode-1-002 bin]#