Centos7.5安装kafka集群

时间:2021-07-18 12:11:47

Tags: kafka

Centos7.5安装kafka集群

主机环境

基本配置:

节点数 3
操作系统 CentOS Linux release 7.5.1804 (Core)
内存 8GB

流程配置:

节点数 3
操作系统 CentOS Linux release 7.5.1804 (Core)
内存 16GB

注: 实际生产中按照需求分配内存,如果只是在vmvare中搭建虚拟机,内存可以调整为每台主机1-2GB即可

软件环境

软件 版本 下载地址
jdk jdk-8u172-linux-x64 点击下载
zookeeeper zookeeper-3.4.5-cdh5.14.2 点击下载
kafka kafka_2.12-1.1.0 点击下载
kafka-manager kafka-manager-1.3.3.17 源码下载

注: kafka-manager官方只提供源码下载,下载后编译:sbt clean dist,此编译需要x翻XX墙x才能完成,没有条件的话可以下载网友编译好的低一点的版本:kafka-manager-1.3.3.4

主机规划

3个节点角色规划如下:

主机名 CDHNode1 CDHNode2 CDHNode3
IP 192.168.223.201 192.168.223.202 192.168.223.203
zookeeper yes yes yes
kafka yes yes yes

注: ZooKeeper保持奇数个,如果需要高可用则不少于 3 个节点。具体原因,以后详叙。

主机安装前准备

  1. 关闭所有节点的 SELinux
sed -i 's/^SELINUX=.*$/SELINUX=disabled/g' /etc/selinux/config
setenforce 0
  1. 关闭所有节点防火墙 firewalld or iptables
systemctl disable firewalld;
systemctl stop firewalld;
systemctl disable iptables;
systemctl stop iptables;
  1. 开启所有节点时间同步 ntpdate
echo "*/5 * * * * /usr/sbin/ntpdate asia.pool.ntp.org | logger -t NTP" >> /var/spool/cron/root
  1. 设置所有节点语言编码以及时区
echo 'export TZ=Asia/Shanghai' >> /etc/profile
echo 'export LANG=en_US.UTF-8' >> /etc/profile
. /etc/profile
  1. 所有节点添加kafka用户
useradd -m kafka
echo '123456' | passwd --stdin kafka
# 设置PS1
su - kafka
echo 'export PS1="\u@\h:\$PWD>"' >> ~/.bash_profile
echo "alias mv='mv -i'
alias rm='rm -i'" >> ~/.bash_profile
. ~/.bash_profile
  1. 设置kafka用户之间免密登录 首先在CDHNode1主机生成秘钥
su - kafka
ssh-keygen -t rsa # 一直回车即可生成kafka用户的公钥和私钥
cd .ssh
vi id_rsa.pub # 去掉私钥末尾的主机名 kafka@CDHNode1
cat id_rsa.pub > authorized_keys
chmod 600 authorized_keys

压缩.ssh文件夹

su - kafka
zip -r ssh.zip .ssh

随后分发ssh.zip到CDHNode2-3主机kafka用户家目录解压即完成免密登录

  1. 主机内核参数优化以及最大文件打开数、最大进程数等参数优化 不同主机优化参数有可能不一样,故这里不作出具体优化方法,但如果kafka环境用于正式生产,必须优化,linux默认参数可能会导致kafka集群性能低下。

注: 以上操作需要使用 root 用户,到目前为止操作系统环境已经准备完成,以下开始正式安装,后面的操作如果不做特殊说明均使用 kafka 用户

安装jdk1.8

所有节点都需要安装,安装方式都一样 解压 jdk-8u172-linux-x64.tar.gz

tar zxvf jdk-8u172-linux-x64.tar.gz
mkdir -p /home/kafka/app
mv jdk-8u172-linux-x64 /home/kafka/app/jdk
rm -f jdk-8u172-linux-x64.tar.gz

配置环境变量 vi ~/.bash_profile 添加以下内容:

