
xk6-kafka是k6的扩展,允许用户使用生产者(及用于调试的消费者)对Apache Kafka进行负载测试。其核心用途是测试基于Apache Kafka设计的系统,通过自动生成消息并发送到Kafka来验证消费者及整个系统的性能和正确性。本Docker镜像为该扩展提供官方容器化部署方式,简化测试环境配置。
适用于对Kafka生产者、消费者及依赖Kafka的系统进行负载测试的场景,尤其适合:
从Docker Hub拉取官方镜像:
bashdocker pull mostafamoradian/xk6-kafka:latest
通过挂载本地脚本或标准输入传递测试脚本运行负载测试。
1. 通过标准输入传递脚本
bashdocker run --rm -i mostafamoradian/xk6-kafka:latest run - <本地脚本路径/test_json.js
2. 挂载本地目录运行脚本
将本地脚本目录挂载到容器中,直接运行指定脚本:
bashdocker run --rm -v $(pwd)/scripts:/scripts mostafamoradian/xk6-kafka:latest run /scripts/test_json.js
测试脚本遵循k6测试生命周期,包含以下核心阶段:
1. 导入模块
javascript// 导入完整模块 import * as kafka from "k6/x/kafka"; // 或导入特定类和常量 import { Writer, Reader, Connection, SchemaRegistry, SCHEMA_TYPE_STRING } from "k6/x/kafka";
2. 初始化上下文(init阶段)
创建Kafka连接、生产者、消费者实例,并初始化主题:
javascript// 创建生产者 const writer = new Writer({ brokers: ["localhost:9092"], // Kafka broker地址列表 topic: "my-test-topic", // 目标主题 compression: "snappy" // 消息压缩方式(可选) }); // 创建消费者 const reader = new Reader({ brokers: ["localhost:9092"], topic: "my-test-topic", groupID: "test-group" // 消费者组ID(可选) }); // 创建连接(用于主题管理) const connection = new Connection({ address: "localhost:9092" }); // 初始化时创建主题(仅首次运行) if (__VU == 0) { connection.createTopic({ topic: "my-test-topic", numPartitions: 3, // 分区数(可选) replicationFactor: 1 // 副本因子(可选) }); }
3. VU测试逻辑(default函数)
发送消息、消费消息并验证:
javascriptexport default function () { // 生产消息 writer.produce({ messages: [ { key: schemaRegistry.serialize({ data: "key-1", schemaType: SCHEMA_TYPE_STRING }), value: schemaRegistry.serialize({ data: { id: 1, value: "test" }, schemaType: SCHEMA_TYPE_JSON }) } ] }); // 消费消息并验证 const messages = reader.consume({ limit: 10 }); // 最多消费10条消息 if (messages.length > 0) { console.log(`消费消息: ${JSON.stringify(messages[0].value)}`); // 可添加k6 checks验证消息内容、格式等 } }
4. 清理资源(teardown阶段)
关闭连接并清理测试主题:
javascriptexport function teardown() { // 删除测试主题(可选) connection.deleteTopic("my-test-topic"); // 关闭所有连接 writer.close(); reader.close(); connection.close(); }
SASL认证配置
javascriptconst writer = new Writer({ brokers: ["kafka-broker:9092"], topic: "sasl-test-topic", sasl: { mechanism: "SCRAM-SHA-256", // 认证机制 username: "test-user", password: "test-pass" }, tls: { enable: true // 启用TLS } });
Avro格式消息(带Schema Registry)
javascriptconst schemaRegistry = new SchemaRegistry({ url: "http://schema-registry:8081" }); writer.produce({ messages: [ { key: schemaRegistry.serialize({ data: "user-1", schemaType: "avro", schema: { type: "string" } // 或从Schema Registry获取 }), value: schemaRegistry.serialize({ data: { id: 1, name: "test" }, schemaType: "avro", schema: { type: "record", name: "User", fields: [{ name: "id", type: "int" }, { name: "name", type: "string" }] } }) } ] });
xk6-kafka导出以下核心指标用于监控测试过程:
| 指标名称 | 类型 | 描述 |
|---|---|---|
| kafka_reader_message_count | Counter | 消费消息总数 |
| kafka_writer_message_count | Counter | 生产消息总数 |
| kafka_reader_error_count | Counter | 消费错误总数 |
| kafka_writer_error_count | Counter | 生产错误总数 |
| kafka_reader_lag | Gauge | 消费滞后量(最后消息偏移与当前偏移差) |
| kafka_writer_write_seconds | Trend | 消息写入耗时(趋势) |
| kafka_reader_read_seconds | Trend | 消息读取耗时(趋势) |
| kafka_reader_fetch_bytes | Counter | 读取字节总数 |
| kafka_writer_batch_size | Counter | 批处理消息总数 |
官方仓库scripts目录提供多种功能示例,关键脚本包括:
test_json.js:JSON格式消息生产/消费测试test_avro_with_schema_registry.js:使用Schema Registry的Avro消息测试test_sasl_auth.js:SASL认证测试test_consumer_group.js:消费者组多分区消费测试test_topics.js:主题创建/删除/列出操作测试您可以使用以下命令拉取该镜像。请将 <标签> 替换为具体的标签版本。如需查看所有可用标签版本,请访问 标签列表页面。
探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录认证访问私有仓库
无需登录使用专属域名
Kubernetes 集群配置 Containerd
K3s 轻量级 Kubernetes 镜像加速
VS Code Dev Containers 配置
Podman 容器引擎配置
HPC 科学计算容器配置
ghcr、Quay、nvcr 等镜像仓库
Harbor Proxy Repository 对接专属域名
Portainer Registries 加速拉取
Nexus3 Docker Proxy 内网缓存
需要其他帮助?请查看我们的 常见问题Docker 镜像访问常见问题解答 或 提交工单
docker search 限制
站内搜不到镜像
离线 save/load
插件要用 plugin install
WSL 拉取慢
安全与 digest
新手拉取配置
镜像合规机制
manifest unknown
no matching manifest(架构)
invalid tar header(解压)
TLS 证书失败
DNS 超时
域名连通性排查
410 Gone 排查
402 与流量用尽
401 认证失败
429 限流
D-Bus 凭证提示
413 与超大单层
来自真实用户的反馈,见证轩辕镜像的优质服务