Kafka MongoDB Watcher是一个用于监听MongoDB集合操作日志(oplog)事件(如插入、更新、删除等)并将其分发到指定Kafka主题的工具。同时支持重播模式,可在首次使用时将集合中的所有文档初始化到Kafka主题中。该Docker镜像封装了工具的运行环境,简化部署流程。
镜像已发布至Docker Hub,可直接拉取:
bashdocker pull etf1/kafka-mongo-watcher:latest
使用docker run命令启动,通过环境变量配置参数:
bashdocker run \ -e "KAFKA_MONGO_WATCHER_REPLAY=true" \ -e "KAFKA_MONGO_WATCHER_MONGODB_URI=mongodb://root:password@mongo:27017/mydb?replicaSet=rs0" \ -e "KAFKA_MONGO_WATCHER_MONGODB_DATABASE_NAME=mydb" \ -e "KAFKA_MONGO_WATCHER_MONGODB_COLLECTION_NAME=mycollection" \ -e "KAFKA_MONGO_WATCHER_KAFKA_BOOTSTRAP_SERVERS=kafka:9092" \ -e "KAFKA_MONGO_WATCHER_KAFKA_TOPIC=mongodb-events" \ etf1/kafka-mongo-watcher:latest
yamlversion: '3' services: kafka-mongo-watcher: image: etf1/kafka-mongo-watcher:latest environment: - KAFKA_MONGO_WATCHER_REPLAY=true - KAFKA_MONGO_WATCHER_MONGODB_URI=mongodb://root:password@mongo:27017/mydb?replicaSet=rs0 - KAFKA_MONGO_WATCHER_MONGODB_DATABASE_NAME=mydb - KAFKA_MONGO_WATCHER_MONGODB_COLLECTION_NAME=mycollection - KAFKA_MONGO_WATCHER_KAFKA_BOOTSTRAP_SERVERS=kafka:9092 - KAFKA_MONGO_WATCHER_KAFKA_TOPIC=mongodb-events - KAFKA_MONGO_WATCHER_LOG_LEVEL=info depends_on: - mongo - kafka mongo: image: mongo:5 command: --replSet rs0 environment: - MONGO_INITDB_ROOT_USERNAME=root - MONGO_INITDB_ROOT_PASSWORD=password kafka: image: confluentinc/cp-kafka:7.0.0 environment: - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 depends_on: - zookeeper zookeeper: image: confluentinc/cp-zookeeper:7.0.0 environment: - ZOOKEEPER_CLIENT_PORT=2181
可通过环境变量配置镜像,支持多源配置(.env文件、环境变量、命令行参数,后者覆盖前者)。以下是主要配置变量:
| 环境变量 | 类型 | 描述 | 示例值 | 默认值 |
|---|---|---|---|---|
| KAFKA_MONGO_WATCHER_CUSTOM_PIPELINE | string | 自定义过滤管道,支持重播和监听模式 | [ { "$match": { "fullDocument.is_active": true } }, { $addFields: { "custom-field": "custom-value" } } ] | - |
| KAFKA_MONGO_WATCHER_REPLAY | bool | 是否一次性发送集合所有文档 | true | false |
| KAFKA_MONGO_WATCHER_MONGODB_URI | string | MongoDB连接URI | mongodb://root:password@mongo:27017/mydb?replicaSet=rs0 | mongodb://root:toor@127.0.0.1:27011,... |
| KAFKA_MONGO_WATCHER_MONGODB_COLLECTION_NAME | string | 要监听的MongoDB集合名 | users | items |
| KAFKA_MONGO_WATCHER_MONGODB_DATABASE_NAME | string | MongoDB数据库名 | mydb | watcher |
| KAFKA_MONGO_WATCHER_MONGODB_SERVER_SELECTION_TIMEOUT | duration | MongoDB服务器选择超时时间 | 5s | 2s |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_BATCH_SIZE | integer | MongoDB监听批处理大小(0为不批处理) | 100 | 0 |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_FULL_DOCUMENT | boolean | 是否在oplog中返回完整文档 | false | true |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_MAX_AWAIT_TIME | duration | 等待新oplog的最大时间(0为不停止) | 30s | 0 |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_RESUME_AFTER | string | 变更流的起始逻辑点 | {"_data": "82650..."} | - |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_START_AT_OPERATION_TIME_I | uint32 | 变更流起始时间戳(递增值) | *** | - |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_START_AT_OPERATION_TIME_T | uint32 | 变更流起始时间戳(时间值) | *** | - |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_WATCH_MAX_RETRIES | integer | 监听集合的最大重试次数(0为禁用重试) | 5 | 3 |
| KAFKA_MONGO_WATCHER_MONGODB_OPTION_WATCH_RETRY_DELAY | duration | 监听重试间隔 | 1s | 500ms |
| KAFKA_MONGO_WATCHER_KAFKA_BOOTSTRAP_SERVERS | string | Kafka引导服务器列表 | kafka1:9092,kafka2:9092 | 127.0.0.1:9092 |
| KAFKA_MONGO_WATCHER_KAFKA_TOPIC | string | 目标Kafka主题名 | mongodb-events | kafka-mongo-watcher |
| KAFKA_MONGO_WATCHER_KAFKA_PRODUCE_CHANNEL_SIZE | integer | Kafka生产者内部通道最大容量 | 20000 | *** |
| KAFKA_MONGO_WATCHER_LOG_CLI_VERBOSE | boolean | 是否启用详细日志 | false | true |
| KAFKA_MONGO_WATCHER_LOG_LEVEL | string | 日志级别 | debug | info |
| KAFKA_MONGO_WATCHER_GRAYLOG_ENDPOINT | string | Graylog日志推送端点 | udp://graylog:*** | - |
| KAFKA_MONGO_WATCHER_HTTP_TECH_ADDR | string | 技术HTTP服务器监听地址 | :8080 | :8001 |
| KAFKA_MONGO_WATCHER_PRINT_CONFIG | boolean | 启动时是否打印配置 | false | true |
| KAFKA_MONGO_WATCHER_PPROF_ENABLED | boolean | 是否启用Go pprof调试 | false | true |
工具暴露Go进程和应用监控指标,可通过技术HTTP服务器端点被Prometheus抓取:
http://<容器IP>:8001/metrics
(默认端口可通过KAFKA_MONGO_WATCHER_HTTP_TECH_ADDR修改)
来自真实用户的反馈,见证轩辕镜像的优质服务
免费版仅支持 Docker Hub 加速,不承诺可用性和速度;专业版支持更多镜像源,保证可用性和稳定速度,提供优先客服响应。
免费版仅支持 docker.io;专业版支持 docker.io、gcr.io、ghcr.io、registry.k8s.io、nvcr.io、quay.io、mcr.microsoft.com、docker.elastic.co 等。
当返回 402 Payment Required 错误时,表示流量已耗尽,需要充值流量包以恢复服务。
通常由 Docker 版本过低导致,需要升级到 20.x 或更高版本以支持 V2 协议。
先检查 Docker 版本,版本过低则升级;版本正常则验证镜像信息是否正确。
使用 docker tag 命令为镜像打上新标签,去掉域名前缀,使镜像名称更简洁。
探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录认证访问私有仓库
在 Linux 系统配置镜像加速服务
在 Docker Desktop 配置镜像加速
Docker Compose 项目配置加速
Kubernetes 集群配置 Containerd
在宝塔面板一键配置镜像加速
Synology 群晖 NAS 配置加速
飞牛 fnOS 系统配置镜像加速
极空间 NAS 系统配置加速服务
爱快 iKuai 路由系统配置加速
绿联 NAS 系统配置镜像加速
QNAP 威联通 NAS 配置加速
Podman 容器引擎配置加速
HPC 科学计算容器配置加速
ghcr、Quay、nvcr 等镜像仓库
无需登录使用专属域名加速
需要其他帮助?请查看我们的 常见问题 或 官方QQ群: 13763429