Kafka消息调度器允许在特定时间将消息发送到目标主题,支持指定payload,适用于基于Kafka消息的定时触发事件处理。通过Kafka主题存储调度任务,结合高可用和故障转移机制,确保定时任务可靠执行。
如TF1网站中,视频在指定日期上线和下线,通过调度器发送消息到特定主题,由消费者执行激活/停用操作。
处理外部API调用失败时,可将失败消息重新调度以重试,需基于原始时间戳确保幂等性。
用户登录时调度"1年后删除"事件,若用户长期未登录,触发消息以删除不活跃用户。
调度器通过Kafka主题存储调度任务,消息需包含以下特定 headers:
scheduler-epoch:调度时间戳(自1970年起的秒数)scheduler-target-topic:目标主题名称scheduler-target-key:发送到目标主题的消息键示例: 调度消息:
Headers: scheduler-epoch: *** # 目标时间戳 scheduler-target-topic: online-videos # 目标主题 scheduler-target-key: vid1 # 目标消息键 customer-header: dummy # 自定义header Timestamp: *** # 原始消息时间戳 Key: vid1-online # 调度消息键 Value: "video 1" # payload
触发后在online-videos主题生成的消息:
Headers: scheduler-timestamp: *** # 原始消息时间戳 scheduler-key: vid1-online # 原始调度消息键 scheduler-topic: schedules # 原始调度主题 customer-header: dummy # 保留自定义header Key: vid1 # 目标消息键 Value: "video 1" # 原始payload
触发后,消息会复制到history主题用于审计,并在调度主题生成墓碑消息(空payload)删除该调度。
对于Go语言,可使用clientlib包装Kafka消息以简化调度任务创建。
支持多实例部署,调度任务自动负载均衡,降低单实例内存压力。可配置多个输入主题,但所有主题中的调度任务键需唯一。
调度器崩溃或长时间下线后,重启时会重新同步分配的调度任务,优先触发错过的任务,再处理实时传入的调度。
默认在8001端口的/metrics路径暴露Prometheus指标,指标kafka_scheduler_event_total包含:
通过环境变量配置,参数说明如下:
| 环境变量 | 默认值 | 描述 |
|---|---|---|
| BOOTSTRAP_SERVERS | localhost:9092 | Kafka引导服务器列表,用逗号分隔 |
| SCHEDULES_TOPICS | schedules | 输入调度主题列表,用逗号分隔 |
| SINCE_DELTA | 0 | 回溯天数以检测错过的调度(0:今天,-1:昨天,以此类推) |
| GROUP_ID | scheduler-cg | 调度器消费者的消费者组ID |
| METRICS_HTTP_ADDR | :8001 | Prometheus指标暴露地址(路径为/metrics) |
| HISTORY_TOPIC | history | 存储触发调度副本的审计主题名称 |
bashdocker run -e BOOTSTRAP_SERVERS="kafka:9092" etf1/kafka-message-scheduler
bashBOOTSTRAP_SERVERS="kafka:9092" go run ./cmd/kafka
goimport ( runner "github.com/etf1/kafka-message-scheduler/runner/kafka" ) // ... kafkaRunner := runner.DefaultRunner() go func() { if err := kafkaRunner.Start(); err != nil { log.Printf("调度器启动失败: %v", err) } }() // ... kafkaRunner.Close()
调度主题需配置为紧凑模式且保留期无限,分区数至少与调度器实例数相同:
bashkafka-topics --bootstrap-server "${BOOTSTRAP_SERVERS}" --create --topic schedules \ --partitions 3 --config "cleanup.policy=compact" --config "retention.ms=-1"
来自真实用户的反馈,见证轩辕镜像的优质服务
免费版仅支持 Docker Hub 加速,不承诺可用性和速度;专业版支持更多镜像源,保证可用性和稳定速度,提供优先客服响应。
免费版仅支持 docker.io;专业版支持 docker.io、gcr.io、ghcr.io、registry.k8s.io、nvcr.io、quay.io、mcr.microsoft.com、docker.elastic.co 等。
当返回 402 Payment Required 错误时,表示流量已耗尽,需要充值流量包以恢复服务。
通常由 Docker 版本过低导致,需要升级到 20.x 或更高版本以支持 V2 协议。
先检查 Docker 版本,版本过低则升级;版本正常则验证镜像信息是否正确。
使用 docker tag 命令为镜像打上新标签,去掉域名前缀,使镜像名称更简洁。
探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录认证访问私有仓库
在 Linux 系统配置镜像加速服务
在 Docker Desktop 配置镜像加速
Docker Compose 项目配置加速
Kubernetes 集群配置 Containerd
在宝塔面板一键配置镜像加速
Synology 群晖 NAS 配置加速
飞牛 fnOS 系统配置镜像加速
极空间 NAS 系统配置加速服务
爱快 iKuai 路由系统配置加速
绿联 NAS 系统配置镜像加速
QNAP 威联通 NAS 配置加速
Podman 容器引擎配置加速
HPC 科学计算容器配置加速
ghcr、Quay、nvcr 等镜像仓库
无需登录使用专属域名加速
需要其他帮助?请查看我们的 常见问题 或 官方QQ群: 13763429