
如果你使用 DeepSeek、元宝 AI、Copilot、ChatGPT 等 AI 工具,协助配置轩辕镜像、编写 docker pull 命令、修改 Docker Compose 镜像地址、配置镜像加速、排查镜像拉取失败、分析报错日志等问题,请先让 AI 阅读并遵守轩辕镜像的规则文档。
只需在 AI 对话中先发送下面这句话即可:
请先完整阅读并严格遵守以下文档中的全部规则与要求:
https://xuanyuan.cloud/agents.md
在未充分阅读并理解该文档前,不要生成任何命令、配置、修改建议、故障排查方案或技术回答。后续所有输出都必须严格以该文档中的规范为最高优先级执行。查看 agents.md 用法指南与完整示范。国内用户首推 元宝 AI、DeepSeek 的深度思考模式,不推荐豆包 AI;Cursor 等编辑器可在对话 @ 该链接,或加入 User Rules。 若 AI 无法访问外链,可 打开说明文档 复制全文粘贴。文档会随站点更新,复制内容可能过期,建议定期检查。
使用 https://github.com/GoogleContainerTools/jib 打包Apache Kafka Connect分布式服务器。
拉取Docker镜像!🐳
shdocker pull cricketeerone/apache-kafka-connect
还有包含 confluent-hub 的镜像版本!
shdocker pull cricketeerone/apache-kafka-connect:latest-confluent-hub
同时提供Alpine变体,详情请查看 https://hub.docker.com/r/cricketeerone/apache-kafka-connect/tags%E3%80%82
目录
与 confluentinc/cp-kafka-connect 镜像类似,此容器使用以 CONNECT_ 开头的环境变量,后跟需要配置的Kafka Connect Worker属性。
例如,以下是运行Connect分布式服务器所需的最低限度变量,但假设它连接到至少有3个broker的Kafka集群(三个主题的副本因子):
txtCONNECT_BOOTSTRAP_SERVERS CONNECT_GROUP_ID CONNECT_KEY_CONVERTER CONNECT_VALUE_CONVERTER CONNECT_CONFIG_STORAGE_TOPIC CONNECT_OFFSET_STORAGE_TOPIC CONNECT_STATUS_STORAGE_TOPIC
有关这些变量在连接到Kafka broker时的完整使用示例,请参见 docker-compose.yml。
想要构建自己的镜像?简而言之 - 克隆仓库,然后使用 ./mvnw clean compile jib:dockerBuild 或 MVN_BUILD_CMD='compile jib:dockerBuild' make 即可完成!
多平台构建(buildx)
默认情况下,使用上述命令将构建基于Ubuntu的 linux/amd64 架构镜像。
以下命令通过Docker Buildx构建多平台镜像并推送到个人Docker Hub账户:
shBUILDX_PLATFORMS=linux/arm64,linux/amd64 DOCKER_USER=$(whoami) make
截至2023年5月,Eclipse Temurin镜像的Alpine变体不支持 arm64 架构。
要推送到私有Docker仓库,需要先 docker login 到该地址。以下命令将 apache-kafka-connect 镜像推送到私有仓库中当前用户名下:
sh$ docker login <仓库地址> --username=$(whoami) $ DOCKER_REGISTRY=<仓库地址> DOCKER_USER=$(whoami) \ make
本教程使用Jib打包 ConnectDistributed 类来运行Kafka Connect分布式模式worker。以下说明使用 https://github.com/bitnami/bitnami-docker-kafka Kafka镜像,但其他Kafka Docker镜像也应适用。
本教程大致遵循 Kafka官方网站上的Connect教程,但使用分布式Connect服务器。
如果不使用Docker,可以使用相应的启动脚本在本地启动Kafka(如果不使用Kraft,还需要ZooKeeper)。此时,引导服务器的变量需要相应调整。
以下步骤可用于在Docker外部本地运行此应用:
bash# 假设Kafka使用默认端口 export CONNECT_BOOTSTRAP_SERVERS=127.0.0.1:9092 export CONNECT_GROUP_ID=cg_connect-jib export CONNECT_CONFIG_STORAGE_TOPIC=connect-jib_config export CONNECT_OFFSET_STORAGE_TOPIC=connect-jib_offsets export CONNECT_STATUS_STORAGE_TOPIC=connect-jib_status # 不能高于Kafka集群中的broker数量 export CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 export CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 # 默认使用ByteArrayConverter,允许各个连接器自行配置 export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter # 通过Maven运行ConnectDistributed ./mvnw clean exec:java
注意:有时Kafka容器会在以下步骤中自行终止,因此消费者命令可能需要重新执行。Connect worker应能自行重新连接。
本练习需要使用三个独立的终端窗口,请先打开它们。
首先,在前台启动集群。这会启动Kafka,在主机上监听 9092 端口,在Docker网络内监听 29092 端口。
终端1
bashdocker compose up kafka
需要创建用于生产数据的主题:
终端2
bashdocker compose exec kafka \ bash -c "kafka-topics.sh --create --bootstrap-server kafka:29092 --topic input --partitions=1 --replication-factor=1"
验证主题是否存在:
bashdocker compose exec kafka \ bash -c "kafka-topics.sh --list --bootstrap-server kafka:29092"
列表中应包含 input 主题。
bashdocker compose exec kafka \ bash -c "cat /data/lipsum.txt | kafka-console-producer.sh --topic input --broker-list kafka:29092"
验证数据是否存在(注意:将 max-messages 设置为预期文本的行数):
bashdocker compose exec kafka \ bash -c "kafka-console-consumer.sh --topic input --bootstrap-server kafka:29092 --from-beginning --max-messages=9"
应看到最后一行 Processed a total of 9 messages(共处理了9条消息)。
现在,可以构建Kafka Connect镜像并启动它:
bash./mvnw clean install docker compose up connect-jib-1
等待日志中出现 Kafka Connect Started(Kafka Connect已启动),然后创建FileSink连接器。如果未提供 file 参数,连接器任务将把数据写入容器的标准输出(终端1)。
终端3
使用Kafka Connect REST API启动此过程:
bashcurl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type: application/json' -d '{ "connector.class": "FileStreamSink", "tasks.max": 1, "topics": "input", "transforms": "MakeMap,AddPartition", "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value", "transforms.MakeMap.field" : "line", "transforms.AddPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.AddPartition.partition.field" : "partition!", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter" }'
这将从 input 主题的起始位置读取数据并开始处理。
在终端2的输出中,应看到类似以下内容:
textconnect-jib_1 | Struct{line=Morbi eu pharetra dolor. ....,partition=1} connect-jib_1 | Struct{line=,partition=1} connect-jib_1 | Struct{line=Nullam mauris sapien, vestibulum ....,partition=1}
这是Kafka Connect内部 Struct 类的 toString() 表示。由于添加了 HoistField$Value 转换,因此会有一个结构化对象,其中 line 字段设置为从 lipsum.txt 文件行中读取的Kafka消息值,partition 字段设置为消费记录的分区。该主题仅创建了一个分区。
要重复此过程,请删除连接器并重置消费者组:
bashcurl -XDELETE http://localhost:8083/connectors/console-sink docker compose exec kafka \ bash -c "kafka-consumer-groups.sh --bootstrap-server kafka:29092 --group connect-console-sink --reset-offsets --all-topics --to-earliest --execute"
重新运行上述console-producer和 curl -XPUT ... 命令,此时将打印超过9条消息。
使用具有多个分区的新主题重新执行教程,向其中生产更多输入数据,然后增加连接器的 max.tasks。注意输出中的 partition 字段可能会变化(可能需要多次生产数据以随机化记录批次)。
扩展worker需要添加另一个容器,并设置唯一的 CONNECT_ADVERTISED_HOST_NAME 变量。例如:
ymlconnect-jib-2: image: *connect-image hostname: connect-jib-2 depends_on: - kafka environment: <<: *connect-vars CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2
应在所有实例前添加反向代理。docker-compose.cluster.yml 中提供了使用Traefik的示例,可通过 docker compose -f docker-compose.cluster.yml up 启动,并使用 curl -H Host:connect-jib.docker.localhost http://127.0.0.1/ 进行测试。
免责声明:最好将此镜像视为可添加自定义连接器的基础镜像。以下是Apache Kafka项目提供的默认连接器插件输出。
连接器插件最好放置在 /app/libs 目录中,因此需要设置环境变量 CONNECT_PLUGIN_PATH="/app/libs"。
使用 confluent-hub 镜像标签时,可以按如下方式扩展这些镜像:
DockerfileFROM cricketeerone/apache-kafka-connect:latest-confluent-hub # 示例连接器安装 RUN confluent-hub install --no-prompt \ --component-dir /app/libs --worker-configs /app/resources/connect-distributed.properties -- \ <connector-id>
其中 <connector-id> 可从 Confluent Hub 上的可用资源中复制。无法保证Kafka Connect基础版本与安装的任何插件版本之间的兼容性。
需要重申的是,confluent-hub 不包含在基础镜像版本中;它们仅包含 Apache Kafka提供的连接器类,限于File Sink/Source和MirrorSource Connector(MirrorMaker 2.0)。通常,您可能需要按上述方式添加自己的连接器。
默认插件
bash$ curl localhost:8083/connector-plugins | jq [ { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "type": "source", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "type": "source", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "type": "source", "version": "3.5.1" } ]
File Source/Sink 不应用于生产环境,根据文档,它们仅作为"简单的独立示例":
Kafka源代码中的
file包包含一个简单示例连接器。此连接器仅用于独立模式...
文件具有简单的结构化数据——每行只是一个字符串。几乎所有实际连接器都需要具有更复杂数据格式的模式。
话虽如此,MirrorSource将是更贴近实际应用的示例。
Confluent文档涵盖了基本身份验证。
创建文件:
shell$ cat /tmp/connect-jaas.conf KafkaConnect { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required file="/tmp/connect.password"; }; $ cat /tmp/connect.password # 根据需要添加多行 admin: OneCricketeer
添加环境变量和挂载(JAVA_TOOL_OPTIONS 来自Eclipse Temurin基础镜像):
yamlenvironment: ... # 认证 CONNECT_REST_EXTENSION_CLASSES: org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension JAVA_TOOL_OPTIONS: "-Djava.security.auth.login.config=/app/connect-jaas.conf" volumes: # 认证 - /tmp/connect-jaas.conf:/app/connect-jaas.conf:ro - /tmp/connect.password:/tmp/connect.password:ro
启动 docker compose up 并测试:
shell$ curl -w'\n' http://localhost:8083 User cannot access the resource. # 用户无法访问资源 $ curl -w'\n' -uadmin:OneCricketeer http://localhost:8083 {"version":"3.5.1","commit":"2c6fb6c54472e90a","kafka_cluster_id":"nA5eYC5WSrSHjaKgw1BpHg"}
exec:java 目标可用于在Docker外部运行Kafka Connect。
例如,要重建容器,运行 ./mvnw clean install 或 make。
bashdocker compose rm -sf # 清理挂载的docker卷 docker volume ls | grep $(basename `pwd`) | awk '{print $2}' | xargs docker volume rm # 清理网络 docker network ls | grep $(basename `pwd`) | awk '{print $2}' | xargs docker network rm
了解 https://github.com/GoogleContainerTools/jib%E3%80%82
了解 更多关于Apache Kafka和Kafka Connect。
您可以使用以下命令拉取该镜像。请将 <标签> 替换为具体的标签版本。如需查看所有可用标签版本,请访问 标签列表页面。


来自真实用户的反馈,见证轩辕镜像的优质服务