#java
export JAVA_HOME=/home/kafka/app/jdk
export CLASSPATH=.:$JAVA_HOME/lib:$CLASSPATH
export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin

加载环境变量

. ~/.bash_profile

查看是否安装成功 java -version

java version "1.8.0_172"
Java(TM) SE Runtime Environment (build 1.8.0_172-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)

如果出现以上结果证明安装成功。

安装zookeeper

首先在CDHNode1上安装

解压 zookeeper-3.4.5-cdh5.14.2.tar.gz

tar zxvf zookeeper-3.4.5-cdh5.14.2.tar.gz
mv zookeeper-3.4.5-cdh5.14.2 /home/kafka/app/zookeeper
rm -f zookeeper-3.4.5-cdh5.14.2.tar.gz

设置环境变量 vi ~/.bash_profile 添加以下内容:

#zk
export ZOOKEEPER_HOME=/home/kafka/app/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

加载环境变量

. ~/.bash_profile

添加配置文件 vi /home/kafka/app/zookeeper/conf/zoo.cfg 添加以下内容:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
#数据文件目录与日志目录
dataDir=/home/kafka/data/zookeeper/zkdata
dataLogDir=/home/kafka/data/zookeeper/zkdatalog
# the port at which the clients will connect
clientPort=2181
#server.服务编号=主机名称:Zookeeper不同节点之间同步和通信的端口:选举端口(选举leader)
server.1=CDHNode1:2888:3888
server.2=CDHNode2:2888:3888
server.3=CDHNode3:2888:3888
# 节点变更时只需在此添加或者删除相应的节点(所有节点配置都需要修改),然后在启动新增或者停止删除的节点即可
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

创建所需目录

mkdir -p /home/kafka/data/zookeeper/zkdata
mkdir -p /home/kafka/data/zookeeper/zkdatalog
mkdir -p /home/kafka/app/zookeeper/logs

添加myid vim /home/kafka/data/zookeeper/zkdata/myid,添加:

1

注: 此数字来源于zoo.cfg中配置 server.1=CDHNode1:2888:3888行server后面的1,故CDHNode2填写2,CDHNode3填写3

配置日志目录 vim /home/kafka/app/zookeeper/libexec/zkEnv.sh ,修改以下参数为:

ZOO_LOG_DIR="$ZOOKEEPER_HOME/logs"
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"

注: /home/kafka/app/zookeeper/libexec/zkEnv.sh 与 /home/kafka/app/zookeeper/bin/zkEnv.sh 文件内容相同。启动脚本 /home/kafka/app/zookeeper/bin/zkServer.sh 会优先读取/home/kafka/app/zookeeper/libexec/zkEnv.sh,当其不存在时才会读取 /home/kafka/app/zookeeper/bin/zkEnv.sh

vim /home/kafka/app/zookeeper/conf/log4j.properties ,修改以下参数为:

zookeeper.root.logger=INFO, ROLLINGFILE
zookeeper.log.dir=/home/kafka/app/zookeeper/logs
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender

复制zookeeper到CDHNode2-3

scp ~/.bash_profile CDHNode2:/home/kafka
scp ~/.bash_profile CDHNode3:/home/kafka
scp -pr /home/kafka/app/zookeeper CDHNode2:/home/kafka/app
scp -pr /home/kafka/app/zookeeper CDHNode3:/home/kafka/app
ssh CDHNode2 "mkdir -p /home/kafka/data/zookeeper/zkdata;mkdir -p /home/kafka/data/zookeeper/zkdatalog;mkdir -p /home/kafka/app/zookeeper/logs"
ssh CDHNode2 "echo 2 > /home/kafka/data/zookeeper/zkdata/myid"
ssh CDHNode3 "mkdir -p /home/kafka/data/zookeeper/zkdata;mkdir -p /home/kafka/data/zookeeper/zkdatalog;mkdir -p /home/kafka/app/zookeeper/logs"
ssh CDHNode3 "echo 3 > /home/kafka/data/zookeeper/zkdata/myid"

