Kafka and ZooKeeper with SASL/PLAIN authentication
ZooKeeper configuration
- Modify zoo.cfg and add the following three lines:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=360000
- Configure the JAAS file: create zk_server_jaas.conf in the conf directory (it defines the usernames and passwords that clients must use to connect to the ZooKeeper server)
vim zk_server_jaas.conf
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-sec"
    user_admin="admin-sec";
};
- Add the required jars (copy them from kafka/libs to zookeeper/lib):
kafka-clients-0.10.0.1.jar
lz4-1.3.0.jar
slf4j-api-1.7.21.jar
slf4j-log4j12-1.7.21.jar
snappy-java-1.1.2.6.jar
- Modify zkEnv.sh (append the following as the last line):
export SERVER_JVMFLAGS=" -Djava.security.auth.login.config=$ZOOKEEPER_PREFIX/conf/zk_server_jaas.conf"
- Start ZooKeeper
/opt/lingmou/zookeeper/bin/zkServer.sh start
Kafka broker configuration
- Configure server.properties
listeners=SASL_PLAINTEXT://192.168.101.130:6667
advertised.listeners=SASL_PLAINTEXT://192.168.101.130:6667
# protocol used for inter-broker communication
security.inter.broker.protocol=SASL_PLAINTEXT
# SASL mechanism
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# authorizer class that enforces ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# if no matching ACL (access control list) is found, allow the operation
allow.everyone.if.no.acl.found=true
super.users=User:admin
security.protocol=SASL_PLAINTEXT
- Add the broker authentication information in config/kafka_server_jaas.conf
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-sec"
    user_admin="admin-sec"
    user_producer="prod-sec"
    user_consumer="cons-sec";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-sec";
};
In the KafkaServer section, username/password are the credentials this broker uses when it initiates connections to other brokers; each user_<userName>="<password>" entry defines the password the broker checks for a client of that name, covering every client connection the broker authenticates, including connections from other brokers. The Client section holds the username and password the broker uses to log in to ZooKeeper.
- Modify the startup script bin/kafka-server-start.sh (change the final exec line):
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"
- Start Kafka
/opt/lingmou/kafka/bin/kafka-server-start.sh -daemon /opt/lingmou/kafka/config/server.properties
- Kafka client configuration
# Consumer: config/kafka-consumer-jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="consumer"
    password="cons-sec";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="consumer"
    password="cons-sec";
};

# Producer: config/kafka-producer-jaas.conf
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="prod-sec";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="prod-sec";
};
- Modify the client configuration files (an equivalent programmatic Java setup is sketched after the script changes below)
# Producer: config/producer.properties
bootstrap.servers=192.168.101.130:6667
compression.type=none
# add the authentication settings
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

# Consumer: config/consumer.properties
zookeeper.connect=192.168.101.130:2181
zookeeper.connection.timeout.ms=6000
group.id=test-group
# add the authentication settings
sasl.mechanism=PLAIN
security.protocol=SASL_PLAINTEXT
- Modify the client scripts to load the JAAS files:
# Consumer: in bin/kafka-console-consumer.sh change
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
# to
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka-consumer-jaas.conf kafka.tools.ConsoleConsumer "$@"

# Producer: in bin/kafka-console-producer.sh change
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
# to
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka-producer-jaas.conf kafka.tools.ConsoleProducer "$@"
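The console scripts above pass the JAAS file through a JVM system property; a Java application can instead carry the same SASL/PLAIN settings in its client properties. The sketch below is a minimal, illustrative producer (the class name and key/value types are my own choices, not part of this setup); it assumes a kafka-clients version that supports the sasl.jaas.config property (0.10.2+). With the 0.10.0.1 client used elsewhere in this guide, you would set -Djava.security.auth.login.config to point at kafka-producer-jaas.conf, exactly as the modified scripts do.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SaslPlainProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.130:6667");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        // SASL/PLAIN settings, mirroring config/producer.properties
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // inline JAAS config (requires kafka-clients 0.10.2+); the credentials
        // must match a user_<name> entry in kafka_server_jaas.conf
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"producer\" password=\"prod-sec\";");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("haha", "key", "hello with SASL/PLAIN"));
            producer.flush();
        }
    }
}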
- Grant permissions (ACLs)
1) Create the topic haha
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic haha
2) Grant write (produce) permission
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --operation Write --topic haha
3) Grant read (consume) permission
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --topic haha
4) Grant consumer-group permission
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --operation Read --group test-group
5) List the configured ACLs
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list
6) Remove a permission
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:producer --operation Write --topic haha
7) Deny Read for user zhangsan coming from IP 192.168.1.100, while allowing everyone else
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:zhangsan --deny-host 192.168.1.100 --operation Read --topic haha
8) Allow read and write requests from zhangsan and alice coming from IP 192.168.1.100 or 192.168.1.101
./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:zhangsan --allow-principal User:alice --allow-host 192.168.1.100 --allow-host 192.168.1.101 --operation Read --operation Write --topic haha
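The kafka-acls.sh commands above talk to ZooKeeper directly, which is the right approach for the 0.10.x broker used here. For reference, newer Kafka versions also expose ACL management through the AdminClient API; the sketch below is a hypothetical equivalent of steps 2) and 5), assuming kafka-clients 2.x and a broker (0.11+) that supports the CreateAcls/DescribeAcls requests, authenticated as the admin super user.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

public class AclAdmin {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.130:6667");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"admin\" password=\"admin-sec\";");

        try (AdminClient admin = AdminClient.create(props)) {
            // equivalent of step 2): allow User:producer to Write to topic haha
            AclBinding writeAcl = new AclBinding(
                    new ResourcePattern(ResourceType.TOPIC, "haha", PatternType.LITERAL),
                    new AccessControlEntry("User:producer", "*",
                            AclOperation.WRITE, AclPermissionType.ALLOW));
            admin.createAcls(Collections.singleton(writeAcl)).all().get();

            // equivalent of step 5): list all ACLs
            admin.describeAcls(AclBindingFilter.ANY).values().get()
                 .forEach(System.out::println);
        }
    }
}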
- Test
1) Produce data
./bin/kafka-console-producer.sh --topic haha --broker-list localhost:6667 --producer.config config/producer.properties
2) Consume data
./bin/kafka-console-consumer.sh --topic haha --bootstrap-server localhost:6667 --consumer.config config/consumer.properties
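To go with the producer sketch earlier, here is a matching illustrative Java consumer under similar assumptions (kafka-clients 2.x, needed for sasl.jaas.config and poll(Duration); the class name is my own choice). It authenticates as the consumer user and joins test-group, so it exercises both the topic Read ACL and the group ACL configured above.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SaslPlainConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.101.130:6667");
        props.put("group.id", "test-group"); // must match the group granted the Read ACL
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "earliest");
        // SASL/PLAIN settings, mirroring config/consumer.properties
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // credentials must match user_consumer in kafka_server_jaas.conf
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"consumer\" password=\"cons-sec\";");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("haha"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("%s:%d:%d value=%s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
            }
        }
    }
}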
