CentOS 7.9 安装 kafka_2.13

时间:2022-10-13 07:12:38

一、CentOS 7.9 安装 kafka_2.13

地址

CentOS 7.9 安装 kafka_2.13

 

二、安装准备

1 安装JDK

在安装kafka之前必须先安装JDK和zookeeper,如何安装JDK,可以查看:CentOS 7.9 安装 jdk-8u333

2 下载安装zookeeper

如何在CentOS 7 下安装zookeeper,可以查看:CentOS 7.9 安装 zookeeper-3.6.3

 

二、kafka

1 进入Apache官网 http://kafka.apache.org/downloads.html 选择Binary downloads,选择版本进行下载。

2 wget下载

wget https://archive.apache.org/dist/kafka/3.0.1/kafka_2.13-3.0.1.tgz

3 解压

tar -zxvf /opt/software/kafka_2.13-3.0.1.tgz -C /opt/
ll /opt/kafka_2.13-3.0.1/

4 进入kafka目录

CentOS 7.9 安装 kafka_2.13

5 启动kafka之前要确保zookeeper已经启动,如果没有启动,执行以下命令

zkServer.sh start

6 修改kafka配置文件中的zookeeper地址,打开配置文件

vim /opt/kafka_2.13-3.0.1/config/server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.0...:8091
# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka_2.13-3.0.1/logs

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.0...:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

7 新建日志储存路径

mkdir /opt/kafka_2.13-3.0.1/logs

8 启动kafka

./bin/kafka-server-start.sh config/server.properties
# 后台启动
./bin/kafka-server-start.sh -daemon config/server.properties

 

四、设置开机自动启动

1 切换到/lib/systemd/system/目录,创建自启动文件

vim /lib/systemd/system/kafka.service
[Unit]
Description=kafkaservice
After=network.target

[Service]
WorkingDirectory=/opt/kafka_2.13-3.0.1
ExecStart=/opt/kafka_2.13-3.0.1/bin/kafka-server-start.sh /opt/kafka_2.13-3.0.1/config/server.properties
ExecStop=/opt/kafka_2.13-3.0.1/bin/kafka-server-stop.sh
User=root
Group=root
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

2 设置自启动

systemctl enable kafka.service

3 立即启动服务

systemctl start kafka.service

4 查看启动状态

systemctl status kafka.service

5 关闭服务

systemctl stop kafka.service

 

五、Kafka命令测试

1 创建topic

  • create:创建topic
  • partitions:topic创建时,若不指定分区个数,则使用server.properties中配置的num.partitions值,也可以自己指定
  • replication:用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --create --topic iyuyixyz --partitions 2 --replication-factor 1

2 列出所有topic

./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --list

3 列出所有topic的信息

./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --describe

4 列出指定topic的信息

./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --describe --topic iyuyixyz

5 生产者(消息发送程序)

./bin/kafka-console-producer.sh --broker-list 192.168.0...:8091 --topic iyuyixyz

6 消费者(消息接收程序)

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0...:8091 --topic iyuyixyz

 

六、配置系统环境变量

vim /etc/profile
export PATH=$PATH:/opt/kafka_2.13-3.0.1/bin
# 使配置生效
source /etc/profile

 

七、报错

1 zookeeper is not a recognized option

新版已不支持zookeeper参数,需要换成bootstrap-server参数。

./bin/kafka-topics.sh --create --zookeeper 192.168.0.98:2181 --replication-factor 1 --partitions 1 --topic iyuyixyz

原来新版本的kafka,已经不需要依赖zookeeper来创建topic,新版的kafka创建topic指令为下

./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --create --topic iyuyixyz --partitions 2 --replication-factor 1

CentOS 7.9 安装 kafka_2.13

 

2 节点响应超时(请求超时)

[2022-10-13 01:11:34,670] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (/192.168.0...:8092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-10-13 01:11:37,676] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (/192.168.0...:8092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-10-13 01:11:40,682] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (/192.168.0...:8092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Error while executing topic command : Timed out waiting for a node assignment. Call: createTopics
[2022-10-13 01:11:43,600] ERROR org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics
 (kafka.admin.TopicCommand$)

该错误是由于没有用对应版本的命令,访问了错的主机名(或IP)和端口,所以请求失败导致超时。

其中的createTopicslistTopics等都会出现该错误。如果没配置server.properties文件的listeners值,也会报以上错误。

listeners值默认是PLAINTEXT://:9092,要改为PLAINTEXT://localhost:9092或PLAINTEXT://IP:9092

CentOS 7.9 安装 kafka_2.13