
mostafamoradian/xk6-kafkaxk6-kafka是k6的扩展,允许用户使用生产者(及用于调试的消费者)对Apache Kafka进行负载测试。其核心用途是测试基于Apache Kafka设计的系统,通过自动生成消息并发送到Kafka来验证消费者及整个系统的性能和正确性。本Docker镜像为该扩展提供官方容器化部署方式,简化测试环境配置。
适用于对Kafka生产者、消费者及依赖Kafka的系统进行负载测试的场景,尤其适合:
从Docker Hub拉取官方镜像:
bashdocker pull mostafamoradian/xk6-kafka:latest
通过挂载本地脚本或标准输入传递测试脚本运行负载测试。
bashdocker run --rm -i mostafamoradian/xk6-kafka:latest run - <本地脚本路径/test_json.js
将本地脚本目录挂载到容器中,直接运行指定脚本:
bashdocker run --rm -v $(pwd)/scripts:/scripts mostafamoradian/xk6-kafka:latest run /scripts/test_json.js
测试脚本遵循k6测试生命周期,包含以下核心阶段:
javascript// 导入完整模块 import * as kafka from "k6/x/kafka"; // 或导入特定类和常量 import { Writer, Reader, Connection, SchemaRegistry, SCHEMA_TYPE_STRING } from "k6/x/kafka";
创建Kafka连接、生产者、消费者实例,并初始化主题:
javascript// 创建生产者 const writer = new Writer({ brokers: ["localhost:9092"], // Kafka broker地址列表 topic: "my-test-topic", // 目标主题 compression: "snappy" // 消息压缩方式(可选) }); // 创建消费者 const reader = new Reader({ brokers: ["localhost:9092"], topic: "my-test-topic", groupID: "test-group" // 消费者组ID(可选) }); // 创建连接(用于主题管理) const connection = new Connection({ address: "localhost:9092" }); // 初始化时创建主题(仅首次运行) if (__VU == 0) { connection.createTopic({ topic: "my-test-topic", numPartitions: 3, // 分区数(可选) replicationFactor: 1 // 副本因子(可选) }); }
发送消息、消费消息并验证:
javascriptexport default function () { // 生产消息 writer.produce({ messages: [ { key: schemaRegistry.serialize({ data: "key-1", schemaType: SCHEMA_TYPE_STRING }), value: schemaRegistry.serialize({ data: { id: 1, value: "test" }, schemaType: SCHEMA_TYPE_JSON }) } ] }); // 消费消息并验证 const messages = reader.consume({ limit: 10 }); // 最多消费10条消息 if (messages.length > 0) { console.log(`消费消息: ${JSON.stringify(messages[0].value)}`); // 可添加k6 checks验证消息内容、格式等 } }
关闭连接并清理测试主题:
javascriptexport function teardown() { // 删除测试主题(可选) connection.deleteTopic("my-test-topic"); // 关闭所有连接 writer.close(); reader.close(); connection.close(); }
javascriptconst writer = new Writer({ brokers: ["kafka-broker:9092"], topic: "sasl-test-topic", sasl: { mechanism: "SCRAM-SHA-256", // 认证机制 username: "test-user", password: "test-pass" }, tls: { enable: true // 启用TLS } });
javascriptconst schemaRegistry = new SchemaRegistry({ url: "[***]" }); writer.produce({ messages: [ { key: schemaRegistry.serialize({ data: "user-1", schemaType: "avro", schema: { type: "string" } // 或从Schema Registry获取 }), value: schemaRegistry.serialize({ data: { id: 1, name: "test" }, schemaType: "avro", schema: { type: "record", name: "User", fields: [{ name: "id", type: "int" }, { name: "name", type: "string" }] } }) } ] });
xk6-kafka导出以下核心指标用于监控测试过程:
| 指标名称 | 类型 | 描述 |
|---|---|---|
| kafka_reader_message_count | Counter | 消费消息总数 |
| kafka_writer_message_count | Counter | 生产消息总数 |
| kafka_reader_error_count | Counter | 消费错误总数 |
| kafka_writer_error_count | Counter | 生产错误总数 |
| kafka_reader_lag | Gauge | 消费滞后量(最后消息偏移与当前偏移差) |
| kafka_writer_write_seconds | Trend | 消息写入耗时(趋势) |
| kafka_reader_read_seconds | Trend | 消息读取耗时(趋势) |
| kafka_reader_fetch_bytes | Counter | 读取字节总数 |
| kafka_writer_batch_size | Counter | 批处理消息总数 |
官方仓库scripts目录提供多种功能示例,关键脚本包括:
test_json.js:JSON格式消息生产/消费测试test_avro_with_schema_registry.js:使用Schema Registry的Avro消息测试test_sasl_auth.js:SASL认证测试test_consumer_group.js:消费者组多分区消费测试test_topics.js:主题创建/删除/列出操作测试manifest unknown 错误
TLS 证书验证失败
DNS 解析超时
410 错误:版本过低
402 错误:流量耗尽
身份认证失败错误
429 限流错误
凭证保存错误
来自真实用户的反馈,见证轩辕镜像的优质服务