
cricketeerone/apache-kafka-connect]([] :
txtCONNECT_BOOTSTRAP_SERVERS CONNECT_GROUP_ID CONNECT_KEY_CONVERTER CONNECT_VALUE_CONVERTER CONNECT_CONFIG_STORAGE_TOPIC CONNECT_OFFSET_STORAGE_TOPIC CONNECT_STATUS_STORAGE_TOPIC
有关这些变量在连接到Kafka broker时的完整使用示例,请参见 docker-compose.yml。
想要构建自己的镜像?简而言之 - 克隆仓库,然后使用 ./mvnw clean compile jib:dockerBuild 或 MVN_BUILD_CMD='compile jib:dockerBuild' make 即可完成!
多平台构建(buildx)
默认情况下,使用上述命令将构建基于Ubuntu的 linux/amd64 架构镜像。
以下命令通过Docker Buildx构建多平台镜像并推送到个人Docker Hub账户:
shBUILDX_PLATFORMS=linux/arm64,linux/amd64 DOCKER_USER=$(whoami) make
截至2023年5月,Eclipse Temurin镜像的Alpine变体不支持 arm64 架构。
要推送到私有Docker仓库,需要先 docker login 到该地址。以下命令将 apache-kafka-connect 镜像推送到私有仓库中当前用户名下:
sh$ docker login <仓库地址> --username=$(whoami) $ DOCKER_REGISTRY=<仓库地址> DOCKER_USER=$(whoami) \ make
本教程使用Jib打包 ConnectDistributed 类来运行Kafka Connect分布式模式worker。以下说明使用 Bitnami Kafka镜像,但其他Kafka Docker镜像也应适用。
本教程大致遵循 Kafka官方网站上的Connect教程,但使用分布式Connect服务器。
如果不使用Docker,可以使用相应的启动脚本在本地启动Kafka(如果不使用Kraft,还需要ZooKeeper)。此时,引导服务器的变量需要相应调整。
以下步骤可用于在Docker外部本地运行此应用:
bash# 假设Kafka使用默认端口 export CONNECT_BOOTSTRAP_SERVERS=127.0.0.1:9092 export CONNECT_GROUP_ID=cg_connect-jib export CONNECT_CONFIG_STORAGE_TOPIC=connect-jib_config export CONNECT_OFFSET_STORAGE_TOPIC=connect-jib_offsets export CONNECT_STATUS_STORAGE_TOPIC=connect-jib_status # 不能高于Kafka集群中的broker数量 export CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 export CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 export CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 # 默认使用ByteArrayConverter,允许各个连接器自行配置 export CONNECT_KEY_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter export CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter # 通过Maven运行ConnectDistributed ./mvnw clean exec:java
注意:有时Kafka容器会在以下步骤中自行终止,因此消费者命令可能需要重新执行。Connect worker应能自行重新连接。
本练习需要使用三个独立的终端窗口,请先打开它们。
首先,在前台启动集群。这会启动Kafka,在主机上监听 9092 端口,在Docker网络内监听 29092 端口。
终端1
bashdocker compose up kafka
需要创建用于生产数据的主题:
终端2
bashdocker compose exec kafka \ bash -c "kafka-topics.sh --create --bootstrap-server kafka:29092 --topic input --partitions=1 --replication-factor=1"
验证主题是否存在:
bashdocker compose exec kafka \ bash -c "kafka-topics.sh --list --bootstrap-server kafka:29092"
列表中应包含 input 主题。
bashdocker compose exec kafka \ bash -c "cat /data/lipsum.txt | kafka-console-producer.sh --topic input --broker-list kafka:29092"
验证数据是否存在(注意:将 max-messages 设置为预期文本的行数):
bashdocker compose exec kafka \ bash -c "kafka-console-consumer.sh --topic input --bootstrap-server kafka:29092 --from-beginning --max-messages=9"
应看到最后一行 Processed a total of 9 messages(共处理了9条消息)。
现在,可以构建Kafka Connect镜像并启动它:
bash./mvnw clean install docker compose up connect-jib-1
等待日志中出现 Kafka Connect Started(Kafka Connect已启动),然后创建FileSink连接器。如果未提供 file 参数,连接器任务将把数据写入容器的标准输出(终端1)。
终端3
使用Kafka Connect REST API启动此过程:
bashcurl -XPUT http://localhost:8083/connectors/console-sink/config -H 'Content-Type: application/json' -d '{ "connector.class": "FileStreamSink", "tasks.max": 1, "topics": "input", "transforms": "MakeMap,AddPartition", "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value", "transforms.MakeMap.field" : "line", "transforms.AddPartition.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.AddPartition.partition.field" : "partition!", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.kafka.connect.storage.StringConverter" }'
这将从 input 主题的起始位置读取数据并开始处理。
在终端2的输出中,应看到类似以下内容:
textconnect-jib_1 | Struct{line=Morbi eu pharetra dolor. ....,partition=1} connect-jib_1 | Struct{line=,partition=1} connect-jib_1 | Struct{line=Nullam mauris sapien, vestibulum ....,partition=1}
这是Kafka Connect内部 Struct 类的 toString() 表示。由于添加了 HoistField$Value 转换,因此会有一个结构化对象,其中 line 字段设置为从 lipsum.txt 文件行中读取的Kafka消息值,partition 字段设置为消费记录的分区。该主题仅创建了一个分区。
要重复此过程,请删除连接器并重置消费者组:
bashcurl -XDELETE http://localhost:8083/connectors/console-sink docker compose exec kafka \ bash -c "kafka-consumer-groups.sh --bootstrap-server kafka:29092 --group connect-console-sink --reset-offsets --all-topics --to-earliest --execute"
重新运行上述console-producer和 curl -XPUT ... 命令,此时将打印超过9条消息。
使用具有多个分区的新主题重新执行教程,向其中生产更多输入数据,然后增加连接器的 max.tasks。注意输出中的 partition 字段可能会变化(可能需要多次生产数据以随机化记录批次)。
扩展worker需要添加另一个容器,并设置唯一的 CONNECT_ADVERTISED_HOST_NAME 变量。例如:
ymlconnect-jib-2: image: *connect-image hostname: connect-jib-2 depends_on: - kafka environment: <<: *connect-vars CONNECT_REST_ADVERTISED_HOST_NAME: connect-jib-2
应在所有实例前添加反向代理。docker-compose.cluster.yml 中提供了使用Traefik的示例,可通过 docker compose -f docker-compose.cluster.yml up 启动,并使用 curl -H Host:connect-jib.docker.localhost http://127.0.0.1/ 进行测试。
免责声明:最好将此镜像视为可添加自定义连接器的基础镜像。以下是Apache Kafka项目提供的默认连接器插件输出。
连接器插件最好放置在 /app/libs 目录中,因此需要设置环境变量 CONNECT_PLUGIN_PATH="/app/libs"。
使用 confluent-hub 镜像标签时,可以按如下方式扩展这些镜像:
DockerfileFROM cricketeerone/apache-kafka-connect:latest-confluent-hub # 示例连接器安装 RUN confluent-hub install --no-prompt \ --component-dir /app/libs --worker-configs /app/resources/connect-distributed.properties -- \ <connector-id>
其中 <connector-id> 可从 Confluent Hub 上的可用资源中复制。无法保证Kafka Connect基础版本与安装的任何插件版本之间的兼容性。
需要重申的是,confluent-hub 不包含在基础镜像版本中;它们仅包含 Apache Kafka提供的连接器类,限于File Sink/Source和MirrorSource Connector(MirrorMaker 2.0)。通常,您可能需要按上述方式添加自己的连接器。
bash$ curl localhost:8083/connector-plugins | jq [ { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "type": "source", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "type": "source", "version": "3.5.1" }, { "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "type": "source", "version": "3.5.1" } ]
File Source/Sink 不应用于生产环境,根据文档,它们仅作为"简单的独立示例":
Kafka源代码中的
file包包含一个简单示例连接器。此连接器仅用于独立模式...
文件具有简单的结构化数据——每行只是一个字符串。几乎所有实际连接器都需要具有更复杂数据格式的模式。
话虽如此,MirrorSource将是更贴近实际应用的示例。
Confluent文档涵盖了基本身份验证。
创建文件:
shell$ cat /tmp/connect-jaas.conf KafkaConnect { org.apache.kafka.connect.rest.basic.auth.extension.PropertyFileLoginModule required file="/tmp/connect.password"; }; $ cat /tmp/connect.password # 根据需要添加多行 admin: OneCricketeer
添加环境变量和挂载(JAVA_TOOL_OPTIONS 来自Eclipse Temurin基础镜像):
yamlenvironment: ... # 认证 CONNECT_REST_EXTENSION_CLASSES: org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension JAVA_TOOL_OPTIONS: "-Djava.security.auth.login.config=/app/connect-jaas.conf" volumes: # 认证 - /tmp/connect-jaas.conf:/app/connect-jaas.conf:ro - /tmp/connect.password:/tmp/connect.password:ro
启动 docker compose up 并测试:
shell$ curl -w'\n' http://localhost:8083 User cannot access the resource. # 用户无法访问资源 $ curl -w'\n' -uadmin:OneCricketeer http://localhost:8083 {"version":"3.5.1","commit":"2c6fb6c54472e90a","kafka_cluster_id":"nA5eYC5WSrSHjaKgw1BpHg"}
exec:java 目标可用于在Docker外部运行Kafka Connect。
例如,要重建容器,运行 ./mvnw clean install 或 make。
bashdocker compose rm -sf # 清理挂载的docker卷 docker volume ls | grep $(basename `pwd`) | awk '{print $2}' | xargs docker volume rm # 清理网络 docker network ls | grep $(basename `pwd`) | awk '{print $2}' | xargs docker network rm
了解 更多关于Jib。
了解 更多关于Apache Kafka和Kafka Connect。


探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录认证访问私有仓库
无需登录使用专属域名
Kubernetes 集群配置 Containerd
K3s 轻量级 Kubernetes 镜像加速
VS Code Dev Containers 配置
Podman 容器引擎配置
HPC 科学计算容器配置
ghcr、Quay、nvcr 等镜像仓库
Harbor Proxy Repository 对接专属域名
Portainer Registries 加速拉取
Nexus3 Docker Proxy 内网缓存
需要其他帮助?请查看我们的 常见问题Docker 镜像访问常见问题解答 或 提交工单
manifest unknown
no matching manifest(架构)
invalid tar header(解压)
TLS 证书失败
DNS 超时
410 Gone 排查
402 与流量用尽
401 认证失败
429 限流
D-Bus 凭证提示
413 与超大单层
来自真实用户的反馈,见证轩辕镜像的优质服务