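This example does not use Kerberos for authentication; clients authenticate with a username and password (SASL/PLAIN).
1. Server-side configuration
1.0 Edit server.properties and add the following settings
# ACL authorizer class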
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# This example uses SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://hadoop4:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# Make admin a super user in this example
super.users=User:admin
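A quick check before restarting the broker can catch a key that was left out or is still commented out. The sketch below is only an illustration: check_sasl_props is a hypothetical helper name, not a Kafka tool, and it only checks that the six keys above are present, not that their values are valid.

```shell
#!/bin/sh
# check_sasl_props FILE  (hypothetical helper, not part of Kafka)
# Prints every required key that is missing or commented out in FILE
# and returns non-zero if at least one is missing.
check_sasl_props() {
    props_file="$1"
    missing=0
    for key in authorizer.class.name listeners \
               security.inter.broker.protocol \
               sasl.mechanism.inter.broker.protocol \
               sasl.enabled.mechanisms super.users; do
        # Match "key=" or "key =" at the start of a line; lines that
        # start with # never match.
        if ! grep -Eq "^${key}[[:space:]]*=" "$props_file"; then
            echo "missing: $key"
            missing=1
        fi
    done
    return "$missing"
}

# Usage (path is an assumption based on the config directory used in this example):
#   check_sasl_props /data1/hadoop/kafka/config/server.properties
```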
1.1 Create the server-side jaas.conf file with the following contents:
[hduser@hadoop4 config]$ cat jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_reader="reader"
user_writer="writer";
};
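In this file, username and password are the credentials the broker itself uses for inter-broker connections, and each user_<name>="<password>" entry defines an account that may log in. As a rough sketch of how the file is assembled, the hypothetical helper gen_server_jaas below prints an equivalent KafkaServer section from a list of user:password pairs. It does not escape quotes inside passwords.

```shell
#!/bin/sh
# gen_server_jaas BROKER_USER BROKER_PASS [USER:PASS ...]
# Hypothetical helper: prints a KafkaServer section for SASL/PLAIN.
gen_server_jaas() {
    broker_user="$1"; broker_pass="$2"; shift 2
    printf 'KafkaServer {\n'
    printf '    org.apache.kafka.common.security.plain.PlainLoginModule required\n'
    # Credentials this broker presents when it connects to other brokers.
    printf '    username="%s"\n' "$broker_user"
    printf '    password="%s"\n' "$broker_pass"
    # The broker's own account must also be listed as a user_ entry.
    printf '    user_%s="%s"' "$broker_user" "$broker_pass"
    for pair in "$@"; do
        # Split "name:password" at the first colon.
        printf '\n    user_%s="%s"' "${pair%%:*}" "${pair#*:}"
    done
    # The semicolon ends the login module entry, "};" ends the section.
    printf ';\n};\n'
}

gen_server_jaas admin admin reader:reader writer:writer
```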
1.2 Modify the startup script kafka-server-start.sh:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/data1/hadoop/kafka/config/jaas.conf kafka.Kafka "$@"
Here, -Djava.security.auth.login.config=/data1/hadoop/kafka/config/jaas.conf is the newly added option.
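Editing the script works, but the change can be lost when Kafka is upgraded. kafka-run-class.sh also appends the KAFKA_OPTS environment variable to the JVM command line, so the same flag can be passed without touching any script. A minimal sketch, assuming server.properties sits in the same config directory as jaas.conf:

```shell
#!/bin/sh
# Pass the JAAS file through KAFKA_OPTS instead of editing kafka-server-start.sh.
JAAS_FILE=/data1/hadoop/kafka/config/jaas.conf
export KAFKA_OPTS="-Djava.security.auth.login.config=${JAAS_FILE}"

# Assumption: server.properties lives next to jaas.conf.
SERVER_PROPS=/data1/hadoop/kafka/config/server.properties

# Start the broker only when the script is on PATH, so this snippet
# can be dry-run on a machine without Kafka installed.
if command -v kafka-server-start.sh >/dev/null 2>&1; then
    kafka-server-start.sh -daemon "$SERVER_PROPS"
else
    echo "KAFKA_OPTS=$KAFKA_OPTS"
fi
```

The same approach applies to the client scripts in the following sections: export KAFKA_OPTS with the client's JAAS file before calling the unmodified script.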
2. Producer configuration
2.1 Create the JAAS file
[hduser@hadoop4 config]$ cat writer_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="writer"
password="writer";
};
2.2 Configure the producer startup script (kafka-console-producer.sh)
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/hadoop/kafka/config/writer_jaas.conf kafka.tools.ConsoleProducer "$@"
2.3 Run the producer
kafka-console-producer.sh --bootstrap-server 192.168.43.15:9092 --topic test2 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
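As shown, two security parameters must be added:
security.protocol: the security protocol to use; here it is SASL over an unencrypted connection (SASL_PLAINTEXT).
sasl.mechanism: the SASL mechanism; with Kerberos it would be set to GSSAPI instead of PLAIN.
Running the command above still fails, because user writer has not yet been authorized on topic test2.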
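The two properties can also be kept in a file together with the credentials. Kafka clients accept a sasl.jaas.config property that carries the login module inline, which makes both the separate JAAS file and the script change in 2.2 unnecessary. The sketch below is one possible way to generate such a file; write_client_props and the /tmp path are hypothetical names chosen for illustration.

```shell
#!/bin/sh
# write_client_props USER PASSWORD FILE
# Hypothetical helper: writes a client properties file for SASL/PLAIN.
write_client_props() {
    (
        # The file holds a password, so a newly created file is made
        # readable by its owner only.
        umask 077
        cat > "$3" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$1" password="$2";
EOF
    )
}

write_client_props writer writer /tmp/writer.properties
# Then, with an unmodified kafka-console-producer.sh:
#   kafka-console-producer.sh --bootstrap-server 192.168.43.15:9092 \
#       --topic test2 --producer.config /tmp/writer.properties
```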
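2.4 Authorization
Before setting concrete ACL rules, first take a brief look at the Kafka ACL format. According to the official documentation, a Kafka ACL has the form "Principal P is [Allowed/Denied] Operation O From Host H On Resource R", where:
Principal: the Kafka user.
Operation: a specific operation type, such as WRITE, READ, or DESCRIBE. For the full list of operations, see http://docs.confluent.io/current/kafka/authorization.html#overview
Host: the IP address of the client connecting to the Kafka cluster; "*" means all IPs. Note that Kafka currently does not support host names here; only IP addresses can be specified.
Resource: a Kafka resource type. The types currently include TOPIC, CLUSTER, GROUP, and TRANSACTIONAL_ID.
The following command allows user writer to write to topic test2: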
kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:writer --operation Write --topic test2
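To see how the pieces of an ACL map onto the command line, the sketch below prints (without running) the kafka-acls.sh command for a given principal, operation, and resource. acl_cmd is a hypothetical helper and ZK simply repeats the ZooKeeper address used above.

```shell
#!/bin/sh
# acl_cmd PRINCIPAL OPERATION RESOURCE_FLAG RESOURCE_NAME
# Hypothetical helper: prints the kafka-acls.sh command that adds one
# allow rule, using the same authorizer options as in this example.
ZK=localhost:2181
acl_cmd() {
    printf 'kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer'
    printf ' --authorizer-properties zookeeper.connect=%s' "$ZK"
    # Principal P and Operation O, then the Resource R as flag plus name.
    # No host option is given, so the rule applies to all hosts (*).
    printf ' --add --allow-principal User:%s --operation %s %s %s\n' "$1" "$2" "$3" "$4"
}

acl_cmd writer Write --topic test2
# To execute instead of print:
#   eval "$(acl_cmd writer Write --topic test2)"
```

Read as an ACL sentence, this rule says: Principal User:writer is Allowed Operation Write From Host * On Resource topic test2. Replacing --add with --list shows the rules currently stored for the topic.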
3. Consumer
3.1 Create the JAAS file reader_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="reader"
password="reader";
};
3.2 Configure the consumer startup script (kafka-console-consumer.sh)
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/hadoop/kafka/config/reader_jaas.conf kafka.tools.ConsoleConsumer "$@"
3.3 Create the consumer configuration file
[hduser@hadoop4 ~]$ cat consumer.config
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=test-group
3.4 Consume data
- Without consumer.config, the following warnings appear:
[hduser@hadoop4 ~]$ kafka-console-consumer.sh --bootstrap-server 192.168.43.15:9092 --from-beginning --topic test2
[2021-05-08 09:44:35,771] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:36,187] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:36,599] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:44:37,006] WARN [Consumer clientId=consumer-console-consumer-85632-1, groupId=console-consumer-85632] Bootstrap broker 192.168.43.15:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
- Next, pass consumer.config:
[hduser@hadoop4 ~]$ kafka-console-consumer.sh --bootstrap-server 192.168.43.15:9092 --from-beginning --topic test2 --consumer.config consumer.config
[2021-05-08 09:46:10,044] WARN [Consumer clientId=consumer-test-group-1, groupId=test-group] Error while fetching metadata with correlation id 2 : {test2=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2021-05-08 09:46:10,045] ERROR [Consumer clientId=consumer-test-group-1, groupId=test-group] Topic authorization failed for topics [test2] (org.apache.kafka.clients.Metadata)
[2021-05-08 09:46:10,047] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test2]
As with the producer, the user has no permission to access topic test2.
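The failures in this walkthrough follow a small number of patterns, so they can be told apart from the log text alone. The sketch below maps a log line to the most likely missing step. diagnose_line is a hypothetical helper, and the "disconnected" case is only a hint, since an unreachable broker produces the same warning. The group pattern corresponds to the error that shows up in section 3.6.

```shell
#!/bin/sh
# diagnose_line LINE  (hypothetical helper)
# Maps one client log line to the most likely missing piece of setup.
diagnose_line() {
    case "$1" in
        *TOPIC_AUTHORIZATION_FAILED*|*TopicAuthorizationException*)
            echo "topic ACL missing" ;;
        *GroupAuthorizationException*)
            echo "group ACL missing" ;;
        *"Bootstrap broker"*disconnected*)
            echo "check security.protocol and sasl.mechanism" ;;
        *)
            echo "unrecognized" ;;
    esac
}

# Usage: feed the client's combined output through the helper, e.g.
#   kafka-console-consumer.sh ... 2>&1 |
#       while IFS= read -r line; do diagnose_line "$line"; done
```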
3.5 Authorization
[hduser@hadoop4 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --topic test2
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, patternType=LITERAL)`:
(principal=User:reader, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test2, patternType=LITERAL)`:
(principal=User:writer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:reader, host=*, operation=READ, permissionType=ALLOW)
3.6 Consume again
Consuming again still fails, this time because reader has no permission on the group test-group:
[hduser@hadoop4 ~]$ kafka-console-consumer.sh --bootstrap-server 192.168.43.15:9092 --from-beginning --topic test2 --consumer.config consumer.config
[2021-05-08 09:48:07,842] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: test-group
Processed a total of 0 messages
Grant the permission:
[hduser@hadoop4 ~]$ kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:reader --operation Read --group test-group
Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-group, patternType=LITERAL)`:
(principal=User:reader, host=*, operation=READ, permissionType=ALLOW)
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-group, patternType=LITERAL)`:
(principal=User:reader, host=*, operation=READ, permissionType=ALLOW)
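A consumer therefore needs two grants: Read on the topic and Read on its group. kafka-acls.sh has a --consumer convenience option that adds both in one call (it also adds Describe on the topic). The sketch below prints such a command; consumer_acl_cmd is a hypothetical helper, and the ZooKeeper address is the one used throughout this example.

```shell
#!/bin/sh
# consumer_acl_cmd USER TOPIC GROUP  (hypothetical helper)
# Prints one kafka-acls.sh command that uses the --consumer convenience
# option to grant topic and group access together.
consumer_acl_cmd() {
    printf 'kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer'
    printf ' --authorizer-properties zookeeper.connect=localhost:2181'
    # --consumer requires both --topic and --group.
    printf ' --add --allow-principal User:%s --consumer --topic %s --group %s\n' "$1" "$2" "$3"
}

consumer_acl_cmd reader test2 test-group
```

The producer side has a matching --producer option, which takes only --topic.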
The producer sends:
[hduser@hadoop4 ~]$ kafka-console-producer.sh --bootstrap-server 192.168.43.15:9092 --topic test2 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
>hahaha
>wanm^H^H
>完美
>
The consumer receives:
[hduser@hadoop4 ~]$ kafka-console-consumer.sh --bootstrap-server 192.168.43.15:9092 --from-beginning --topic test2 --consumer.config consumer.config
hahaha
wanm
完美
4. Administrator
Use the admin user to view consumer group information.
4.1 Create the JAAS file admin_jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
4.2 Configure the script kafka-consumer-groups.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/data1/hadoop/kafka/config/admin_jaas.conf kafka.admin.ConsumerGroupCommand "$@"
4.3 Configure the security protocol properties
[hduser@hadoop4 ~]$ cat admin_sasl.config
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
4.4 View group information
[hduser@hadoop4 ~]$ kafka-consumer-groups.sh --group test-group --describe --command-config admin_sasl.config --bootstrap-server 192.168.43.15:9092
Consumer group 'test-group' has no active members.
GROUP       TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
test-group  test2  0          3               3               0    -            -     -
test-group  test   1          1001515         1001516         1    -            -     -
test-group  test   0          992785          992786          1    -            -     -
test-group  test   3          1000894         1000894         0    -            -     -
test-group  test   2          1000772         1000773         1    -            -     -
test-group  test   4          1004034         1004034         0    -            -     -
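The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET for each partition. To get a per-topic total, the describe output can be piped through a short awk filter. This is a sketch only; lag_by_topic is a hypothetical helper, and it assumes the column order shown above, with LAG as the sixth column.

```shell
#!/bin/sh
# lag_by_topic: reads `kafka-consumer-groups.sh --describe` output on
# stdin and prints "<topic> <total lag>" for each topic.
lag_by_topic() {
    # Data rows are those whose 6th column (LAG) is a number, which
    # skips the header line and the "no active members" notice.
    awk '$6 ~ /^[0-9]+$/ { lag[$2] += $6 }
         END { for (t in lag) print t, lag[t] }' |
        LC_ALL=C sort   # awk's iteration order is unspecified, so sort
}

# Usage:
#   kafka-consumer-groups.sh --group test-group --describe \
#       --command-config admin_sasl.config \
#       --bootstrap-server 192.168.43.15:9092 | lag_by_topic
```

For the output shown above, the totals come to 3 for topic test and 0 for topic test2.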
Production environments should generally still use Kerberos together with Ranger + LDAP.
Adapted from the book Kafka实战.