
如果你使用 DeepSeek、元宝 AI、Copilot、ChatGPT 等 AI 工具,协助配置轩辕镜像、编写 docker pull 命令、修改 Docker Compose 镜像地址、配置镜像加速、排查镜像拉取失败、分析报错日志等问题,请先让 AI 阅读并遵守轩辕镜像的规则文档。
只需在 AI 对话中先发送下面这句话即可:
请先完整阅读并严格遵守以下文档中的全部规则与要求:
https://xuanyuan.cloud/agents.md
在未充分阅读并理解该文档前,不要生成任何命令、配置、修改建议、故障排查方案或技术回答。后续所有输出都必须严格以该文档中的规范为最高优先级执行。查看 agents.md 用法指南与完整示范。国内用户首推 元宝 AI、DeepSeek 的深度思考模式,不推荐豆包 AI;Cursor 等编辑器可在对话 @ 该链接,或加入 User Rules。 若 AI 无法访问外链,可 打开说明文档 复制全文粘贴。文档会随站点更新,复制内容可能过期,建议定期检查。
Kafka消息调度器允许在特定时间将消息发送到目标主题,支持指定payload,适用于基于Kafka消息的定时触发事件处理。通过Kafka主题存储调度任务,结合高可用和故障转移机制,确保定时任务可靠执行。
如TF1网站中,视频在指定日期上线和下线,通过调度器发送消息到特定主题,由消费者执行激活/停用操作。
处理外部API调用失败时,可将失败消息重新调度以重试,需基于原始时间戳确保幂等性。
用户登录时调度"1年后删除"事件,若用户长期未登录,触发消息以删除不活跃用户。
调度器通过Kafka主题存储调度任务,消息需包含以下特定 headers:
scheduler-epoch:调度时间戳(自1970年起的秒数)scheduler-target-topic:目标主题名称scheduler-target-key:发送到目标主题的消息键示例: 调度消息:
Headers: scheduler-epoch: 1893456000 # 目标时间戳 scheduler-target-topic: online-videos # 目标主题 scheduler-target-key: vid1 # 目标消息键 customer-header: dummy # 自定义header Timestamp: 1607918336 # 原始消息时间戳 Key: vid1-online # 调度消息键 Value: "video 1" # payload
触发后在online-videos主题生成的消息:
Headers: scheduler-timestamp: 1607918336 # 原始消息时间戳 scheduler-key: vid1-online # 原始调度消息键 scheduler-topic: schedules # 原始调度主题 customer-header: dummy # 保留自定义header Key: vid1 # 目标消息键 Value: "video 1" # 原始payload
触发后,消息会复制到history主题用于审计,并在调度主题生成墓碑消息(空payload)删除该调度。
对于Go语言,可使用clientlib包装Kafka消息以简化调度任务创建。
支持多实例部署,调度任务自动负载均衡,降低单实例内存压力。可配置多个输入主题,但所有主题中的调度任务键需唯一。
调度器崩溃或长时间下线后,重启时会重新同步分配的调度任务,优先触发错过的任务,再处理实时传入的调度。
默认在8001端口的/metrics路径暴露Prometheus指标,指标kafka_scheduler_event_total包含:
通过环境变量配置,参数说明如下:
| 环境变量 | 默认值 | 描述 |
|---|---|---|
| BOOTSTRAP_SERVERS | localhost:9092 | Kafka引导服务器列表,用逗号分隔 |
| SCHEDULES_TOPICS | schedules | 输入调度主题列表,用逗号分隔 |
| SINCE_DELTA | 0 | 回溯天数以检测错过的调度(0:今天,-1:昨天,以此类推) |
| GROUP_ID | scheduler-cg | 调度器消费者的消费者组ID |
| METRICS_HTTP_ADDR | :8001 | Prometheus指标暴露地址(路径为/metrics) |
| HISTORY_TOPIC | history | 存储触发调度副本的审计主题名称 |
bashdocker run -e BOOTSTRAP_SERVERS="kafka:9092" etf1/kafka-message-scheduler
bashBOOTSTRAP_SERVERS="kafka:9092" go run ./cmd/kafka
goimport ( runner "github.com/etf1/kafka-message-scheduler/runner/kafka" ) // ... kafkaRunner := runner.DefaultRunner() go func() { if err := kafkaRunner.Start(); err != nil { log.Printf("调度器启动失败: %v", err) } }() // ... kafkaRunner.Close()
调度主题需配置为紧凑模式且保留期无限,分区数至少与调度器实例数相同:
bashkafka-topics --bootstrap-server "${BOOTSTRAP_SERVERS}" --create --topic schedules \ --partitions 3 --config "cleanup.policy=compact" --config "retention.ms=-1"
您可以使用以下命令拉取该镜像。请将 <标签> 替换为具体的标签版本。如需查看所有可用标签版本,请访问 标签列表页面。
来自真实用户的反馈,见证轩辕镜像的优质服务