一、简介
在Kafka0.9版本之前,Kafka集群时没有安全机制的。Kafka Client应用可以通过连接Zookeeper地址,例如zk1:2181:zk2:2181,zk3:2181等。来获取存储在Zookeeper中的Kafka元数据信息。拿到Kafka Broker地址后,连接到Kafka集群,就可以操作集群上的所有主题了。由于没有权限控制,集群核心的业务主题时存在风险的。
本文主要使用SASL+ACL
二、技术关键点
配置文件
修改broker启动所需的server.properties文件,你至少需要配置(或修改)以下这些参数:
listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin
其他参数讲解,请参考链接:
https://www.cnblogs.com/xiao987334176/p/10065844.html
这里主要讲解几个重点参数
默认情况下,如果资源R没有关联acl,除了超级用户,没有用户允许访问。如果你想改变这种方式你可以做如下配置
allow.everyone.if.no.acl.found=true
什么意思呢?上面的配置已经启动了acl,除了超级用户之外,其他用户无法访问。那么问题就来了,在kafka集群中,其它节点需要同步数据,需要相互访问。
它默认会使用ANONYMOUS的用户名连接集群。在这种情况下,启动kafka集群,必然失败!所以这个参数一定要配置才行!
listeners=SASL_PLAINTEXT://:9092
这个参数,表示kafka监听的地址。此参数必须要配置,默认是注释掉的。默认会使用listeners=PLAINTEXT://:9092,但是我现在开启了SASL,必须使用SASL协议连接才行。
//:9092 这里虽然没有写IP地址,根据官方解释,它会监听所有IP。注意:这里只能是IP地址,不能是域名。否则启动时,会提示无法绑定IP。
advertised.listeners 这个参数,表示外部的连接地址。这里可以写域名,也可以写IP地址。建议使用域名,为什么呢?因为IP可能会变动,但是主机名是不会变动的。
所以在java代码里面写死,就可以了!注意:必须是SASL协议才行!
super.users=User:admin 表示启动超级用户admin,注意:此用户名不允许更改,否则使用生产模式时,会有异常!
启动脚本
bin/kafka-server-start.sh 这个是kafka的启动脚本,要使用ACL,需要增加一个参数才行。
有2种方法修改,这里分别介绍一下:
1. 增加环境变量KAFKA_OPTS(推荐)
先来看一下,默认的bin/kafka-server-start.sh的最后一行
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
只需要在最后一行的上面一行,添加一个环境变量即可
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
2. 增加参数-Djava.security.auth.login.config
直接将最后一行修改为
exec $base_dir/kafka-run-class.sh -Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf $EXTRA_ARGS kafka.Kafka "$@"
JAAS文件
kafka_cluster_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password=""
user_admin=""
user_reader=""
user_writer="";
};
这个文件,是专门用来做认证的。用户名和密码的格式如下:
user_用户名="密码"
注意:对于超级用户,这几行是固定的
username="admin"
password=""
user_admin="admin"
这里指定的是admin用户密码为123456,密码可自行更改。
下面的,才是普通用户。最后一个用户,要有一个分号才行!
三、正式部署
环境介绍
本文采用的环境,参考以下链接
https://www.cnblogs.com/xiao987334176/p/10088497.html#autoid-3-0-0
使用了3台zookeeper和5台kafka。都是在一台服务器上面运行的!
其中zookeeper的镜像,不需要变动,直接启动即可。
但是kafka的镜像,需要重新构建,请看下面的内容。
创建镜像
创建空目录
mkdir /opt/kafka_cluster_acl
dockerfile
FROM ubuntu:16.04
# 修改更新源为阿里云
ADD sources.list /etc/apt/sources.list
ADD kafka_2.-2.1..tgz /
ADD kafka_cluster_jaas.conf /
# 安装jdk
RUN apt-get update && apt-get install -y openjdk--jdk --allow-unauthenticated && apt-get clean all EXPOSE
# 添加启动脚本
ADD run.sh .
RUN chmod run.sh
ENTRYPOINT [ "/run.sh"]
kafka_cluster_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password=""
user_admin=""
user_reader=""
user_writer="";
};
run.sh
#!/bin/bash if [ -z $broker_id ];then
echo "broker_id变量不能为空"
exit
fi if [ -z $zookeeper ];then
echo "zookeeper变量不能为空"
exit
fi if [ -z $advertised_hostname ];then
echo "advertised_hostname变量不能为空"
exit
fi # 开启kafka acl验证
echo " listeners=SASL_PLAINTEXT://:9092
advertised.listeners=SASL_PLAINTEXT://$advertised_hostname:9092
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
#SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
allow.everyone.if.no.acl.found=true
super.users=User:admin " >> /kafka_2.12-2.1.0/config/server.properties cd /kafka_2.-2.1.
# 设置唯一id
sed -i "21s/0/$broker_id/" /kafka_2.-2.1./config/server.properties
# 设置zookeeper连接地址
sed -i "123s/localhost/$zookeeper/" /kafka_2.-2.1./config/server.properties # 配置启动脚本,最后一行之前添加环境变量
sed -i -e ""i'\export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_cluster_jaas.conf"' bin/kafka-server-start.sh # 添加配置文件
mv /kafka_cluster_jaas.conf /kafka_2.-2.1./config/ # 临时添加5条hosts
echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts # 启动kafka
bin/kafka-server-start.sh config/server.properties
注意:由于没有DNS,这里临时添加了5条hosts记录。5台kafka之间,必须要相互连通,否则会报错
WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
sources.list
deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
此时,目录结构如下:
./
├── dockerfile
├── kafka_2.-2.1..tgz
├── kafka_cluster_jaas.conf
├── run.sh
└── sources.list
生成镜像
docker build -t kafka_cluster_acl /opt/kafka_cluster_acl
启动镜像
请确保已经启动了3台zookeeper的镜像!
第一个kafka节点
docker run -it -p : -e broker_id= -e zookeeper=172.168.0.2:,172.168.0.3:,172.168.0.4: -e advertised_hostname=kafka-1.default.svc.cluster.local --network br1 --ip=172.168.0.5 kafka_cluster_acl
第二个kafka节点
docker run -it -p : -e broker_id= -e zookeeper=172.168.0.2:,172.168.0.3:,172.168.0.4: -e advertised_hostname=kafka-2.default.svc.cluster.local --network br1 --ip=172.168.0.6 kafka_cluster_acl
第三个kafka节点
docker run -it -p : -e broker_id= -e zookeeper=172.168.0.2:,172.168.0.3:,172.168.0.4: -e advertised_hostname=kafka-3.default.svc.cluster.local --network br1 --ip=172.168.0.7 kafka_cluster_acl
第四个kafka节点
docker run -it -p : -e broker_id= -e zookeeper=172.168.0.2:,172.168.0.3:,172.168.0.4: -e advertised_hostname=kafka-4.default.svc.cluster.local --network br1 --ip=172.168.0.8 kafka_cluster_acl
第五个kafka节点
docker run -it -p : -e broker_id= -e zookeeper=172.168.0.2:,172.168.0.3:,172.168.0.4: -e advertised_hostname=kafka-5.default.svc.cluster.local --network br1 --ip=172.168.0.9 kafka_cluster_acl
客户端测试
shell脚本客户端
先来查看docker进程
root@jqb-node128:~# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
a5ff3c8f5c2a kafka_cluster_acl "/run.sh" About a minute ago Up About a minute 0.0.0.0:9096->9092/tcp gifted_jones
36a4d94054b5 kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9095->9092/tcp modest_khorana
f614d734ac8b kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9094->9092/tcp tender_kare
29ef9a2edd08 kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9093->9092/tcp reverent_jepsen
d9cd45c62e86 kafka_cluster_acl "/run.sh" 3 minutes ago Up 3 minutes 0.0.0.0:9092->9092/tcp silly_mcclintock
69dba560bc09 zookeeper_cluster "/run.sh" 4 minutes ago Up 4 minutes 0.0.0.0:2183->2181/tcp confident_fermat
d73a01e76949 zookeeper_cluster "/run.sh" 4 minutes ago Up 4 minutes 0.0.0.0:2182->2181/tcp admiring_snyder
7ccab68252e7 zookeeper_cluster "/run.sh" 4 minutes ago Up 4 minutes 0.0.0.0:2181->2181/tcp gifted_wilson
确保已经运行了5个kafka和3个zk
随便进入一个kafka容器
root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash
root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/
新增一个配置文件 kafka_client_jaas.conf
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# apt-get install -y vim
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi config/kafka_client_jaas.conf
内容如下:
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="";
};
同理我们也要将配置文件内容传递给JVM, 因此需要修改。
生产者
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# vi bin/kafka-console-producer.sh
最后一行的上面,添加 KAFKA_OPTS 变量
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
修改生产者配置文件,最后一行追加2行内容
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
使用echo 追加
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/producer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/producer.properties
消费者
修改生产者配置文件,使用echo追加
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'security.protocol=SASL_PLAINTEXT' >> config/consumer.properties
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# echo 'sasl.mechanism=PLAIN' >> config/consumer.properties
编辑测试脚本
root@a5ff3c8f5c2a:/kafka_2.-2.1.# vi bin/kafka-console-consumer.sh
最后一行的上面,添加 KAFKA_OPTS 变量
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
export KAFKA_OPTS="-Djava.security.auth.login.config=/kafka_2.12-2.1.0/config/kafka_client_jaas.conf"
exec $(dirname $)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
测试生产者
目前还没有topic,先来创建一个topic
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-topics.sh --create --zookeeper 172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --topic test --partitions 1 --replication-factor 1 Created topic "test".
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0#
进入生产者模式,指定kafka的服务器为第一个kafka。当然,只要是5个kafka中的任意一个即可!
输入消息 fdsa,回车
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>fdsa
[2018-12-17 08:45:15,455] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []
[2018-12-17 08:45:15,457] ERROR [Producer clientId=console-producer] Connection to node -1 (d9cd45c62e86.br1/172.168.0.5:9092) failed authentication due to: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are [] (org.apache.kafka.clients.NetworkClient)
会出现报错,则说明配置的security 已生效, 要想普通用户能读写消息,需要配置ACL
配置ACL
kafka的ACL规则,是存储在zookeeper中的,只需要连接zookeeper即可!
topic权限
允许writer用户有所有权限,访问所有topic
--operation All 表示所有权限,
--topic=* 表示所有topic
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All --topic=*
Adding ACLs for resource `Topic:LITERAL:*`:
User:writer has Allow permission for operations: All from hosts: * Current ACLs for resource `Topic:LITERAL:*`:
User:writer has Allow permission for operations: All from hosts: *
组权限
root@a5ff3c8f5c2a:/kafka_2.12-2.1.0# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=172.168.0.2:2181,172.168.0.3:2181,172.168.0.4:2181 --add --allow-principal User:writer --operation All -group=*
Adding ACLs for resource `Group:LITERAL:*`:
User:writer has Allow permission for operations: All from hosts: * Current ACLs for resource `Group:LITERAL:*`:
User:writer has Allow permission for operations: All from hosts: *
再次测试
root@e0bb740ac0ce:/kafka_2.-2.1.# bin/kafka-console-producer.sh --broker-list kafka-1.default.svc.cluster.local:9092 --topic test --producer.config config/producer.properties
>>
注意:在config/server.properties 文件中,设置了
advertised.listeners=SASL_PLAINTEXT://kafka-1.default.svc.cluster.local:9092
所以连接地址,必须是指定域名才可以!
再开一个窗口,连接同样的容器
root@jqb-node128:~# docker exec -it a5ff3c8f5c2a /bin/bash
root@a5ff3c8f5c2a:/# cd /kafka_2.12-2.1.0/
启动消费者模式
root@a5ff3c8f5c2a:/kafka_2.-2.1.# bin/kafka-console-consumer.sh --bootstrap-server kafka-1.default.svc.cluster.local:9092 --topic test --from-beginning --consumer.config config/consumer.properties
收到123表示成功了!
python客户端测试
由于真实主机无法直接连接到网桥的地址172.168.0.5,那么因此代码需要在
创建空目录
mkdir /opt/py_test
放2个文件
sources.list
deb http://mirrors.aliyun.com/ubuntu/ xenial main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial main deb http://mirrors.aliyun.com/ubuntu/ xenial-updates main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates main deb http://mirrors.aliyun.com/ubuntu/ xenial universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial universe
deb http://mirrors.aliyun.com/ubuntu/ xenial-updates universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-updates universe deb http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security main
deb http://mirrors.aliyun.com/ubuntu/ xenial-security universe
deb-src http://mirrors.aliyun.com/ubuntu/ xenial-security universe
produer_consumer_acl_test.py
#!/usr/bin/env python3
# coding: utf-
# 注意:需要手动创建topic才行执行此脚本 import sys
import io def setup_io(): # 设置默认屏幕输出为utf-8编码
sys.stdout = sys.__stdout__ = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
sys.stderr = sys.__stderr__ = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
setup_io() import time
from kafka import KafkaProducer
from kafka import KafkaConsumer class KafkaClient(object): # kafka客户端程序
def __init__(self, kafka_server, port, topic,content,username,password):
self.kafka_server = kafka_server # kafka服务器ip地址
self.port = port # kafka端口
self.topic = topic # topic名
self.content = content # 发送内容
self.username = username # 用户名
self.password = password # 密码 def producer(self):
"""
生产者模式
:return: object
""" # 连接kafka服务器,比如['192.138.150.193:9092']
producer = KafkaProducer(bootstrap_servers=['%s:%s' % (self.kafka_server, self.port)],
security_protocol="SASL_PLAINTEXT", # 指定SASL安全协议
sasl_mechanism='PLAIN', # 配置SASL机制
sasl_plain_username=self.username, # 认证用户名
sasl_plain_password=self.password, # 密码
) producer.send(self.topic, self.content) # 发送消息,必须是二进制
producer.flush() # flush确保所有meg都传送给broker
producer.close()
return producer def consumer(self):
"""
消费者模式
:return: object
""" # 连接kafka,指定组为test_group
consumer = KafkaConsumer(topic, group_id='test_group', bootstrap_servers=['%s:%s' % (kafka_server, port)],
sasl_mechanism="PLAIN",
security_protocol='SASL_PLAINTEXT',
sasl_plain_username=self.username,
sasl_plain_password=self.password,
) return consumer
# for msg in consumer:
# recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
# print(recv) def main(self):
startime = time.time() # 开始时间 client = KafkaClient(self.kafka_server, self.port, self.topic, self.content,self.username,self.password) # 实例化客户端
client.producer() # 执行生产者
print('执行生产者')
consumer = client.consumer() # 执行消费者
print('执行消费者')
print('等待结果....')
flag = False
for msg in consumer:
# recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
# 判断生产的消息和消费的消息是否一致
print(msg.value)
# print(self.content)
if msg.value == self.content:
flag = True
break consumer.close() # 关闭消费者对象
endtime = time.time() # 结束时间 if flag:
# %.2f %(xx) 表示保留小数点2位
return "kafka验证消息成功,花费时间", '%.2f 秒' % (endtime - startime)
else:
return "kafka验证消息失败,花费时间", '%.2f 秒' % (endtime - startime) if __name__ == '__main__':
kafka_server = "kafka-1.default.svc.cluster.local"
port = ""
topic = "test"
content = "hello honey".encode('utf-8') username = "writer"
password = "" client = KafkaClient(kafka_server,port,topic,content,username,password) # 实例化客户端
print(client.main())
此时目录结构如下:
./
├── produer_consumer_acl_test.py
└── sources.list
进入容器,更新ubuntu更新源
root@jqb-node128:/opt/py_test# docker run -it -v /opt/py_test:/mnt --network br1 --ip=172.168.0.10 ubuntu:16.04
root@064f2f97aad2:/# cp /mnt/sources.list /etc/apt/
root@064f2f97aad2:/# apt-get update
安装python3-pip
root@064f2f97aad2:/# apt-get install -y python3-pip
安装kafka模块
root@064f2f97aad2:/# pip3 install kafka
添加hosts记录
echo "172.168.0.5 kafka-1.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.6 kafka-2.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.7 kafka-3.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.8 kafka-4.default.svc.cluster.local" >> /etc/hosts
echo "172.168.0.9 kafka-5.default.svc.cluster.local" >> /etc/hosts
执行Python文件
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '28.59 秒')
注意:第一次执行时,会非常慢。等待30秒,如果没有输出hello honey。终止掉,再次执行。
反复5次。就可以了!
之后再次执行几次,就会很快了!
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '5.37 秒')
root@064f2f97aad2:/# python3 /mnt/produer_consumer_acl_v1_test.py
执行生产者
执行消费者
等待结果....
b'hello honey'
('kafka验证消息成功,花费时间', '0.43 秒')
为啥,前面几次会很慢。之后就很快了,什么原因,我也不知道!
总之,只要经历过慢的阶段,之后就很快了!
本文参考链接: