
davradocker/ruban-ms-iotdata-persist-service本Docker镜像为一款轻量级数据集成工具,专注于实现Kafka消息队列与时序数据库(Time Series Database, TSDB)之间的数据流转。通过配置化方式,将Kafka主题中的消息数据提取、转换后,高效写入目标时序数据库,解决实时数据流从消息中间件到时序存储系统的无缝对接问题,适用于需要长期存储、分析时序数据的场景。
bashdocker run -d \ --name kafka-to-tsdb \ -e KAFKA_BOOTSTRAP_SERVERS="kafka-broker:9092" \ -e KAFKA_TOPICS="sensor-data,metrics" \ -e TSDB_TYPE="influxdb" \ -e TSDB_URL="[***]" \ -e TSDB_DATABASE="iot_data" \ -e TSDB_USER="admin" \ -e TSDB_PASSWORD="password" \ -v ./data-mapping.json:/app/config/mapping.json \ your-registry/kafka-to-tsdb:latest
yamlversion: '3' services: kafka-to-tsdb: image: your-registry/kafka-to-tsdb:latest container_name: kafka-to-tsdb environment: - KAFKA_BOOTSTRAP_SERVERS=kafka:9092 - KAFKA_TOPICS=iot-sensors,system-metrics - KAFKA_CONSUMER_GROUP=tsdb-writer - TSDB_TYPE=timescaledb - TSDB_URL=jdbc:postgresql://timescaledb:5432/metrics_db - TSDB_USER=tsdb_user - TSDB_PASSWORD=tsdb_pass - BATCH_SIZE=1000 - FLUSH_INTERVAL=5000 volumes: - ./mapping:/app/config depends_on: - kafka - timescaledb kafka: image: confluentinc/cp-kafka:latest # Kafka配置省略... timescaledb: image: timescale/timescaledb:latest-pg14 # TimescaleDB配置省略...
| 环境变量 | 描述 | 示例值 | 是否必填 |
|---|---|---|---|
KAFKA_BOOTSTRAP_SERVERS | Kafka集群地址,多个地址用逗号分隔 | kafka-1:9092,kafka-2:9092 | 是 |
KAFKA_TOPICS | 需消费的Kafka主题,多个主题用逗号分隔 | sensor-data,app-metrics | 是 |
KAFKA_CONSUMER_GROUP | Kafka消费者组ID,用于偏移量管理 | tsdb-writer-group-1 | 否(默认:tsdb-writer) |
TSDB_TYPE | 目标时序数据库类型 | influxdb/timescaledb/prometheus | 是 |
TSDB_URL | 时序数据库连接地址 | [***]/jdbc:postgresql://tsdb:5432/db | 是 |
TSDB_DATABASE | 目标数据库名称 | iot_data | 是 |
TSDB_USER | 时序数据库认证用户名 | admin | 否(如无需认证) |
TSDB_PASSWORD | 时序数据库认证密码 | password | 否(如无需认证) |
BATCH_SIZE | 批量写入数据量阈值(达到阈值触发写入) | 1000(默认:500) | 否 |
FLUSH_INTERVAL | 定时写入间隔(毫秒,达到间隔触发写入,无论是否达到批量阈值) | 5000(默认:3000) | 否 |
通过挂载JSON配置文件(如/app/config/mapping.json)定义Kafka消息到时序数据库的字段映射规则,示例如下(以InfluxDB为例):
json{ "sensor-data": { // Kafka主题名 "measurement": "sensor_measure", // InfluxDB measurement名称 "tags": ["sensor_id", "location"], // 作为InfluxDB tag的字段(从Kafka消息中提取) "fields": { // 作为InfluxDB field的字段(支持类型转换) "temperature": {"type": "float"}, "humidity": {"type": "float"}, "status": {"type": "string"} }, "timestamp": { // 时间戳字段配置 "field": "ts", // 从消息中提取的时间戳字段名 "format": "unix_ms" // 时间戳格式:unix_ms/unix/iso8601 }, "filter": "status != 'error'" // 数据过滤条件(可选) } }
BATCH_SIZE与FLUSH_INTERVAL,并配置数据库连接池参数(通过环境变量DB_MAX_CONNECTIONS等)。LOG_LEVEL=debug),验证数据写入正确性。


manifest unknown 错误
TLS 证书验证失败
DNS 解析超时
410 错误:版本过低
402 错误:流量耗尽
身份认证失败错误
429 限流错误
凭证保存错误
来自真实用户的反馈,见证轩辕镜像的优质服务