启动zookeeper 3个节点均启动

/home/kafka/app/zookeeper/bin/zkServer.sh start

查看节点状态

/home/kafka/app/zookeeper/bin/zkServer.sh status

如果一个节点为leader,另2个节点为follower,则说明Zookeeper安装成功

查看进程

jps

其中 QuorumPeerMain 进程为zookeeper

停止zookeeper

/home/kafka/app/zookeeper/bin/zkServer.sh stop

安装kafka

首先在CDHNode1上安装

解压 kafka_2.12-1.1.0.tgz

tar zxvf kafka_2.12-1.1.0.tgz
mv kafka_2.12-1.1.0 /home/kafka/app/kafka
rm -f kafka_2.12-1.1.0.tgz

设置环境变量 vi ~/.bash_profile 添加以下内容:

#kafka
export KAFKA_HOME=/home/kafka/app/kafka
export PATH=$PATH:$KAFKA_HOME/bin

加载环境变量

. ~/.bash_profile

添加配置文件 vi /home/kafka/app/kafka/config/server.properties :

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
#当前机器在集群中的唯一标识,和zookeeper的myid性质一样,CDHNode1为0,CDHNode1为2,CDHNode3为2 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092 port=9092
#当前kafka对外提供服务的端口默认是9092
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092 # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
#这个是borker进行网络处理的线程数 # The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
#这个是borker进行I/O处理的线程数 # The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1024000
#发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 # The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1024000
#kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 # The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
#这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files
log.dirs=/home/kafka/data/kafka/kafka-logs
#消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录
#如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 # The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
#默认的分区数,一个topic默认1个分区数 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log. # The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
#默认消息的最大持久化时间,168小时,7天 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
log.retention.bytes=1073741824
#日志数据存储的最大字节数,1GB
# log.retention.hours 与 log.retention.bytes无论哪个先达到都会触发 # The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
#这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 # The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000 message.max.byte=5242880
#消息保存的最大值5M default.replication.factor=3
#kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务,必须小于等于集群节点数 replica.fetch.max.bytes=5242880
#取消息的最大直接数 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 # Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=10000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
  • 这里没有启用压缩,启用压缩后吞吐量应该会增加比较明显,具体开启方法以后研究

设置内存使用:vi /home/kafka/app/kafka/bin/kafka-server-start.sh

export KAFKA_HEAP_OPTS="-Xmx3G -Xms3G"

创建所需目录

mkdir -p /home/kafka/data/kafka/kafka-logs

复制kafka到CDHNode2-3

scp ~/.bash_profile CDHNode2:/home/kafka
scp ~/.bash_profile CDHNode3:/home/kafka
scp -pr /home/kafka/app/kafka CDHNode2:/home/kafka/app
scp -pr /home/kafka/app/kafka CDHNode3:/home/kafka/app
ssh CDHNode2 "mkdir -p /home/kafka/data/kafka/kafka-logs"
ssh CDHNode3 "mkdir -p /home/kafka/data/kafka/kafka-logs"

修改 /home/kafka/app/kafka/config/server.properties 中的 broker.id CDHNode2为:1 ,CDHNode3为:2

启动kafka 3个节点均启动

/home/kafka/app/kafka/bin/kafka-server-start.sh -daemon /home/kafka/app/kafka/config/server.properties
  • -daemon 后台服务的方式启动
  • 有时候启动时明明未绑定端口当时会提示:kafka.common.KafkaException: Socket server failed to bind to ...,导致kafka无法启动,原因大概是非正常退出,pid文件未删除,删除老的pid文件就可以正常启动了。

查看进程

jps

其中 Kafka 进程即为 kafka

停止kafka

/home/kafka/app/kafka/bin/kafka-server-stop.sh

kafka基本操作

kafka基本操作

创建toppic

