ubuntu22 部署zookeeper + kafka集群 & 配置开机自启动

时间:2024-05-31 16:57:08

ufw disabled #关闭防火墙 或者 放开指定端口

vim /etc/hosts #配置ip host映射关系

10.3.1.96    node1

10.3.1.97    node2

#1.所有机器安装jdk

apt install openjdk-8-jdk -y
java -version #export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_202

#2.部署zookeeper集群

cd /usr/local
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.4/apache-zookeeper-3.8.4-bin.tar.gz
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz 
mv apache-zookeeper-3.8.4-bin zookeeper-3.8.4

cd /usr/local/zookeeper-3.8.4/conf && cp zoo_sample.cfg zoo.cfg
mkdir -p /usr/local/zookeeper-3.8.4/logs
mkdir -p /usr/local/zookeeper-3.8.4/data

vim zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper-3.8.4/data
dataLogDir=/usr/local/zookeeper-3.8.4/logs
clientPort=2181
#客户端访问zk的端口
server.1=node1:2888:3888
server.2=node2:2888:3888
#说明:2888为组成zookeeper服务器之间的通信端口,3888为用来选举leader的端口,server后面的数字与后面的myid相对应
#注意前后不要有空格 否则报错Invalid config, exiting abnormally  可以通过:set list显示隐藏字符来处理

vim /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.8.4
export PATH=$ZOOKEEPER_HOME/bin:$PATH 
source /etc/profile

#node1节点执行

cd /usr/local/zookeeper-3.8.4/data && echo "1" > myid

#node2节点执行

cd /usr/local/zookeeper-3.8.4/data && echo "2" > myid

#所有机器启动zk

cd /usr/local/zookeeper-3.8.4/bin

./zkServer.sh start

./zkServer.sh status

#3.部署kafka 并修改配置文件(不同节点参数需修改)

cd /usr/local
wget https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
tar -zxvf kafka_2.12-3.7.0.tgz
mkdir -p /usr/local/kafka_2.12-3.7.0/logs

#node1进行如下操作。  node2同理:只是把id改为2 node1改为node2即可

vim /usr/local/kafka_2.12-3.7.0/config/server.properties
#id不重复 修改下面三行
broker.id=1
listeners=PLAINTEXT://node1:9092
advertised.listeners=PLAINTEXT://node1:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400 
socket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600
#建议开启 新增此行
delete.topic.enable=true
#修改此行
log.dirs=/usr/local/kafka_2.12-3.7.0/logs
num.partitions=1 
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1 
transaction.state.log.replication.factor=1 
transaction.state.log.min.isr=1 
log.retention.hours=168
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000
#修改此行
zookeeper.connect=node1:2181,node2:2181
zookeeper.connection.timeout.ms=18000 
group.initial.rebalance.delay.ms=0

vim /usr/local/kafka_2.12-3.7.0/config/consumer.properties
bootstrap.servers=node1:9092
group.id=test-consumer-group

vim /usr/local/kafka_2.12-3.7.0/config/producer.properties
bootstrap.servers=node1:9092
compression.type=none

#所有机器启动

cd /usr/local/kafka_2.12-3.7.0/bin

./kafka-server-start.sh -daemon ../config/server.properties

#配置systemctl 开机自启动 zookeeper和 kafka

vim /lib/systemd/system/zookeeper.service
[Unit]
Description=Apache Zookeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/zookeeper-3.8.4/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper-3.8.4/bin/zkServer.sh stop
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

vim /lib/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=forking
User=root
Group=root
Environment="JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64"
ExecStart=/usr/local/kafka_2.12-3.7.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.7.0/config/server.properties
ExecStop=/usr/local/kafka_2.12-3.7.0/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

#which java;  #然后ls -l 直到没有软链则为java安装路径

systemctl daemon-reload #刷新配置

systemctl start zookeeper.service

systemctl enable zookeeper.service

systemctl start kafka.service

systemctl enable kafka.service

注意:systemctl启动若不成功,可以修改Type 然后再尝试

命令行测试

#测试创建topic

./kafka-topics.sh --create --bootstrap-server node1:9092 --replication-factor 3 --partitions 3 --topic test #老版本 --zookeeper node1:2181

./kafka-topics.sh --describe --bootstrap-server node1:9092 --topic test

./kafka-topics.sh -list --bootstrap-server node1:9092

./kafka-console-producer.sh --broker-list node1:9092 --topic test

./kafka-console-consumer.sh --bootstrap-server node:9092 --topic test --from-beginning

#旧版Kafka,用的是zookeeper地址而非bootstrap.servers

#主要有两个目的/动机:一是优化元数据管理,原来的zk方案,极端情况下可能会造成数据不一致;二是简化部署和配置。

#bootstrap.servers只是用于客户端启动(bootstrap)的时候有一个可以热启动的一个连接者,一旦启动完毕客户端就应该可以得知当前集群的所有节点的信息,日后集群扩展的时候客户端也能够自动实时的得到新节点的信息,即使bootstrap.servers里面的挂掉了也应该是能正常运行的,除非节点挂掉后客户端也重启了

#服务器是Kafka,生产者(发送数据的)和消费者(接收数据的)是客户端

#删除topic

#1.kafka启动之前,在server.properties配置delete.topic.enable=true

#2.执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具删除

#注意:如果没有设置 delete.topic.enable=true,则调用kafka 的delete命令无法真正将topic删除,而是显示(marked for deletion)