57-Zookeeper集群和kafka消息队列集群

时间:2023-02-03 12:02:06

Martin Fowler发现所有成功的微服务都遵循了通用的模式 - Monolith First(单体优先)

  • 几乎所有成功的微服务故事,都是从一个变得太大而被分解的单体开始的。
  • 几乎所有我听说过的从头开始构建为微服务系统的系统都以严重的麻烦告终。

ZooKeeper

简介

  • ZooKeeper 是一个典型的分布式数据一致性解决方案,分布式应用程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、分布式锁和分布式队列等功能。

使用场景

  • Zookeeper 一个最常用的使用场景就是用于担任服务生产者和服务消费者的注册中心(提供发布订阅服务)。 服务生产者将自己提供的服务注册到Zookeeper中心,服务的消费者在进行服务调用的时候先到Zookeeper中查找服务,获取到服务生产者的详细信息之后,再去调用服务生产者的内容与数据。在Dubbo架构中 Zookeeper 就担任了注册中心这一角色。

Zookeeper 数据模型

  • 在 Zookeeper 中,节点分为两类:
  • 第一类是指构成Zookeeper集群的主机,称之为主机节点
  • 第二类则是指内存中zookeeper数据模型中的数据单元,用来存储各种数据内容,称之为数据节点 ZNode。
  • Zookeeper内部维护了一个层次关系(树状结构)的数据模型,它的表现形式类似于Linux的文件系统

ZooKeeper 服务流程图解

57-Zookeeper集群和kafka消息队列集群

ZooKeeper集群选举

  • 节点角色状态:
  • LOOKING:寻找 Leader 状态,处于该状态需要进入选举流程
  • LEADING:领导者状态,处于该状态的节点说明是角色已经是Leader
  • FOLLOWING:跟随者状态,表示 Leader已经选举出来,当前节点角色是follower
  • OBSERVER:观察者状态,表明当前节点角色是 observer
  • 选举 ID:
  • ZXID(zookeeper transaction id):每个改变 Zookeeper状态的操作都会形成一个对应的zxid。ZXID最大的节点优先选为Leader
  • myid:服务器的唯一标识(SID),通过配置 myid 文件指定,集群中唯一,当ZXID一样时,myid大的节点优先选为Leader
  • 选举过程:
  • 当集群中的 zookeeper 节点启动以后,会根据配置文件中指定的 zookeeper节点地址进行leader 选择操作,过程如下:
#每个zookeeper 都会发出投票,由于是第一次选举leader,因此每个节点都会把自己当做leader角色进行选举,每个zookeeper 的投票中都会包含自己的myid和zxid,此时zookeeper 1 的投票为myid 为 1,初始zxid有一个初始值0x0,后期会随着数据更新而自动变化,zookeeper2 的投票为myid 为2,初始zxid 为初始生成的值。

#每个节点接受并检查对方的投票信息,比如投票时间、是否状态为LOOKING状态的投票。

#对比投票,优先检查zxid,如果zxid 不一样则 zxid 大的为leader,如果zxid相同则继续对比myid,myid 大的一方为 leader
成为 Leader 的必要条件: Leader 要具有最高的zxid;当集群的规模是 n 时,集群中大多数的机器(至少n/2+1)得到响应并follower 选出的 Leader。
  • 心跳机制:Leader 与 Follower 利用 PING 来感知对方的是否存活,当 Leader无法响应PING 时,将重新发起 Leader 选举。
  • 当 Leader 服务器出现网络中断、崩溃退出与重启等异常情况,ZAB(Zookeeper Atomic Broadcast)协议就会进入恢复模式并选举产生新的Leader服务器。这个过程大致如下:
#Leader Election(选举阶段):节点在一开始都处于选举阶段,只要有一个节点得到超半数节点的票数,它就可以当选准 leader。

#Discovery(发现阶段):在这个阶段,followers 跟准 leader 进行通信,同步 followers 最近接收的事务提议。

#Synchronization(同步阶段):同步阶段主要是利用 leader 前一阶段获得的最新提议历史,同步集群中所有的副本。同步完成之后 准 leader 才会成为真正的 leader。

#Broadcast(广播阶段) :到了这个阶段,Zookeeper 集群才能正式对外提供事务服务,并且leader 可以进行消息广播。同时如果有新的节点加入,还需要对新节点进行同步

ZooKeeper 集群部署

案例:

  • 二进制编译部署
[root@ubuntu2204 Script]#cat install_zookeeper_cluster.sh 
#!/bin/bash

ZK_VERSION=3.8.1
ZK_URL=https://archive.apache.org/dist/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz

NODE1=192.168.122.2
NODE2=192.168.122.3
NODE3=192.168.122.4

. /etc/os-release

HOST=`hostname -I|awk '{print $1}'`

color () {
RES_COL=60
MOVE_TO_COL="echo -en \\033[${RES_COL}G"
SETCOLOR_SUCCESS="echo -en \\033[1;32m"
SETCOLOR_FAILURE="echo -en \\033[1;31m"
SETCOLOR_WARNING="echo -en \\033[1;33m"
SETCOLOR_NORMAL="echo -en \E[0m"
echo -n "$1" && $MOVE_TO_COL
echo -n "["
if [ $2 = "success" -o $2 = "0" ] ;then
${SETCOLOR_SUCCESS}
echo -n $" OK "
elif [ $2 = "failure" -o $2 = "1" ] ;then
${SETCOLOR_FAILURE}
echo -n $"FAILED"
else
${SETCOLOR_WARNING}
echo -n $"WARNING"
fi
${SETCOLOR_NORMAL}
echo -n "]"
echo
}

zk_myid () {
read -p "请输入node编号(默认为 1): " MYID

if [ -z "$MYID" ] ;then
MYID=1
elif [[ ! "$MYID" =~ ^[0-9]+$ ]];then
color "请输入正确的node编号!" 1
exit
else
true
fi
}


install_jdk() {
if [ $ID = 'centos' -o $ID = 'rocky' ];then
yum -y install java-1.8.0-openjdk-devel || { color "安装JDK失败!" 1; exit 1; }
else
apt update
apt install openjdk-11-jdk -y || { color "安装JDK失败!" 1; exit 1; }
#apt install openjdk-8-jdk -y || { color "安装JDK失败!" 1; exit 1; }
fi
java -version
}

