我的 Kafka 旅程 - 基于账号密码的 SASL+PLAIN 认证授权 · 配置 · 创建账号 · 用户授权 · .NET接入

发布时间 2023-09-04 09:09:47作者: Sol·wang

本文基于 Kafka 3.0+ 的 KRaft 模式来阐述

默认的 Kafka 不受认证约束,可不用账号就可以连接到服务,也就是默认的 PLAIN 方式,不需要认证;配置了 SASL 认证之后,连接Kafka只能用凭证连接登录。

SASL 支持的认证方式有多种:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
本文讲述的是比较简单的 SASL_PLAINTEXT 方式,认证机制统一为:SCRAM-SHA-512。

那么基于 SASL+PLAINTEXT+SCRAM 的认证模式,本文涵盖的内容为:

  1. 配置服务端的认证模式
  2. 命令行创建新账号
  3. 授权账号到Topic的生产/消费权限
  4. 命令行凭证接入样例
  5. 客户端凭证接入样例

作者:[Sol·wang] - 博客园,原文出处:https://www.cnblogs.com/Sol-wang/

一、创建新用户

所以在不受认证约束的默认情况下,使用 kafka-configs.sh,可以在 Kafka 中创建新用户:

# 创建用户
bin/kafka-configs.sh --bootstrap-server {host}:9092 --alter \
    --entity-type users --entity-name {u-name} --add-config 'SCRAM-SHA-512=[password={user-password}]'
# 查看用户
bin/kafka-configs.sh --bootstrap-server {host}:9092 --describe --entity-type users --entity-name {u-name}

为什么要先创建一个账号

默认在没有账号的情况下,后续的认证授权生效后,用谁来连接到Kafka创建用户呢??
当然是先有一个管理员账号的存在,如上创建的账号,就假设以上创建的账号为管理员 admin。

用户的分类

 - 超级管理员:对 KAFKA 的管理
 - 管理员:  用于第三方UI的接入
 - 写入Topic:用于生产端的接入
 - 读取Topic:用于消费端的接入

二、认证授权配置

接下来,让创建的用户起作用,就要配置认证授权机制(SASL_PLAINTEXT)

来编辑 config/kraft/server.properties 配置文件吧,如下:

# 认证方式(内/外) 配置
listeners=SASL_PLAINTEXT://:9092,CONTROLLER://:9093     # 对外认证方式 SASL_PLAINTEXT
advertised.listeners=SASL_PLAINTEXT://localhost:9092    # Client(生产/消费)认证方式(不配置就用上一行代替)
inter.broker.listener.name=SASL_PLAINTEXT               # 内部节点之间的通讯认证方式
security.inter.broker.protocol=SASL_PLAINTEXT           # 内部通讯安全协议(不配置就用上一行代替)

# 认证机制 配置
sasl.enabled.mechanisms=SCRAM-SHA-512                   # SASL 定义支持的认证机制
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512      # SASL 的内部认证机制(必须是上一行包含的值)

# 账号限制 配置
super.users=User:{user-name};User:{user-name}           # 定义超级管理员
allow.everyone.if.no.acl.found=true                     # 生产/消费/等等,超级管理员之外的用户是否可以访问

# 授权方式(类名) 配置
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer

重启 Kafka 服务,使其配置生效。此时,只能用定义了的超级管理员来连接操作 Kafka。那么需要的用户凭证如下节内容。

三、命令行中的凭证

假设之前已经创建了用户 admin/*****,并且已配置为超级管理员。

为命令行模式创建此用户的凭证文件,把它命名为 admin-user-jaas,admin用户凭证内容如下:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="*****";

命令行模式,用凭证连接 Kafka 服务:

之后的每一步操作,都要带上用户凭证,假设用超级用户 admin 继续创建新用户:

bin/kafka-configs.sh --bootstrap-server {host}:9092 \
    --alter --add-config 'SCRAM-SHA-512=[password={user-password}]' \
    --entity-type users --entity-name {new-user-name} --command-config admin-user-jaas

与之前不同的是多了个--command-config admin-user-jaas 也就是admin用户的凭证信息。这时若不带凭证信息会提示 disconnected。

四、用户授权到TOPIC

用户创建好了,接下来要为用户授权到指定的 Topic 了,并指定写入/读取权限。

通常写入权限等同于生产端的授权,那么读取权限等同于消费端的授权。

# 授权用户到指定 topic 的生产端(写入)权限
# --allow-principal:指定用户
# --topic {topic-name}:指定某个主题
# --producer:指定为(生产端的)写入权限
bin/kafka-acls.sh --bootstrap-server {host}:9092 --add --producer \
    --allow-principal User:{user-name} --topic {topic-name} --command-config {admin-jaas}
#
# 授权用户到指定 topic 的消费端(读取)权限
# --allow-principal:指定用户
# --topic {topic-name}:指定某个主题
# --consumer:指定为(消费端的)读取权限
# --group {topic-group-name}:必须的消费组归属
bin/kafka-acls.sh --bootstrap-server {host}:9092 --add --consumer \
    --allow-principal User:{user-name} --topic {topic-name} --group {topic-group-name} --command-config {jaas-name}

随着业务不断的增长,创建更多的账号并授权使用。

五、使用凭证接入

所用的用户凭证JAAS文件都相同,只不过账号密码不同,同样的为客户端用户创建一个凭证文件。

跟 admin-user-jaas 文件一样,创建一个新的JAAS凭证文件,这里是生产端/消费端的命令行接入案例:

# 生产端 用户凭证 连接到 Kafka
# --producer.config {用户凭证文件}
bin/kafka-console-producer.sh --bootstrap-server {host}:9092 --topic {topic-name} --producer.config upro-jaas
#
# 消费端 用户凭证 连接到 Kafka
# --consumer.config {用户凭证文件}
bin/kafka-console-consumer.sh --bootstrap-server {host}:9092 --topic {topic-name} --consumer.config ucsm-jaas --from-beginning

六、.NET用凭证接入

也就是把凭证文件JAAS中的内容,移到配置类中:

var conf = new ConsumerConfig {
    GroupId = "test-cons-group",
    BootstrapServers = "192.168.56.101:9092",
    SaslUsername = "user-name",
    SaslPassword = "*********",
    SaslMechanism = SaslMechanism.ScramSha512,
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslOauthbearerConfig = "org.apache.kafka.common.security.scram.ScramLoginModule"
};