本站支持搜索的镜像仓库:Docker Hub、gcr.io、ghcr.io、quay.io、k8s.gcr.io、registry.gcr.io、elastic.co、mcr.microsoft.com
Apache Beam 是一个统一的数据处理模型,用于定义批处理和流处理数据并行处理管道,同时提供一套特定语言的 SDK 用于构建管道,以及在分布式处理后端执行管道的运行器(Runner)。支持的后端包括 Apache Apex、Apache Flink、Apache Spark、Google Cloud Dataflow 和 Hazelcast Jet 等。其核心目标是简化跨平台、跨场景的数据处理逻辑开发,实现"一次编写,多处运行"的能力。
基于 Google Dataflow 模型演进而来,统一批处理和流处理的编程范式,支持无界数据流(流处理)和有界数据流(批处理)的统一表达,简化复杂数据处理逻辑的开发。
提供多种编程语言的 SDK,满足不同技术栈需求:
支持在多种分布式处理引擎上执行管道,无需修改业务逻辑:
核心概念包括:
场景:使用现有 SDK 编写数据处理管道,并在指定运行器上执行。
适用范围:数据工程师、数据分析师需快速开发批流处理任务,无需关注底层执行引擎细节。例如:日志数据实时清洗、用户行为分析、ETL 流程构建等。
场景:为特定用户群体开发 Beam SDK(如 Scala、R、图形化界面等)。
适用范围:语言专家或框架开发者,需扩展 Beam 的语言生态,满足特定技术栈需求。
场景:为分布式处理环境开发 PipelineRunner,支持基于 Beam 模型编写的程序。
适用范围:分布式计算框架开发者,需将 Beam 生态集成至自有执行引擎,扩展框架的数据处理能力。
Apache Beam 官方未提供统一的 Docker 镜像,建议基于 SDK 语言构建自定义镜像。以下为 Java SDK 示例:
Dockerfile(Java SDK 示例)
FROM maven:3.8.5-openjdk-11 AS builder WORKDIR /app COPY pom.xml . # 缓存依赖 RUN mvn dependency:go-offline COPY src ./src # 构建 Beam 管道应用 RUN mvn package -DskipTests FROM openjdk:11-jre-slim WORKDIR /app COPY --from=builder /app/target/*.jar app.jar # 运行 DirectRunner(本地调试) ENTRYPOINT ["java", "-jar", "app.jar", "--runner=DirectRunner"]
构建并运行镜像
# 构建镜像 docker build -t beam-java-app:latest . # 运行(使用 DirectRunner 本地执行) docker run --rm beam-java-app:latest
若需提交至 Flink 集群执行,需在运行时指定 Flink 集群地址:
docker run --rm \ -e FLINK_MASTER=flink-jobmanager:8081 \ beam-java-app:latest \ --runner=FlinkRunner \ --flink-master=${FLINK_MASTER} \ --streaming=true # 若为流处理任务
version: "3.8" services: flink-jobmanager: image: flink:1.17-scala_2.12 ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager flink-taskmanager: image: flink:1.17-scala_2.12 depends_on: - flink-jobmanager command: taskmanager environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager beam-app: build: . depends_on: - flink-jobmanager environment: - FLINK_MASTER=flink-jobmanager:8081 command: > java -jar app.jar --runner=FlinkRunner --flink-master=${FLINK_MASTER} --jobName=beam-flink-demo
| 参数名 | 说明 | 示例值 |
|---|---|---|
--runner | 指定运行器类型 | DirectRunner/FlinkRunner |
--jobName | 任务名称 | beam-wordcount-demo |
--inputFile | 输入文件路径(批处理) | /data/input.txt |
--output | 输出路径 | /data/output |
FlinkRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--flink-master | Flink 集群 JobManager 地址 | flink-jobmanager:8081 |
--streaming | 是否启用流处理模式 | true/false |
--parallelism | 任务并行度 | 4 |
SparkRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--spark-master | Spark 集群 Master 地址 | spark://spark-master:7077 |
--spark-submit | Spark 提交命令路径 | /opt/spark/bin/spark-submit |
DataflowRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--project | GCP 项目 ID | my-gcp-project |
--region | 区域 | us-central1 |
--tempLocation | GCS 临时文件路径 | gs://my-bucket/temp |
| 环境变量名 | 说明 | 示例值 |
|---|---|---|
BEAM_HOME | Beam 安装路径(可选) | /opt/apache-beam |
FLINK_CONF_DIR | Flink 配置文件目录 | /etc/flink |
SPARK_HOME | Spark 安装路径 | /opt/spark |
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptors; import java.util.Arrays; public class WordCount { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); p.apply(TextIO.read().from(options.as(WordCountOptions.class).getInputFile())) .apply(FlatMapElements.into(TypeDescriptors.strings()) .via(line -> Arrays.asList(line.split(" ")))) .apply(Count.perElement()) .apply(MapElements.into(TypeDescriptors.strings()) .via(wordCount -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to(options.as(WordCountOptions.class).getOutput())); p.run().waitUntilFinish(); } public interface WordCountOptions extends PipelineOptions { String getInputFile(); void setInputFile(String value); String getOutput(); void setOutput(String value); } }
docker run --rm \ -v $(pwd)/input.txt:/data/input.txt \ -v $(pwd)/output:/data/output \ beam-java-app:latest \ --runner=DirectRunner \ --inputFile=/data/input.txt \ --output=/data/output/result
免费版仅支持 Docker Hub 加速,不承诺可用性和速度;专业版支持更多镜像源,保证可用性和稳定速度,提供优先客服响应。
免费版仅支持 docker.io;专业版支持 docker.io、gcr.io、ghcr.io、registry.k8s.io、nvcr.io、quay.io、mcr.microsoft.com、docker.elastic.co 等。
当返回 402 Payment Required 错误时,表示流量已耗尽,需要充值流量包以恢复服务。
通常由 Docker 版本过低导致,需要升级到 20.x 或更高版本以支持 V2 协议。
先检查 Docker 版本,版本过低则升级;版本正常则验证镜像信息是否正确。
使用 docker tag 命令为镜像打上新标签,去掉域名前缀,使镜像名称更简洁。
探索更多轩辕镜像的使用方法,找到最适合您系统的配置方式
通过 Docker 登录方式配置轩辕镜像加速服务,包含7个详细步骤
在 Linux 系统上配置轩辕镜像源,支持主流发行版
在 Docker Desktop 中配置轩辕镜像加速,适用于桌面系统
在 Docker Compose 中使用轩辕镜像加速,支持容器编排
在 k8s 中配置 containerd 使用轩辕镜像加速
在宝塔面板中配置轩辕镜像加速,提升服务器管理效率
在 Synology 群晖NAS系统中配置轩辕镜像加速
在飞牛fnOS系统中配置轩辕镜像加速
在极空间NAS中配置轩辕镜像加速
在爱快ikuai系统中配置轩辕镜像加速
在绿联NAS系统中配置轩辕镜像加速
在威联通NAS系统中配置轩辕镜像加速
在 Podman 中配置轩辕镜像加速,支持多系统
配置轩辕镜像加速9大主流镜像仓库,包含详细配置步骤
无需登录即可使用轩辕镜像加速服务,更加便捷高效
需要其他帮助?请查看我们的 常见问题 或 官方QQ群: 13763429