install_zookeeper() {
if [ -f apache-zookeeper-${ZK_VERSION}-bin.tar.gz ] ;then
cp apache-zookeeper-${ZK_VERSION}-bin.tar.gz /usr/local/src/
else
wget -P /usr/local/src/ --no-check-certificate $ZK_URL || { color "下载失败!" 1 ;exit ; }
fi
tar xf /usr/local/src/${ZK_URL##*/} -C /usr/local
ln -s /usr/local/apache-zookeeper-*-bin/ /usr/local/zookeeper
echo 'PATH=/usr/local/zookeeper/bin:$PATH' > /etc/profile.d/zookeeper.sh
. /etc/profile.d/zookeeper.sh

mkdir -p /usr/local/zookeeper/data
echo $MYID > /usr/local/zookeeper/data/myid
cat > /usr/local/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
clientPort=2181
maxClientCnxns=128
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
server.1=${NODE1}:2888:3888
server.2=${NODE2}:2888:3888
server.3=${NODE3}:2888:3888
EOF
cat > /lib/systemd/system/zookeeper.service <<EOF
[Unit]
Description=zookeeper.service
After=network.target

[Service]
Type=forking
#Environment=/usr/local/zookeeper
ExecStart=/usr/local/zookeeper/bin/zkServer.sh start
ExecStop=/usr/local/zookeeper/bin/zkServer.sh stop
ExecReload=/usr/local/zookeeper/bin/zkServer.sh restart

[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable --now zookeeper.service
systemctl is-active zookeeper.service
if [ $? -eq 0 ] ;then
color "zookeeper 安装成功!" 0
else
color "zookeeper 安装失败!" 1
exit 1
fi
}

zk_myid

install_jdk

install_zookeeper
  • 安装后有自带脚本
[root@node1 ~]#ll  /usr/local/zookeeper/bin/
total 80
drwxr-xr-x 2 501 staff 4096 Jan 25 16:31 ./
drwxr-xr-x 8 root root 4096 Feb 2 08:01 ../
-rwxr-xr-x 1 501 staff 232 Jan 25 16:31 README.txt*
-rwxr-xr-x 1 501 staff 1978 Jan 25 16:31 zkCleanup.sh*
-rwxr-xr-x 1 501 staff 1115 Jan 25 16:31 zkCli.cmd*
-rwxr-xr-x 1 501 staff 1576 Jan 25 16:31 zkCli.sh*
-rwxr-xr-x 1 501 staff 1810 Jan 25 16:31 zkEnv.cmd*
-rwxr-xr-x 1 501 staff 3613 Jan 25 16:31 zkEnv.sh*
-rwxr-xr-x 1 501 staff 4559 Jan 25 16:31 zkServer-initialize.sh*
-rwxr-xr-x 1 501 staff 1243 Jan 25 16:31 zkServer.cmd*
-rwxr-xr-x 1 501 staff 11616 Jan 25 16:31 zkServer.sh*
-rwxr-xr-x 1 501 staff 988 Jan 25 16:31 zkSnapShotToolkit.cmd*
-rwxr-xr-x 1 501 staff 1377 Jan 25 16:31 zkSnapShotToolkit.sh*
-rwxr-xr-x 1 501 staff 987 Jan 25 16:31 zkSnapshotComparer.cmd*
-rwxr-xr-x 1 501 staff 1374 Jan 25 16:31 zkSnapshotComparer.sh*
-rwxr-xr-x 1 501 staff 996 Jan 25 16:31 zkTxnLogToolkit.cmd*
-rwxr-xr-x 1 501 staff 1385 Jan 25 16:31 zkTxnLogToolkit.sh*
  • 各节点部署Zookeeper需要注意的配置信息如下
[root@node1 ~]#grep -v "\^\#" /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000 #服务器与服务器之间的单次心跳检测时间间隔,单位为毫秒
initLimit=10 #集群中leader服务器与follower服务器初始连接心跳次数,即多少个 2000 毫秒
syncLimit=5 #leader 与follower之间连接完成之后,后期检测发送和应答的心跳次数,如果该follower在设置的时间内(5*2000)不能与 leader 进行通信,那么此 follower将被视为不可用。
dataDir=/usr/local/zookeeper/data #自定义的zookeeper保存数据的目录
clientPort=2181 #客户端连接 Zookeeper 服务器的端口,Zookeeper会监听这个端口,接受客户端的访问请求
maxClientCnxns=128 #单个客户端IP 可以和zookeeper保持的连接数
autopurge.snapRetainCount=3 #3.4.0中的新增功能:启用后,ZooKeeper 自动清除功能,会将只保留此最新3个快照和相应的事务日志,并分别保留在dataDir 和dataLogDir中,删除其余部分,默认值为3,最小值为3
autopurge.purgeInterval=24 #3.4.0及之后版本,ZK提供了自动清理日志和快照文件的功能,这个参数指定了清理频率,单位是小时,需要配置一个1或更大的整数,默认是 0,表示不开启自动清理功能
-------------------------------------------------------
集群关键配置:
格式: server.MyID服务器唯一编号=服务器IP:Leader和Follower的数据同步端口(只有leader才会打开):Leader和Follower选举端口(L和F都有)
如果添加节点,只需要在所有节点上添加新节点的下面形式的配置行,在新节点创建myid文件,并重启所有节点服务即可
-------------------------------------------------------
server.1=192.168.122.2:2888:3888
server.2=192.168.122.3:2888:3888
server.3=192.168.122.4:2888:3888

#各个myid文件的内容要和zoo.cfg文件相匹配
[root@node1 ~]#cat /usr/local/zookeeper/data/myid
1
[root@node2 ~]#cat /usr/local/zookeeper/data/myid
2
[root@node3 ~]#cat /usr/local/zookeeper/data/myid
3
  • 各服务器启动 zookeeper
[root@node1 ~]#/usr/local/zookeeper/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 3953.

[root@node2 ~]#/usr/local/zookeeper/bin/zkServer.sh start
[root@node3 ~]#/usr/local/zookeeper/bin/zkServer.sh start
  • 查看集群状态
#只有leader监听2888/tcp端口
[root@node1 ~]#ss -ntlp|grep java
LISTEN 0 50 *:2181 *:* users:(("java",pid=3953,fd=50))
LISTEN 0 50 *:33901 *:* users:(("java",pid=3953,fd=46))
LISTEN 0 50 [::ffff:192.168.122.2]:3888 *:* users:(("java",pid=3953,fd=59))
LISTEN 0 50 *:8080 *:* users:(("java",pid=3953,fd=54))

[root@node2 ~]#ss -ntlp|grep java
LISTEN 0 50 [::ffff:192.168.122.3]:3888 *:* users:(("java",pid=3467,fd=59))
LISTEN 0 50 *:8080 *:* users:(("java",pid=3467,fd=54))
LISTEN 0 50 *:33915 *:* users:(("java",pid=3467,fd=46))
LISTEN 0 50 *:2181 *:* users:(("java",pid=3467,fd=50))
LISTEN 0 50 [::ffff:192.168.122.3]:2888 *:* users:(("java",pid=3467,fd=61))

[root@node3 ~]#ss -ntlp|grep java
LISTEN 0 50 *:35597 *:* users:(("java",pid=3523,fd=46))
LISTEN 0 50 [::ffff:192.168.122.4]:3888 *:* users:(("java",pid=3523,fd=59))
LISTEN 0 50 *:8080 *:* users:(("java",pid=3523,fd=54))
LISTEN 0 50 *:2181 *:* users:(("java",pid=3523,fd=50))
  • 命令行客户端访问 ZooKeeper
#访问主节点命令行客户端
[root@node1 ~]#/usr/local/zookeeper/bin/zkCli.sh -server 192.168.122.3:2181
/usr/bin/java
Connecting to 192.168.122.3:2181
2023-02-02 08:37:08,017 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:zookeeper.version=3.8.1-74db005175a4ec545697012f9069cb9dcc8cdda7, built on 2023-01-25 16:31 UTC
2023-02-02 08:37:08,023 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:host.name=node1
2023-02-02 08:37:08,031 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:java.version=11.0.17
2023-02-02 08:37:08,033 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:java.vendor=Ubuntu
2023-02-02 08:37:08,034 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64
2023-02-02 08:37:08,035 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:java.class.path=/usr/local/zookeeper/bin/../zookeeper-server/target/classes:/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../zookeeper-server/target/lib/*.jar:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/zookeeper-prometheus-metrics-3.8.1.jar:/usr/local/zookeeper/bin/../lib/zookeeper-jute-3.8.1.jar:/usr/local/zookeeper/bin/../lib/zookeeper-3.8.1.jar:/usr/local/zookeeper/bin/../lib/snappy-java-1.1.7.7.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.7.30.jar:/usr/local/zookeeper/bin/../lib/simpleclient_servlet-0.9.0.jar:/usr/local/zookeeper/bin/../lib/simpleclient_hotspot-0.9.0.jar:/usr/local/zookeeper/bin/../lib/simpleclient_common-0.9.0.jar:/usr/local/zookeeper/bin/../lib/simpleclient-0.9.0.jar:/usr/local/zookeeper/bin/../lib/netty-transport-native-unix-common-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-transport-native-epoll-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-transport-classes-epoll-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-transport-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-resolver-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-handler-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-common-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-codec-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-buffer-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/metrics-core-4.1.12.1.jar:/usr/local/zookeeper/bin/../lib/logback-core-1.2.10.jar:/usr/local/zookeeper/bin/../lib/logback-classic-1.2.10.jar:/usr/local/zookeeper/bin/../lib/jline-2.14.6.jar:/usr/local/zookeeper/bin/../lib/jetty-util-ajax-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-util-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-servlet-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-server-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-security-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-io-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-http-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/javax.servlet-api-3.1.0.jar:/usr/local/zookeeper/bin/../lib/jackson-databind-2.13.4.2.jar:/usr/local/zookeeper/bin/../lib/jackson-core-2.13.4.jar:/usr/local/zookeeper/bin/../lib/jackson-annotations-2.13.4.jar:/usr/local/zookeeper/bin/../lib/commons-io-2.11.0.jar:/usr/local/zookeeper/bin/../lib/commons-cli-1.5.0.jar:/usr/local/zookeeper/bin/../lib/audience-annotations-0.12.0.jar:/usr/local/zookeeper/bin/../zookeeper-*.jar:/usr/local/zookeeper/bin/../zookeeper-server/src/main/resources/lib/*.jar:/usr/local/zookeeper/bin/../conf:
2023-02-02 08:37:08,036 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:java.library.path=/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
2023-02-02 08:37:08,037 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:java.io.tmpdir=/tmp
2023-02-02 08:37:08,038 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:java.compiler=<NA>
2023-02-02 08:37:08,038 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:os.name=Linux
2023-02-02 08:37:08,039 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:os.arch=amd64
2023-02-02 08:37:08,040 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:os.version=5.15.0-58-generic
2023-02-02 08:37:08,040 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:user.name=root
2023-02-02 08:37:08,041 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:user.home=/root
2023-02-02 08:37:08,041 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:user.dir=/root
2023-02-02 08:37:08,042 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:os.memory.free=11MB
2023-02-02 08:37:08,042 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:os.memory.max=247MB
2023-02-02 08:37:08,043 [myid:] - INFO [main:o.a.z.Environment@98] - Client environment:os.memory.total=15MB
2023-02-02 08:37:08,050 [myid:] - INFO [main:o.a.z.ZooKeeper@637] - Initiating client connection, connectString=192.168.122.3:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@27fe3806
2023-02-02 08:37:08,069 [myid:] - INFO [main:o.a.z.c.X509Util@77] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2023-02-02 08:37:08,082 [myid:] - INFO [main:o.a.z.ClientCnxnSocket@239] - jute.maxbuffer value is 1048575 Bytes
2023-02-02 08:37:08,100 [myid:] - INFO [main:o.a.z.ClientCnxn@1741] - zookeeper.request.timeout value is 0. feature enabled=false
Welcome to ZooKeeper!
JLine support is enabled
[zk: 192.168.122.3:2181(CONNECTING) 0] 2023-02-02 08:37:08,362 [myid:192.168.122.3:2181] - INFO [main-SendThread(192.168.122.3:2181):o.a.z.ClientCnxn$SendThread@1177] - Opening socket connection to server 192.168.122.3/192.168.122.3:2181.
2023-02-02 08:37:08,364 [myid:192.168.122.3:2181] - INFO [main-SendThread(192.168.122.3:2181):o.a.z.ClientCnxn$SendThread@1179] - SASL config status: Will not attempt to authenticate using SASL (unknown error)
2023-02-02 08:37:08,385 [myid:192.168.122.3:2181] - INFO [main-SendThread(192.168.122.3:2181):o.a.z.ClientCnxn$SendThread@1011] - Socket connection established, initiating session, client: /192.168.122.2:38766, server: 192.168.122.3/192.168.122.3:2181
2023-02-02 08:37:08,488 [myid:192.168.122.3:2181] - INFO [main-SendThread(192.168.122.3:2181):o.a.z.ClientCnxn$SendThread@1452] - Session establishment complete on server 192.168.122.3/192.168.122.3:2181, session id = 0x200001556ee0000, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

#做增删改查
[zk: 192.168.122.3:2181(CONNECTED) 0]
[zk: 192.168.122.3:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: 192.168.122.3:2181(CONNECTED) 1] ls /zookeeper
[config, quota]
[zk: 192.168.122.3:2181(CONNECTED) 2] get /zookeeper/config
server.1=192.168.122.2:2888:3888:participant
server.2=192.168.122.3:2888:3888:participant
server.3=192.168.122.4:2888:3888:participant
version=0
[zk: 192.168.122.3:2181(CONNECTED) 3] create /mooreyxia moore1data
Created /mooreyxia
[zk: 192.168.122.3:2181(CONNECTED) 5] ls /
[mooreyxia, zookeeper]
[zk: 192.168.122.3:2181(CONNECTED) 6] get /mooreyxia
moore1data
[zk: 192.168.122.3:2181(CONNECTED) 7] set /mooreyxia data
[zk: 192.168.122.3:2181(CONNECTED) 8] get /mooreyxia
data
[zk: 192.168.122.3:2181(CONNECTED) 9] stat /zookeeper
cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2
[zk: 192.168.122.3:2181(CONNECTED) 10] quit

WATCHER::

WatchedEvent state:Closed type:None path:null
2023-02-02 08:51:28,928 [myid:] - INFO [main:o.a.z.ZooKeeper@1232] - Session: 0x200001556ee0000 closed
2023-02-02 08:51:28,928 [myid:] - INFO [main-EventThread:o.a.z.ClientCnxn$EventThread@568] - EventThread shut down for session: 0x200001556ee0000
2023-02-02 08:51:28,937 [myid:] - INFO [main:o.a.z.u.ServiceUtils@45] - Exiting JVM with code 0
  • nc 访问 ZooKeeper
  • ZooKeeper支持某些特定的四字命令字母与其的交互。它们大多是查询命令,用来获取 ZooKeeper服务的当前状态及相关信息。用户在客户端可以通过 netcat 或telnet向zookeeper发送下面命令
#各节点解开安全限制
[root@node1 ~]#cat /usr/local/zookeeper/conf/zoo.cfg
...
server.1=192.168.122.2:2888:3888
server.2=192.168.122.3:2888:3888
server.3=192.168.122.4:2888:3888
4lw.commands.whitelist=* #加入命令执行白名单

#在服务状态查看命令中有很多存在隐患的命令,所以为了避免生产中因为这些命令的安全隐患,所以我们要对这些命令进行一些安全限制,只需要编辑服务的zoo.cfg文件即可
4lw.commands.whitelist=stat,ruok,conf,isro

常见命令列表

conf #输出相关服务配置的详细信息
cons #列出所有连接到服务器的客户端的完全的连接/会话的详细信息
envi #输出关于服务环境的详细信息
dump #列出未经处理的会话和临时节点
stat #查看哪个节点被选择作为Follower或者Leader
ruok #测试是否启动了该Server,若回复imok表示已经启动
mntr #输出一些运行时信息
reqs #列出未经处理的请求
wchs #列出服务器watch的简要信息
wchc #通过session列出服务器watch的详细信息
wchp #通过路径列出服务器watch的详细信息
srvr #输出服务的所有信息
srst #重置服务器统计信息
kill #关掉Server
isro #查看该服务的节点权限信息

#查看节点服务状态
[root@node1 ~]#echo stat|nc 127.0.0.1 2181
Zookeeper version: 3.8.1-74db005175a4ec545697012f9069cb9dcc8cdda7, built on 2023-01-25 16:31 UTC
Clients:
/127.0.0.1:55830[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0.0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x100000004
Mode: follower
Node count: 6

#查看节点服务配置
[root@node1 ~]#echo conf|nc 127.0.0.1 2181
clientPort=2181
secureClientPort=-1
dataDir=/usr/local/zookeeper/data/version-2
dataDirSize=67109483
dataLogDir=/usr/local/zookeeper/data/version-2
dataLogSize=67109483
tickTime=2000
maxClientCnxns=128
minSessionTimeout=4000
maxSessionTimeout=40000
clientPortListenBacklog=-1
serverId=1
initLimit=10
syncLimit=5
electionAlg=3
electionPort=3888
quorumPort=2888
peerType=0
membership:
server.1=192.168.122.2:2888:3888:participant
server.2=192.168.122.3:2888:3888:participant
server.3=192.168.122.4:2888:3888:participant
version=0

#查看节点服务环境
[root@node1 ~]#echo envi | nc 127.0.0.1 2181
Environment:
zookeeper.version=3.8.1-74db005175a4ec545697012f9069cb9dcc8cdda7, built on 2023-01-25 16:31 UTC
host.name=node1
java.version=11.0.17
java.vendor=Ubuntu
java.home=/usr/lib/jvm/java-11-openjdk-amd64
java.class.path=/usr/local/zookeeper/bin/../zookeeper-server/target/classes:/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../zookeeper-server/target/lib/*.jar:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/zookeeper-prometheus-metrics-3.8.1.jar:/usr/local/zookeeper/bin/../lib/zookeeper-jute-3.8.1.jar:/usr/local/zookeeper/bin/../lib/zookeeper-3.8.1.jar:/usr/local/zookeeper/bin/../lib/snappy-java-1.1.7.7.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.7.30.jar:/usr/local/zookeeper/bin/../lib/simpleclient_servlet-0.9.0.jar:/usr/local/zookeeper/bin/../lib/simpleclient_hotspot-0.9.0.jar:/usr/local/zookeeper/bin/../lib/simpleclient_common-0.9.0.jar:/usr/local/zookeeper/bin/../lib/simpleclient-0.9.0.jar:/usr/local/zookeeper/bin/../lib/netty-transport-native-unix-common-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-transport-native-epoll-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-transport-classes-epoll-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-transport-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-resolver-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-handler-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-common-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-codec-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/netty-buffer-4.1.86.Final.jar:/usr/local/zookeeper/bin/../lib/metrics-core-4.1.12.1.jar:/usr/local/zookeeper/bin/../lib/logback-core-1.2.10.jar:/usr/local/zookeeper/bin/../lib/logback-classic-1.2.10.jar:/usr/local/zookeeper/bin/../lib/jline-2.14.6.jar:/usr/local/zookeeper/bin/../lib/jetty-util-ajax-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-util-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-servlet-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-server-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-security-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-io-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/jetty-http-9.4.49.v20220914.jar:/usr/local/zookeeper/bin/../lib/javax.servlet-api-3.1.0.jar:/usr/local/zookeeper/bin/../lib/jackson-databind-2.13.4.2.jar:/usr/local/zookeeper/bin/../lib/jackson-core-2.13.4.jar:/usr/local/zookeeper/bin/../lib/jackson-annotations-2.13.4.jar:/usr/local/zookeeper/bin/../lib/commons-io-2.11.0.jar:/usr/local/zookeeper/bin/../lib/commons-cli-1.5.0.jar:/usr/local/zookeeper/bin/../lib/audience-annotations-0.12.0.jar:/usr/local/zookeeper/bin/../zookeeper-*.jar:/usr/local/zookeeper/bin/../zookeeper-server/src/main/resources/lib/*.jar:/usr/local/zookeeper/bin/../conf:
java.library.path=/usr/java/packages/lib:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
java.io.tmpdir=/tmp
java.compiler=<NA>
os.name=Linux
os.arch=amd64
os.version=5.15.0-58-generic
user.name=root
user.home=/root
user.dir=/
os.memory.free=5MB
os.memory.max=966MB
os.memory.total=15MB

Kafka

简介:

  • Kafka 被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache SoftwareFoundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。
  • Kafka用于构建实时数据管道和流应用程序。
  • kafka 相对于其他MQ产品的最主要的优势是其具备分布式功能、并可以结合 zookeeper 可以实现动态扩容,Kafka 是一种高吞吐量的分布式发布订阅消息系统。

以下是优势和特点总结:

特点:
1. 分布式: 多机实现,不允许单机
2. 分区: 一个消息.可以拆分出多个,分别存储在多个位置 --> 实现负载均衡
3. 多副本: 防止信息丢失,可以多来几个备份 --> 实现高可用
4. 多订阅者: 可以有很多应用连接kafka
5. Zookeeper: 早期版本的Kafka依赖于zookeeper, 2021年4月19日Kafka 2.8.0正式发布,此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁来替代ZooKeeper,即Kafka将不再需要ZooKeeper!

优势:
1. Kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 级别以上的消息存储也能够保持长时间的稳定性能。
2. 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分区消息。
3. 分布式: Kafka 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移
4. 顺序保证:在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)
5. 支持 Hadoop 并行数据加载
6. 通常用于大数据场合,传递单条消息比较大,而Rabbitmq 消息主要是传输业务的指令数据,单条数据较小

Kafka角色图解

57-Zookeeper集群和kafka消息队列集群

  • Producer:生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker。
  • Consumer:消费者,用于消费消息,即处理消息
  • Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 groupname,若不指定 group name 则属于默认的group),使用 consumer high level API 时,同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,但多个consumer group 可同时消费这一消息。
  • Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……
  • Topic :消息的主题,可以理解为消息的分类,相当于Redis的Key和ES中的索引,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic的消息虽然保存于一个或多个broker 上, 但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息。
  • Replication: 同样数据的副本,包括leader和follower的副本数,基于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES不同的,ES中的副本数不包括主分片数
  • Partition :是物理上的概念,每个topic 分割为一个或多个partition,即一个topic切分为多份.创建topic时可指定 parition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数,注意同一个partition数据是有顺序的,但不同的partition则是无序的。为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片,
  • AR: Assigned Replicas,分区中的所有副本的统称,AR= lSR+ OSR
  • lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,是AR的子集
  • OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集

分区的优势:

  • 实现存储空间的横向扩容,即将多个kafka服务器的空间结合利用
  • 提升性能,多服务器读写
  • 实现高可用,分区leader 分布在不同的kafka 服务器,假设分区因子为 3, 分区 0 的leader为服务器A,则服务器 B 和服务器 C 为 A 的follower,而分区 1 的leader为服务器B,则服务器 A 和C 为服务器B 的follower,而分区 2 的leader 为C,则服务器A 和 B 为C 的follower。

57-Zookeeper集群和kafka消息队列集群

Kafka部署

案例:

  • 环境说明
#在三个Ubuntu2204节点提前部署zookeeper和kafka三个节点复用
node1:192.168.122.2
node2:192.168.122.3
node3:192.168.122.4
*注意:生产中zookeeper和kafka一般是分开独立部署的,kafka安装前需要安装java环境

#确保三个节点的zookeeper启动
[root@node1 ~]#/usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@node2 ~]#/usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

[root@node3 ~]#/usr/local/zookeeper/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower
  • 下载kafka前确认版本
  • kafka 基于scala语言实现,所以使用kafka需要指定scala的相应的版本.kafka 为多个版本的Scala构建。
#这里实验我用最新版kafka_2.13-3.3.2.tgz
#kafka版本格式
kafka_<scala 版本>_<kafka 版本>
例:kafka_2.13-2.7.0.tgz
  • 各节点部署 Kafka
#在所有节点上执行安装java
[root@node1 ~]#apt install openjdk-8-jdk -y
#在所有节点上执行下载,官方下载
[root@node1 ~]#wget https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.2.tgz
#国内镜像下载
[root@node1 ~]#wget
https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.3.1/kafka_2.13-3.3.2.tgz
#解压缩
[root@node1 ~]#tar xf kafka_2.13-2.3.2.tgz -C /usr/local/
[root@node1 ~]#ln -s /usr/local/kafka_2.13-2.3.2/ /usr/local/kafka
#配置PATH变量
[root@node1 ~]#echo 'PATH=/usr/local/kafka/bin:$PATH' > /etc/profile.d/kafka.sh
[root@node1 ~]#. /etc/profile.d/kafka.sh
#修改配置文件
[root@node1 ~]#vim /usr/local/kafka/config/server.properties
broker.id=1 #每个broker在集群中每个节点的正整数唯一标识,此值保存在log.dirs下的
meta.properties文件
listeners=PLAINTEXT://192.168.122.2:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
log.dirs=/usr/local/kafka/data #kakfa用于保存数据的目录,所有的消息都会存储在该目录当中
num.partitions=1 #设置创建新的topic时默认分区数量,建议和kafka的节点数量一致
default.replication.factor=3 #指定默认的副本数为3,可以实现故障的自动转移
log.retention.hours=168 #设置kafka中消息保留时间,默认为168小时即7天
zookeeper.connect=192.168.122.2:2181,192.168.122.3:2181,192.168.122.3:2181 #指定连接的zk的地址,zk中存储了broker的元数据信息
zookeeper.connection.timeout.ms=6000 #设置连接zookeeper的超时时间,单位为ms,默认6秒钟
#准备数据目录
[root@node1 ~]#mkdir /usr/local/kafka/data
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 192.168.122.3:/usr/local/kafka/config
[root@node1 ~]#scp /usr/local/kafka/config/server.properties 192.168.122.4:/usr/local/kafka/config
#修改第2个节点配置
[root@node2 ~]#vim /usr/local/kafka/config/server.properties
broker.id=2 #每个broker 在集群中的唯一标识,正整数。
listeners=PLAINTEXT://192.168.122.3:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
#修改第3个节点配置
[root@node3 ~]#vim /usr/local/kafka/config/server.properties
broker.id=31 #每个broker 在集群中的唯一标识,正整数。
listeners=PLAINTEXT://192.168.122.4:9092 #指定当前主机的IP做为监听地址,注意:不支持0.0.0.0
  • 启动服务
#在所有kafka节点执行下面操作
[root@node1 ~]#vim vim /usr/local/kafka/bin/kafka-server-start.sh
if[ " x$KAFKA_HEAP_OPTS"="x"] ; then
export KAFKA_HEAP_OPTS=" -Xmx1G-Xms1G" #可以调整内存
fi
[root@node1 ~]#kafka-server-start.sh -daemon
/usr/local/kafka/config/server.properties
  • 确保服务启动状态
[root@node1 ~]#ss -ntl|grep 9092
LISTEN 0 50 [::ffff:192.168.122.2]:9092 *:*
[root@node1 ~]#tail /usr/local/kafka/logs/server.log
[2023-02-02 12:44:12,663] INFO [MetadataCache brokerId=1] Updated cache from existing <empty> to latest FinalizedFeaturesAndEpoch(features=Map(), epoch=0). (kafka.server.metadata.ZkMetadataCache)
[2023-02-02 12:44:12,678] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-02 12:44:12,699] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2023-02-02 12:44:12,717] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Enabling request processing. (kafka.network.SocketServer)
[2023-02-02 12:44:12,738] INFO Kafka version: 3.3.2 (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:12,739] INFO Kafka commitId: b66af662e61082cb (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:12,739] INFO Kafka startTimeMs: 1675341852723 (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:12,741] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
[2023-02-02 12:44:12,946] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now on will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-02-02 12:44:13,028] INFO [BrokerToControllerChannelManager broker=1 name=alterPartition]: Recorded new controller, from now on will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)

[root@node2 ~]#ss -ntl|grep 9092;tail /usr/local/kafka/logs/server.log
LISTEN 0 50 [::ffff:192.168.122.3]:9092 *:*
[2023-02-02 12:44:41,171] INFO [Transaction Marker Channel Manager 2]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2023-02-02 12:44:41,224] INFO [ExpirationReaper-2-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-02 12:44:41,281] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2023-02-02 12:44:41,305] INFO [SocketServer listenerType=ZK_BROKER, nodeId=2] Enabling request processing. (kafka.network.SocketServer)
[2023-02-02 12:44:41,324] INFO Kafka version: 3.3.2 (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:41,324] INFO Kafka commitId: b66af662e61082cb (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:41,325] INFO Kafka startTimeMs: 1675341881310 (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:41,326] INFO [KafkaServer id=2] started (kafka.server.KafkaServer)
[2023-02-02 12:44:41,445] INFO [BrokerToControllerChannelManager broker=2 name=alterPartition]: Recorded new controller, from now on will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-02-02 12:44:41,448] INFO [BrokerToControllerChannelManager broker=2 name=forwarding]: Recorded new controller, from now on will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)

[root@node3 ~]#ss -ntl|grep 9092;tail /usr/local/kafka/logs/server.log
LISTEN 0 50 [::ffff:192.168.122.4]:9092 *:*
[2023-02-02 12:44:51,451] INFO [TransactionCoordinator id=3] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2023-02-02 12:44:51,549] INFO [ExpirationReaper-3-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2023-02-02 12:44:51,605] INFO [SocketServer listenerType=ZK_BROKER, nodeId=3] Enabling request processing. (kafka.network.SocketServer)
[2023-02-02 12:44:51,616] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2023-02-02 12:44:51,642] INFO Kafka version: 3.3.2 (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:51,642] INFO Kafka commitId: b66af662e61082cb (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:51,642] INFO Kafka startTimeMs: 1675341891619 (org.apache.kafka.common.utils.AppInfoParser)
[2023-02-02 12:44:51,644] INFO [KafkaServer id=3] started (kafka.server.KafkaServer)
[2023-02-02 12:44:51,815] INFO [BrokerToControllerChannelManager broker=3 name=forwarding]: Recorded new controller, from now on will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
[2023-02-02 12:44:51,892] INFO [BrokerToControllerChannelManager broker=3 name=alterPartition]: Recorded new controller, from now on will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  • 准备Kafka的service文件
[root@node1 ~]#cat /lib/systemd/system/kafka.service
[Unit]
Description=Apache kafka
After=network.target

[Service]
Type=simple
#Environment=JAVA_HOME=/data/server/java
#PIDFile=/usr/local/kafka/kafka.pid
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
ExecStop=/bin/kill -TERM ${MAINPID}
Restart=always
RestartSec=20

[Install]
WantedBy=multi-user.target

#启动service
[root@node1 ~]#systemctl daemon-load
[root@node1 ~]#systemctl start kafka
[root@node1 ~]#systemctl status kafka
● kafka.service - Apache kafka
Loaded: loaded (/lib/systemd/system/kafka.service; enabled; vendor preset: enabled)
Active: active (running) since Thu 2023-02-02 12:44:01 UTC; 13min ago
Main PID: 4233 (java)
Tasks: 72 (limit: 1029)
Memory: 343.8M
CPU: 14.347s
CGroup: /system.slice/kafka.service
└─4233 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -
Djava.awt.headless=true "-Xlog:gc*:file=/usr/local/kafka/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=100M" -Dcom.sun.management.jmxremote -Dcom.sun.management.
jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/usr/local/kafka/bin/../logs -Dlog4j.configuration=file:/usr/local/kafka/bin/../config/log4
j.properties -cp /usr/local/kafka/bin/../libs/activation-1.1.1.jar:/usr/local/kafka/bin/../libs/aopalliance-repackaged-2.6.1.jar:/usr/local/kafka/bin/../libs/argparse4j-0.7.0.jar
:/usr/local/kafka/bin/../libs/audience-annotations-0.5.0.jar:/usr/local/kafka/bin/../libs/commons-cli-1.4.jar:/usr/local/kafka/bin/../libs/commons-lang3-3.12.0.jar:/usr/local/kaf
ka/bin/../libs/commons-lang3-3.8.1.jar:/usr/local/kafka/bin/../libs/connect-api-3.3.2.jar:/usr/local/kafka/bin/../libs/connect-basic-auth-extension-3.3.2.jar:/usr/local/kafka/bin
/../libs/connect-json-3.3.2.jar:/usr/local/kafka/bin/../libs/connect-mirror-3.3.2.jar:/usr/local/kafka/bin/../libs/connect-mirror-client-3.3.2.jar:/usr/local/kafka/bin/../libs/co
nnect-runtime-3.3.2.jar:/usr/local/kafka/bin/../libs/connect-transforms-3.3.2.jar:/usr/local/kafka/bin/../libs/hk2-api-2.6.1.jar:/usr/local/kafka/bin/../libs/hk2-locator-2.6.1.ja
r:/usr/local/kafka/bin/../libs/hk2-utils-2.6.1.jar:/usr/local/kafka/bin/../libs/jackson-annotations-2.13.4.jar:/usr/local/kafka/bin/../libs/jackson-core-2.13.4.jar:/usr/local/kaf
ka/bin/../libs/jackson-databind-2.13.4.2.jar:/usr/local/kafka/bin/../libs/jackson-dataformat-csv-2.13.4.jar:/usr/local/kafka/bin/../libs/jackson-datatype-jdk8-2.13.4.jar:/usr/loc
al/kafka/bin/../libs/jackson-jaxrs-base-2.13.4.jar:/usr/local/kafka/bin/../libs/jackson-jaxrs-json-provider-2.13.4.jar:/usr/local/kafka/bin/../libs/jackson-module-jaxb-annotation
s-2.13.4.jar:/usr/local/kafka/bin/../libs/jackson-module-scala_2.13-2.13.4.jar:/usr/local/kafka/bin/../libs/jakarta.activation-api-1.2.2.jar:/usr/local/kafka/bin/../libs/jakarta.
annotation-api-1.3.5.jar:/usr/local/kafka/bin/../libs/jakarta.inject-2.6.1.jar:/usr/local/kafka/bin/../libs/jakarta.validation-api-2.0.2.jar:/usr/local/kafka/bin/../libs/jakarta.
ws.rs-api-2.1.6.jar:/usr/local/kafka/bin/../libs/jakarta.xml.bind-api-2.3.3.jar:/usr/local/kafka/bin/../libs/javassist-3.27.0-GA.jar:/usr/local/kafka/bin/../libs/javax.servlet-ap
i-3.1.0.jar:/usr/local/kafka/bin/../libs/javax.ws.rs-api-2.1.1.jar:/usr/local/kafka/bin/../libs/jaxb-api-2.3.0.jar:/usr/local/kafka/bin/../libs/jersey-client-2.34.jar:/usr/local/
kafka/bin/../libs/jersey-common-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-2.34.jar:/usr/local/kafka/bin/../libs/jersey-container-servlet-core-2.34.jar:/usr/l
ocal/kafka/bin/../libs/jersey-hk2-2.34.jar:/usr/local/kafka/bin/../libs/jersey-server-2.34.jar:/usr/local/kafka/bin/../libs/jetty-client-9.4.48.v20220622.jar:/usr/local/kafka/bin
/../libs/jetty-continuation-9.4.48.v20220622.jar:/usr/local/kafka/bin/../libs/jetty-http-9.4.48.v20220622.jar:/usr/local/kafka/bin/../libs/jetty-io-9.4.48.v20220622.jar:/usr/loca
l/kafka/bin/../libs/jetty-security-9.4.48.v20220622.jar:/usr/local/kafka/bin/../libs/jetty-server-9.4.48.v20220622.jar:/usr/local/kafka/bin/../libs/jetty-servlet-9.4.48.v20220622
.jar:/usr/local/kafka/bin/../libs/jetty-servlets-9.4.48.v20220622.jar:/usr/local/kafka/bin/../libs/jetty-util-9.4.48.v20220622.jar:/usr/local/kafka/bin/../libs/jetty-util-ajax-9.
4.48.v20220622.jar:/usr/local/kafka/bin/../libs/jline-3.21.0.jar:/usr/local/kafka/bin/../libs/jopt-simple-5.0.4.jar:/usr/local/kafka/bin/../libs/jose4j-0.7.9.jar:/usr/local/kafka
/bin/../libs/kafka-clients-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-log4j-appender-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-metadata-3.3.2.jar:/usr/local/kafka/bin/../lib
s/kafka-raft-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-server-common-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-shell-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-storage-3.
3.2.jar:/usr/local/kafka/bin/../libs/kafka-storage-api-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-streams-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-streams-examples-3.3.2.ja
r:/usr/local/kafka/bin/../libs/kafka-streams-scala_2.13-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-streams-test-utils-3.3.2.jar:/usr/local/kafka/bin/../libs/kafka-tools-3.3.2.j
ar:/usr/local/kafka/bin/../libs/kafka_2.13-3.3.2.jar:/usr/local/kafka/bin/../libs/lz4-java-1.8.0.jar:/usr/local/kafka/bin/../libs/maven-artifact-3.8.4.jar:/usr/local/kafka/bin/..
/libs/metrics-core-2.2.0.jar:/usr/local/kafka/bin/../libs/metrics-core-4.1.12.1.jar:/usr/local/kafka/bin/../libs/netty-buffer-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/netty-
codec-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/netty-common-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/netty-handler-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/netty-r
esolver-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-classes-epoll-4.1.78.Final.jar:/usr/local/kafk
a/bin/../libs/netty-transport-native-epoll-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/netty-transport-native-unix-common-4.1.78.Final.jar:/usr/local/kafka/bin/../libs/osgi-res
ource-locator-1.0.3.jar:/usr/local/kafka/bin/../libs/paranamer-2.8.jar:/usr/local/kafka/bin/../libs/plexus-utils-3.3.0.jar:/usr/local/kafka/bin/../libs/reflections-0.9.12.jar:/us
r/local/kafka/bin/../libs/reload4j-1.2.19.jar:/usr/local/kafka/bin/../libs/rocksdbjni-7.1.2.jar:/usr/local/kafka/bin/../libs/scala-collection-compat_2.13-2.6.0.jar:/usr/local/kaf
ka/bin/../libs/scala-java8-compat_2.13-1.0.2.jar:/usr/local/kafka/bin/../libs/scala-library-2.13.8.jar:/usr/local/kafka/bin/../libs/scala-logging_2.13-3.9.4.jar:/usr/local/kafka/
bin/../libs/scala-reflect-2.13.8.jar:/usr/local/kafka/bin/../libs/slf4j-api-1.7.36.jar:/usr/local/kafka/bin/../libs/slf4j-reload4j-1.7.36.jar:/usr/local/kafka/bin/../libs/snappy-
java-1.1.8.4.jar:/usr/local/kafka/bin/../libs/swagger-annotations-2.2.0.jar:/usr/local/kafka/bin/../libs/trogdor-3.3.2.jar:/usr/local/kafka/bin/../libs/zookeeper-3.6.3.jar:/usr/l
ocal/kafka/bin/../libs/zookeeper-jute-3.6.3.jar:/usr/local/kafka/bin/../libs/zstd-jni-1.5.2-1.jar kafka.Kafka /usr/local/kafka/config/server.properties

Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,663] INFO [MetadataCache brokerId=1] Updated cache from existing <empty> to latest FinalizedFeaturesAndEpo
ch(features=Map(), epoch=0). (kafka.server.metadata.ZkMetadataCache)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,678] INFO [ExpirationReaper-1-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperatio
nReaper)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,699] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$
ChangeEventProcessThread)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,717] INFO [SocketServer listenerType=ZK_BROKER, nodeId=1] Enabling request processing. (kafka.network.Sock
etServer)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,738] INFO Kafka version: 3.3.2 (org.apache.kafka.common.utils.AppInfoParser)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,739] INFO Kafka commitId: b66af662e61082cb (org.apache.kafka.common.utils.AppInfoParser)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,739] INFO Kafka startTimeMs: 1675341852723 (org.apache.kafka.common.utils.AppInfoParser)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,741] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
Feb 02 12:44:12 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:12,946] INFO [BrokerToControllerChannelManager broker=1 name=forwarding]: Recorded new controller, from now o
n will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
Feb 02 12:44:13 node1 kafka-server-start.sh[4233]: [2023-02-02 12:44:13,028] INFO [BrokerToControllerChannelManager broker=1 name=alterPartition]: Recorded new controller, from n
ow on will use node 192.168.122.2:9092 (id: 1 rack: null) (kafka.server.BrokerToControllerRequestThread)
  • 查看kafka安装文件
[root@node1 ~]#tree /usr/local/kafka
/usr/local/kafka
├── LICENSE
├── NOTICE
├── bin
│ ├── connect-distributed.sh
│ ├── connect-mirror-maker.sh
│ ├── connect-standalone.sh
│ ├── kafka-acls.sh
│ ├── kafka-broker-api-versions.sh
│ ├── kafka-cluster.sh
│ ├── kafka-configs.sh
│ ├── kafka-console-consumer.sh
│ ├── kafka-console-producer.sh
│ ├── kafka-consumer-groups.sh
│ ├── kafka-consumer-perf-test.sh
│ ├── kafka-delegation-tokens.sh
│ ├── kafka-delete-records.sh
│ ├── kafka-dump-log.sh
│ ├── kafka-features.sh
│ ├── kafka-get-offsets.sh
│ ├── kafka-leader-election.sh
│ ├── kafka-log-dirs.sh
│ ├── kafka-metadata-quorum.sh
│ ├── kafka-metadata-shell.sh
│ ├── kafka-mirror-maker.sh
│ ├── kafka-producer-perf-test.sh
│ ├── kafka-reassign-partitions.sh
│ ├── kafka-replica-verification.sh
│ ├── kafka-run-class.sh
│ ├── kafka-server-start.sh
│ ├── kafka-server-stop.sh
│ ├── kafka-storage.sh
│ ├── kafka-streams-application-reset.sh
│ ├── kafka-topics.sh
│ ├── kafka-transactions.sh
│ ├── kafka-verifiable-consumer.sh
│ ├── kafka-verifiable-producer.sh
│ ├── trogdor.sh
│ ├── windows
│ │ ├── connect-distributed.bat
│ │ ├── connect-standalone.bat
│ │ ├── kafka-acls.bat
│ │ ├── kafka-broker-api-versions.bat
│ │ ├── kafka-configs.bat
│ │ ├── kafka-console-consumer.bat
│ │ ├── kafka-console-producer.bat
│ │ ├── kafka-consumer-groups.bat
│ │ ├── kafka-consumer-perf-test.bat
│ │ ├── kafka-delegation-tokens.bat
│ │ ├── kafka-delete-records.bat
│ │ ├── kafka-dump-log.bat
│ │ ├── kafka-get-offsets.bat
│ │ ├── kafka-leader-election.bat
│ │ ├── kafka-log-dirs.bat
│ │ ├── kafka-metatada-quorum.bat
│ │ ├── kafka-mirror-maker.bat
│ │ ├── kafka-producer-perf-test.bat
│ │ ├── kafka-reassign-partitions.bat
│ │ ├── kafka-replica-verification.bat
│ │ ├── kafka-run-class.bat
│ │ ├── kafka-server-start.bat
│ │ ├── kafka-server-stop.bat
│ │ ├── kafka-storage.bat
│ │ ├── kafka-streams-application-reset.bat
│ │ ├── kafka-topics.bat
│ │ ├── kafka-transactions.bat
│ │ ├── zookeeper-server-start.bat
│ │ ├── zookeeper-server-stop.bat
│ │ └── zookeeper-shell.bat
│ ├── zookeeper-security-migration.sh
│ ├── zookeeper-server-start.sh
│ ├── zookeeper-server-stop.sh
│ └── zookeeper-shell.sh
├── config
│ ├── connect-console-sink.properties
│ ├── connect-console-source.properties
│ ├── connect-distributed.properties
│ ├── connect-file-sink.properties
│ ├── connect-file-source.properties
│ ├── connect-log4j.properties
│ ├── connect-mirror-maker.properties
│ ├── connect-standalone.properties
│ ├── consumer.properties
│ ├── kraft
│ │ ├── broker.properties
│ │ ├── controller.properties
│ │ └── server.properties
│ ├── log4j.properties
│ ├── producer.properties
│ ├── server.properties
│ ├── tools-log4j.properties
│ ├── trogdor.conf
│ └── zookeeper.properties
├── data
│ ├── cleaner-offset-checkpoint
│ ├── log-start-offset-checkpoint
│ ├── meta.properties
│ ├── recovery-point-offset-checkpoint
│ └── replication-offset-checkpoint
├── libs
│ ├── activation-1.1.1.jar
│ ├── aopalliance-repackaged-2.6.1.jar
│ ├── argparse4j-0.7.0.jar
│ ├── audience-annotations-0.5.0.jar
│ ├── commons-cli-1.4.jar
│ ├── commons-lang3-3.12.0.jar
│ ├── commons-lang3-3.8.1.jar
│ ├── connect-api-3.3.2.jar
│ ├── connect-basic-auth-extension-3.3.2.jar
│ ├── connect-file-3.3.2.jar
│ ├── connect-json-3.3.2.jar
│ ├── connect-mirror-3.3.2.jar
│ ├── connect-mirror-client-3.3.2.jar
│ ├── connect-runtime-3.3.2.jar
│ ├── connect-transforms-3.3.2.jar
│ ├── hk2-api-2.6.1.jar
│ ├── hk2-locator-2.6.1.jar
│ ├── hk2-utils-2.6.1.jar
│ ├── jackson-annotations-2.13.4.jar
│ ├── jackson-core-2.13.4.jar
│ ├── jackson-databind-2.13.4.2.jar
│ ├── jackson-dataformat-csv-2.13.4.jar
│ ├── jackson-datatype-jdk8-2.13.4.jar
│ ├── jackson-jaxrs-base-2.13.4.jar
│ ├── jackson-jaxrs-json-provider-2.13.4.jar
│ ├── jackson-module-jaxb-annotations-2.13.4.jar
│ ├── jackson-module-scala_2.13-2.13.4.jar
│ ├── jakarta.activation-api-1.2.2.jar
│ ├── jakarta.annotation-api-1.3.5.jar
│ ├── jakarta.inject-2.6.1.jar
│ ├── jakarta.validation-api-2.0.2.jar
│ ├── jakarta.ws.rs-api-2.1.6.jar
│ ├── jakarta.xml.bind-api-2.3.3.jar
│ ├── javassist-3.27.0-GA.jar
│ ├── javax.servlet-api-3.1.0.jar
│ ├── javax.ws.rs-api-2.1.1.jar
│ ├── jaxb-api-2.3.0.jar
│ ├── jersey-client-2.34.jar
│ ├── jersey-common-2.34.jar
│ ├── jersey-container-servlet-2.34.jar
│ ├── jersey-container-servlet-core-2.34.jar
│ ├── jersey-hk2-2.34.jar
│ ├── jersey-server-2.34.jar
│ ├── jetty-client-9.4.48.v20220622.jar
│ ├── jetty-continuation-9.4.48.v20220622.jar
│ ├── jetty-http-9.4.48.v20220622.jar
│ ├── jetty-io-9.4.48.v20220622.jar
│ ├── jetty-security-9.4.48.v20220622.jar
│ ├── jetty-server-9.4.48.v20220622.jar
│ ├── jetty-servlet-9.4.48.v20220622.jar
│ ├── jetty-servlets-9.4.48.v20220622.jar
│ ├── jetty-util-9.4.48.v20220622.jar
│ ├── jetty-util-ajax-9.4.48.v20220622.jar
│ ├── jline-3.21.0.jar
│ ├── jopt-simple-5.0.4.jar
│ ├── jose4j-0.7.9.jar
│ ├── kafka-clients-3.3.2.jar
│ ├── kafka-log4j-appender-3.3.2.jar
│ ├── kafka-metadata-3.3.2.jar
│ ├── kafka-raft-3.3.2.jar
│ ├── kafka-server-common-3.3.2.jar
│ ├── kafka-shell-3.3.2.jar
│ ├── kafka-storage-3.3.2.jar
│ ├── kafka-storage-api-3.3.2.jar
│ ├── kafka-streams-3.3.2.jar
│ ├── kafka-streams-examples-3.3.2.jar
│ ├── kafka-streams-scala_2.13-3.3.2.jar
│ ├── kafka-streams-test-utils-3.3.2.jar
│ ├── kafka-tools-3.3.2.jar
│ ├── kafka_2.13-3.3.2.jar
│ ├── lz4-java-1.8.0.jar
│ ├── maven-artifact-3.8.4.jar
│ ├── metrics-core-2.2.0.jar
│ ├── metrics-core-4.1.12.1.jar
│ ├── netty-buffer-4.1.78.Final.jar
│ ├── netty-codec-4.1.78.Final.jar
│ ├── netty-common-4.1.78.Final.jar
│ ├── netty-handler-4.1.78.Final.jar
│ ├── netty-resolver-4.1.78.Final.jar
│ ├── netty-transport-4.1.78.Final.jar
│ ├── netty-transport-classes-epoll-4.1.78.Final.jar
│ ├── netty-transport-native-epoll-4.1.78.Final.jar
│ ├── netty-transport-native-unix-common-4.1.78.Final.jar
│ ├── osgi-resource-locator-1.0.3.jar
│ ├── paranamer-2.8.jar
│ ├── plexus-utils-3.3.0.jar
│ ├── reflections-0.9.12.jar
│ ├── reload4j-1.2.19.jar
│ ├── rocksdbjni-7.1.2.jar
│ ├── scala-collection-compat_2.13-2.6.0.jar
│ ├── scala-java8-compat_2.13-1.0.2.jar
│ ├── scala-library-2.13.8.jar
│ ├── scala-logging_2.13-3.9.4.jar
│ ├── scala-reflect-2.13.8.jar
│ ├── slf4j-api-1.7.36.jar
│ ├── slf4j-reload4j-1.7.36.jar
│ ├── snappy-java-1.1.8.4.jar
│ ├── swagger-annotations-2.2.0.jar
│ ├── trogdor-3.3.2.jar
│ ├── zookeeper-3.6.3.jar
│ ├── zookeeper-jute-3.6.3.jar
│ └── zstd-jni-1.5.2-1.jar
├── licenses
│ ├── CDDL+GPL-1.1
│ ├── DWTFYWTPL
│ ├── argparse-MIT
│ ├── classgraph-MIT
│ ├── eclipse-distribution-license-1.0
│ ├── eclipse-public-license-2.0
│ ├── jline-BSD-3-clause
│ ├── jopt-simple-MIT
│ ├── paranamer-BSD-3-clause
│ ├── slf4j-MIT
│ └── zstd-jni-BSD-2-clause
├── logs
│ ├── controller.log
│ ├── kafka-authorizer.log
│ ├── kafka-request.log
│ ├── kafkaServer-gc.log
│ ├── log-cleaner.log
│ ├── server.log
│ └── state-change.log
└── site-docs
└── kafka_2.13-3.3.2-site-docs.tgz

9 directories, 214 files
  • Kafka 读写数据
#常见命令
kafka-topics.sh #消息的管理命令
kafka-console-producer.sh #生产者的模拟命令
kafka-console-consumer.sh #消费者的模拟命令

#创建 Topic
----------------------------------------------------------------------
创建名为 mooreyxia,partitions(分区)为3,replication(每个分区的副本数/每个分区的分区因子)为 2 的topic(主题)
----------------------------------------------------------------------
[root@node1 ~]#ll /usr/local/kafka/data/
total 20
drwxr-xr-x 2 root root 4096 Feb 2 13:05 ./
drwxr-xr-x 9 root root 4096 Feb 2 12:44 ../
-rw-r--r-- 1 root root 0 Feb 2 12:44 .lock
-rw-r--r-- 1 root root 0 Feb 2 12:44 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Feb 2 13:05 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Feb 2 12:44 meta.properties
-rw-r--r-- 1 root root 4 Feb 2 13:05 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 0 Feb 2 12:44 replication-offset-checkpoint
[root@node1 ~]#kafka-topics.sh --create --topic mooreyxia --bootstrap-server 192.168.122.2:9092 --partitions 3 --replication-factor 2
Created topic mooreyxia.
#执行命令后生成mooreyxia-1 mooreyxia-2 两个文件
[root@node1 ~]#ll /usr/local/kafka/data/
total 32
drwxr-xr-x 4 root root 4096 Feb 2 13:07 ./
drwxr-xr-x 9 root root 4096 Feb 2 12:44 ../
-rw-r--r-- 1 root root 0 Feb 2 12:44 .lock
-rw-r--r-- 1 root root 0 Feb 2 12:44 cleaner-offset-checkpoint
-rw-r--r-- 1 root root 4 Feb 2 13:07 log-start-offset-checkpoint
-rw-r--r-- 1 root root 88 Feb 2 12:44 meta.properties
drwxr-xr-x 2 root root 4096 Feb 2 13:07 mooreyxia-1/
drwxr-xr-x 2 root root 4096 Feb 2 13:07 mooreyxia-2/
-rw-r--r-- 1 root root 32 Feb 2 13:07 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 32 Feb 2 13:07 replication-offset-checkpoint

[root@node1 ~]#du -sh /usr/local/kafka/data/mooreyxia*
8.0K /usr/local/kafka/data/mooreyxia-1
12K /usr/local/kafka/data/mooreyxia-2

#在其他节点上也生成了mooreyxia副本
[root@node2 ~]#du -sh /usr/local/kafka/data/mooreyxia*
12K /usr/local/kafka/data/mooreyxia-0
8.0K /usr/local/kafka/data/mooreyxia-2

[root@node3 ~]#du -sh /usr/local/kafka/data/mooreyxia*
8.0K /usr/local/kafka/data/mooreyxia-0
12K /usr/local/kafka/data/mooreyxia-1

#获取所有 Topic
[root@node1 ~]#kafka-topics.sh --list --bootstrap-server 192.168.122.2:9092
mooreyxia

#验证 Topic 详情
------------------------------------------------------------------
状态说明:mooreyxia 有三个分区分别为0、1、2,分区0的leader是2 (broker.id),分区 0 有2 个副本(2,3),并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)。
------------------------------------------------------------------
[root@node1 ~]#/usr/local/kafka/bin/kafka-topics.sh --describe --bootstrap-server 192.168.122.2:9092 --topic mooreyxia
Topic: mooreyxia TopicId: jB9c8LBEQmqfem-0HW-51A PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: mooreyxia Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: mooreyxia Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: mooreyxia Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2

Topic生成副本在各个分区,结构如下:

57-Zookeeper集群和kafka消息队列集群

  • 生产并消费 Topic
  • 消息者先生产消息,消费都后续才启动,也能收到之前生产的消息
  • 同一个消息在同一个group内的消费者只有被一个消费者消费,比如:共100条消息,在一个group内有A,B两个消费者,其中A消费50条,B消费另外的50条消息。从而实现负载均衡,不同group内的消费者则可以同时消费同一个消息
#生产topic - 发送消息命令格式:
kafka-console-producer.sh --broker-list <kafkaIP1>:<端口>,<kafkaIP2>:<端口> --topic <topic名称>
#例:
#交互式输入消息,按Ctrl+C退出
[root@node1 ~]#kafka-console-producer.sh --broker-list 192.168.122.2:9092,192.168.122.3:9092,192.168.122.4:9092 --topic mooreyxia
>This is mooreyxia
>welcome to my chanel

#消费topic - 接收消息命令格式:
#--from-beginning 表示消费历史消息可接受,默认只能收到消费后消息
#--consumer-property group.id=<组名称> 同一个消息在同一个group内的消费者只有被一个消费者消费
kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称> --from-beginning --consumer-property group.id=<组名称>
#例1
[root@node2 ~]#kafka-console-consumer.sh --topic mooreyxia --bootstrap-server 192.168.122.2:9092 --from-beginning
This is mooreyxia
welcome to my chanel
#例2
#先生产消息
[root@node1 ~]#kafka-console-producer.sh --broker-list 192.168.122.2:9092,192.168.122.3:9092,192.168.122.4:9092 --topic mooreyxia
>mooreyxia will come
#同一个组内的消费者再消费消息,两个消费者互相抢占
#node2消费了消息 node3就接收不到了
[root@node2 ~]#kafka-console-consumer.sh --topic mooreyxia --bootstrap-server 192.168.122.3:9092 --consumer-property group.id=1
mooreyxia will come

[root@node3 ~]#kafka-console-consumer.sh --topic mooreyxia --bootstrap-server 192.168.122.3:9092 --consumer-property group.id=1
  • 删除Topic
[root@node3 ~]#kafka-topics.sh --delete --bootstrap-server 192.168.122.2:2181,192.168.122.3:2181,192.168.122.4:2181 --topic mooreyxia

kafka在ZooKeeper里的存储结构

57-Zookeeper集群和kafka消息队列集群

图形工具 Offset Explorer (Kafka Tool)

下载链接:

https://www.kafkatool.com/download.html

57-Zookeeper集群和kafka消息队列集群

57-Zookeeper集群和kafka消息队列集群

我是moore,大家一起加油!

明天更新Redis-Cluster和Dubbo微服务管理,加油吧....