kafka_2.11-0.10.0.0 Installation Steps

Date: 2020-11-28 17:31:48

Kafka Installation and Configuration

We use five machines to build the Kafka cluster:

1. cluster-1-namenode-1-001       172.16.0.147

2. cluster-1-datanode-1-001       172.16.0.144

3. cluster-1-datanode-1-003       172.16.0.145

4. cluster-1-datanode-1-002       172.16.0.146

5. cluster-1-datanode-1-004       172.16.0.148

Since ZooKeeper is already installed on these machines, we will not use the ZooKeeper instance bundled with Kafka.

First, prepare the Kafka installation files on the namenode by running:

cd /opt

wget http://www-eu.apache.org/dist/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz

tar -xvzf kafka_2.11-0.10.0.0.tgz

(If the mirror URL above no longer serves this release, older Kafka versions remain available in the Apache archive at archive.apache.org/dist/kafka/.)

After extracting, edit the configuration file /opt/kafka_2.11-0.10.0.0/config/server.properties. The relevant default settings are:

broker.id=0

zookeeper.connect=localhost:2181

Change them to:

broker.id=0

listeners=PLAINTEXT://cluster-1-namenode-1-001:9092

advertised.listeners=PLAINTEXT://cluster-1-namenode-1-001:9092

log.dirs=/opt/kafka_2.11-0.10.0.0/logs/kafka-logs

zookeeper.connect=cluster-1-namenode-1-001:2181,cluster-1-datanode-1-001:2181,cluster-1-datanode-1-003:2181,cluster-1-datanode-1-002:2181,cluster-1-datanode-1-004:2181

One thing worth noting: by default Kafka uses ZooKeeper's root path (/), so all of Kafka's znodes end up scattered directly under the root. If other applications share the same ZooKeeper ensemble, browsing the data becomes confusing, so it is strongly recommended to specify a chroot path, given directly in the zookeeper.connect setting.
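For example, assuming a chroot path of /kafka (the path name is arbitrary; Kafka creates it in ZooKeeper on first start), the connection string would look like this. Note the chroot suffix appears once, after the last host in the list, not after every host:

```properties
# zookeeper.connect with a chroot path (/kafka is an illustrative choice)
zookeeper.connect=cluster-1-namenode-1-001:2181,cluster-1-datanode-1-001:2181,cluster-1-datanode-1-003:2181,cluster-1-datanode-1-002:2181,cluster-1-datanode-1-004:2181/kafka
```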

Since the kafka-logs directory does not exist yet, create it and grant permissions:

cd /opt/kafka_2.11-0.10.0.0/logs

mkdir kafka-logs

chmod -R 777 kafka-logs

Next, copy the configured installation to the datanode machines:

scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-001:/opt/

scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-003:/opt/

scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-002:/opt/

scp -r /opt/kafka_2.11-0.10.0.0/ cluster-1-datanode-1-004:/opt/
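The four copies above can also be written as a loop (hostnames as in this guide). The commands are echoed here as a dry run so you can inspect them first; remove the `echo` to actually copy:

```shell
# Dry run: print the scp command for each datanode in the cluster.
for h in cluster-1-datanode-1-001 cluster-1-datanode-1-002 \
         cluster-1-datanode-1-003 cluster-1-datanode-1-004; do
  echo scp -r /opt/kafka_2.11-0.10.0.0/ "$h:/opt/"
done
```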

Then edit /opt/kafka_2.11-0.10.0.0/config/server.properties on each datanode:

# Edit on cluster-1-datanode-1-001:

broker.id=1

listeners=PLAINTEXT://cluster-1-datanode-1-001:9092

advertised.listeners=PLAINTEXT://cluster-1-datanode-1-001:9092

# Edit on cluster-1-datanode-1-002:

broker.id=2

listeners=PLAINTEXT://cluster-1-datanode-1-002:9092

advertised.listeners=PLAINTEXT://cluster-1-datanode-1-002:9092

# Edit on cluster-1-datanode-1-003:

broker.id=3

listeners=PLAINTEXT://cluster-1-datanode-1-003:9092

advertised.listeners=PLAINTEXT://cluster-1-datanode-1-003:9092

# Edit on cluster-1-datanode-1-004:

broker.id=4

listeners=PLAINTEXT://cluster-1-datanode-1-004:9092

advertised.listeners=PLAINTEXT://cluster-1-datanode-1-004:9092

Each broker's id must be unique across the entire cluster, which is why the broker.id value has to be adjusted on every node.
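Setting broker.id on each node can be scripted instead of edited by hand. A minimal sketch, demonstrated on a throwaway copy of the file; on a real node you would point the sed line at /opt/kafka_2.11-0.10.0.0/config/server.properties and use that node's id:

```shell
# Sketch: rewrite the broker.id line in a server.properties file.
# A temp file stands in for the real config so this is self-contained.
cfg=$(mktemp)
printf 'broker.id=0\nzookeeper.connect=localhost:2181\n' > "$cfg"

# Replace the broker.id line in place (GNU sed); 3 is this node's id.
sed -i 's/^broker\.id=.*/broker.id=3/' "$cfg"

grep '^broker.id=' "$cfg"
rm -f "$cfg"
```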

Then start Kafka in daemon mode so the broker runs in the background and does not exit with the shell; check the process status afterwards to confirm the broker came up. Note that the -daemon flag already detaches the process, so the trailing & from some guides is unnecessary:

cd /opt/kafka_2.11-0.10.0.0/bin

./kafka-server-start.sh -daemon /opt/kafka_2.11-0.10.0.0/config/server.properties

Start Kafka this way on every node.

Testing the Kafka cluster

  1. On the namenode, create the mytest topic (the replication factor must not exceed the number of brokers; with 5 brokers we can use --replication-factor 5)

[root@cluster-1-namenode-1-001 bin]# ./kafka-topics.sh --create --topic mytest --replication-factor 5 --partitions 2 --zookeeper cluster-1-namenode-1-001:2181

Created topic "mytest".

[root@cluster-1-namenode-1-001 bin]#

  2. On the namenode, list the topics to verify that mytest was created

[root@cluster-1-namenode-1-001 bin]# ./kafka-topics.sh --list --zookeeper cluster-1-namenode-1-001:2181

idoall

idoall_testTopic

my_test

mytest

test

[root@cluster-1-namenode-1-001 bin]#

  3. On datanode1, produce to the mytest topic, sending the message "this is for test"

[root@cluster-1-datanode-1-001 bin]# ./kafka-console-producer.sh --broker-list cluster-1-namenode-1-001:9092 --sync --topic mytest

this is for test

  4. On datanode2, start a console consumer; it should print the message just sent

[root@cluster-1-datanode-1-002 bin]# ./kafka-console-consumer.sh --zookeeper cluster-1-namenode-1-001:2181 --topic mytest --from-beginning

this is for test

^CProcessed a total of 1 messages

[root@cluster-1-datanode-1-002 bin]#