一、CentOS 7.9 安装 kafka_2.13
地址
二、安装准备
1 安装JDK
在安装kafka之前必须先安装JDK和zookeeper,如何安装JDK,可以查看:CentOS 7.9 安装 jdk-8u333
2 下载安装zookeeper
如何在CentOS 7 下安装zookeeper,可以查看:CentOS 7.9 安装 zookeeper-3.6.3
二、kafka
1 进入Apache官网 http://kafka.apache.org/downloads.html
选择Binary downloads,选择版本进行下载。
2 wget下载
wget https://archive.apache.org/dist/kafka/3.0.1/kafka_2.13-3.0.1.tgz
3 解压
tar -zxvf /opt/software/kafka_2.13-3.0.1.tgz -C /opt/
ll /opt/kafka_2.13-3.0.1/
4 进入kafka目录
5 启动kafka之前要确保zookeeper已经启动,如果没有启动,执行以下命令
zkServer.sh start
6 修改kafka配置文件中的zookeeper地址,打开配置文件
vim /opt/kafka_2.13-3.0.1/config/server.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
############################# Socket Server Settings #############################
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.0...:8091
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# A comma separated list of directories under which to store log files
log.dirs=/opt/kafka_2.13-3.0.1/logs
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=192.168.0...:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
7 新建日志储存路径
mkdir /opt/kafka_2.13-3.0.1/logs
8 启动kafka
./bin/kafka-server-start.sh config/server.properties
# 后台启动
./bin/kafka-server-start.sh -daemon config/server.properties
四、设置开机自动启动
1 切换到/lib/systemd/system/目录,创建自启动文件
vim /lib/systemd/system/kafka.service
[Unit]
Description=kafkaservice
After=network.target
[Service]
WorkingDirectory=/opt/kafka_2.13-3.0.1
ExecStart=/opt/kafka_2.13-3.0.1/bin/kafka-server-start.sh /opt/kafka_2.13-3.0.1/config/server.properties
ExecStop=/opt/kafka_2.13-3.0.1/bin/kafka-server-stop.sh
User=root
Group=root
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
2 设置自启动
systemctl enable kafka.service
3 立即启动服务
systemctl start kafka.service
4 查看启动状态
systemctl status kafka.service
5 关闭服务
systemctl stop kafka.service
五、Kafka命令测试
1 创建topic
- create:创建topic
- partitions:topic创建时,若不指定分区个数,则使用server.properties中配置的num.partitions值,也可以自己指定
- replication:用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。
./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --create --topic iyuyixyz --partitions 2 --replication-factor 1
2 列出所有topic
./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --list
3 列出所有topic的信息
./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --describe
4 列出指定topic的信息
./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --describe --topic iyuyixyz
5 生产者(消息发送程序)
./bin/kafka-console-producer.sh --broker-list 192.168.0...:8091 --topic iyuyixyz
6 消费者(消息接收程序)
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0...:8091 --topic iyuyixyz
六、配置系统环境变量
vim /etc/profile
export PATH=$PATH:/opt/kafka_2.13-3.0.1/bin
# 使配置生效
source /etc/profile
七、报错
1 zookeeper is not a recognized option
新版已不支持zookeeper参数,需要换成bootstrap-server参数。
./bin/kafka-topics.sh --create --zookeeper 192.168.0.98:2181 --replication-factor 1 --partitions 1 --topic iyuyixyz
原来新版本的kafka,已经不需要依赖zookeeper来创建topic,新版的kafka创建topic指令为下
./bin/kafka-topics.sh --bootstrap-server 192.168.0...:8091 --create --topic iyuyixyz --partitions 2 --replication-factor 1
2 节点响应超时(请求超时)
[2022-10-13 01:11:34,670] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (/192.168.0...:8092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-10-13 01:11:37,676] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (/192.168.0...:8092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2022-10-13 01:11:40,682] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (/192.168.0...:8092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Error while executing topic command : Timed out waiting for a node assignment. Call: createTopics
[2022-10-13 01:11:43,600] ERROR org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics
(kafka.admin.TopicCommand$)
该错误是由于没有用对应版本的命令,访问了错的主机名(或IP)和端口,所以请求失败导致超时。
其中的createTopics、listTopics等都会出现该错误。如果没配置server.properties文件的listeners值,也会报以上错误。
listeners值默认是PLAINTEXT://:9092,要改为PLAINTEXT://localhost:9092或PLAINTEXT://IP:9092等