/home/kafka/app/kafka/bin/kafka-topics.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --create --topic test --replication-factor 3 --partitions 3
  • --replication-factor 指定副本数,必须小于等于集群节点数,否则报错
  • --partitions 指定分区数,如果设置3个分区,则最多3个消费者同时消费信息

查看toppic

/home/kafka/app/kafka/bin/kafka-topics.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --describe --topic test
  • Leader:负责处理消息的读和写,Leader是从所有节点中随机选择的
  • Replicas:列出了所有的副本节点,不管节点是否在服务中。
  • Isr:是正在服务中的节点

查看所有toppic

/home/kafka/app/kafka/bin/kafka-topics.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --list

生成消息 使用kafka自带工具

/home/kafka/app/kafka/bin/kafka-console-producer.sh --broker-list CDHNode1:9092,CDHNode2:9092,CDHNode3:9092 --topic test
  • 进入控制台输入消息然后回车就生成一条消息

消费消息 使用kafka自带工具 第一种方法:

/home/kafka/app/kafka/bin/kafka-console-consumer.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --from-beginning --topic test
  • 不会记录消费者offset,每次都是从头开始

第二种方法:

/home/kafka/app/kafka/bin/kafka-console-consumer.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --group test_group  --topic test
  • 指定消费者组,会记录消费者offset
  • --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 可以使用 --bootstrap-server CDHNode1:9092,CDHNode2:9092,CDHNode3:9092 代替,相应的查看消费者组时 --zookeeper 也需要使用 --bootstrap-server 代替

第三种方法:

/home/kafka/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server CDHNode1:9092,CDHNode2:9092,CDHNode3:9092 --offset 0 --partition 0 --group test_group --topic test
  • 默认消费会从最新开始,可以指定offset,指定offset后必须指定partition,offset 从 0开始,且指定offset只使用与新版的使用--bootstrap-server的消费者,使用zookeeper消费者不支持

注: kafka中数据的删除跟有没有消费者消费完全无关。数据的删除,只跟kafka broker的这两个配置有关:

log.retention.hours=720 #数据最多保存720小时
log.retention.bytes=10737418240 #数据最多10GB

查看所有消费者组

/home/kafka/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server CDHNode1:9092,CDHNode2:9092,CDHNode3:9092 --list
  • 结果中的 KafkaManagerOffsetCache 为系统内置组

查看消费者组消费信息

/home/kafka/app/kafka/bin/kafka-consumer-groups.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --group test_group --describe
  • --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 可以使用 --bootstrap-server CDHNode1:9092,CDHNode2:9092,CDHNode3:9092 代替,这里是使用 --zookeeper 还是 --bootstrap-server 由消费者组消费时的选择而定

查看topic偏移量最大(小)值

/home/kafka/app/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic test --time -1 --broker-list CDHNode1:9092,CDHNode2:9092,CDHNode3:9092 --partitions 2
  • time为-1时表示最大值,time为-2时表示最小值
  • 结果 test:2:0 ,中间数字为分区编号,最后的数字为结果

删除toppic

/home/kafka/app/kafka/bin/kafka-topics.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --delete --topic test

删除消费者组

  1. 使用 --zookeeper 的消费者组:
$ $ZOOKEEPER_HOME/bin/zkCli.sh -server CDHNode1:2181,CDHNode2:2181,CDHNode3:2181
rmr /consumers/test_group
  1. 使用 --bootstrap-server的消费者组,使用命令:
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server CDHNode1:9092,CDHNode2:9092,CDHNode3:9092 --delete --group test_group
  • 删除即时生效,但在kafka-manager管理界面还是会显示,只有重启集群才消失,应该是kafka-manager本身的问题

topic 分区负载均衡 在创建一个topic时,kafka尽量将partition均分在所有的brokers上,并且将replicas也j均分在不同的broker上。

每个partitiion的所有replicas叫做"assigned replicas","assigned replicas"中的第一个replicas叫"preferred replica",刚创建的topic一般"preferred replica"是leader。leader replica负责所有的读写。

但随着时间推移,broker可能会停机,会导致leader迁移,导致机群的负载不均衡。我们期望对topic的leader进行重新负载均衡,让partition选择"preferred replica"做为leader。

对所有Topics进行均衡操作

/home/kafka/app/kafka/bin/kafka-preferred-replica-election.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181

对某个Topic进行操作,先创建分配json文件

{
"partitions":
[
{"topic":"test","partition": 2}
]
}

or

{
"partitions":
[
{"topic":"test","partition": 0},
{"topic":"test","partition": 1},
{"topic":"test","partition": 2}
]
}

执行

/home/kafka/app/kafka/bin/kafka-preferred-replica-election.sh --zookeeper CDHNode1:2181,CDHNode2:2181,CDHNode3:2181 --path-to-json-file topic.json

安装管理监控工具Kafka-Manager

CDHNode1上解压安装下载好的 kafka-manager-1.3.3.4.zip

unzip kafka-manager-1.3.3.4.zip
mv kafka-manager-1.3.3.4 /home/kafka/app/kafka-manager
mkdir -p /home/kafka/app/kafka-manager/logs
rm -f kafka-manager-1.3.3.4.zip

添加配置文件 vi /home/kafka/app/kafka-manager/conf/application.conf


# Copyright 2015 Yahoo Inc. Licensed under the Apache License, Version 2.0
# See accompanying LICENSE file. # This is the main configuration file for the application.
# ~~~~~ # Secret key
# ~~~~~
# The secret key is used to secure cryptographics functions.
# If you deploy your application to several instances be sure to use the same key!
play.crypto.secret="^<csmm5Fx4d=r2HEX8pelM3iBkFVv?k[mc;IZE<_Qoq8EkX_/7@Zt6dP05Pzea3U"
play.crypto.secret=${?APPLICATION_SECRET} # The application languages
# ~~~~~
play.i18n.langs=["en"] play.http.requestHandler = "play.http.DefaultHttpRequestHandler"
play.http.context = "/"
play.application.loader=loader.KafkaManagerLoader kafka-manager.zkhosts="CDHNode1:2181,CDHNode2:2181,CDHNode3:2181"
kafka-manager.zkhosts=${?ZK_HOSTS}
pinned-dispatcher.type="PinnedDispatcher"
pinned-dispatcher.executor="thread-pool-executor"
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"] akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "INFO"
} # 开启http基本密码验证
basicAuthentication.enabled=true
basicAuthentication.username="admin"
basicAuthentication.password="password"
basicAuthentication.realm="Kafka-Manager" kafka-manager.consumer.properties.file=${?CONSUMER_PROPERTIES_FILE}

启动(nohup)

nohup /home/kafka/app/kafka-manager/bin/kafka-manager -Dconfig.file=/home/kafka/app/kafka-manager/conf/application.conf -Dhttp.port=8080 > /home/kafka/app/kafka-manager/logs/server.log 2>&1 &
  • -Dhttp.port=8080 指定端口,默认9000
  • -Dconfig.file 指定配置文件

停止

kill pid

访问

http://CDHNode1:8080

点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面: 自定义集群名称和zookeeper地址 版本没有1.1.0,选择最新的0.10.1.0即可

其他broker的配置可以根据自己需要进行配置,默认情况下,点击【保存】时,会提示几个默认值为1的配置错误,需要配置为>=2的值。保存成功后,点击【Go to cluster view.】打开当前的集群界面。

关于kafka-manager的其他功能和操作可以参考官网:https://github.com/yahoo/kafka-manager。

开启kafka-server JMX 开启后kafka-manager能够显示更多监控信息,开启方法 vi $KAFKA_HOME/bin/kafka_server_start.sh, 添加以下参数

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi

然后kafka-manager添加集群是勾选 Enable JMX Polling (Set JMX_PORT env variable before starting kafka server) 即可