apache/beam_flink1.7_job_serverApache Beam 是一个统一的数据处理模型,用于定义批处理和流处理数据并行处理管道,同时提供一套特定语言的 SDK 用于构建管道,以及在分布式处理后端执行管道的运行器(Runner)。支持的后端包括 Apache Apex、Apache Flink、Apache Spark、Google Cloud Dataflow 和 Hazelcast Jet 等。其核心目标是简化跨平台、跨场景的数据处理逻辑开发,实现"一次编写,多处运行"的能力。
基于 Google Dataflow 模型演进而来,统一批处理和流处理的编程范式,支持无界数据流(流处理)和有界数据流(批处理)的统一表达,简化复杂数据处理逻辑的开发。
提供多种编程语言的 SDK,满足不同技术栈需求:
支持在多种分布式处理引擎上执行管道,无需修改业务逻辑:
核心概念包括:
场景:使用现有 SDK 编写数据处理管道,并在指定运行器上执行。
适用范围:数据工程师、数据分析师需快速开发批流处理任务,无需关注底层执行引擎细节。例如:日志数据实时清洗、用户行为分析、ETL 流程构建等。
场景:为特定用户群体开发 Beam SDK(如 Scala、R、图形化界面等)。
适用范围:语言专家或框架开发者,需扩展 Beam 的语言生态,满足特定技术栈需求。
场景:为分布式处理环境开发 PipelineRunner,支持基于 Beam 模型编写的程序。
适用范围:分布式计算框架开发者,需将 Beam 生态集成至自有执行引擎,扩展框架的数据处理能力。
Apache Beam 官方未提供统一的 Docker 镜像,建议基于 SDK 语言构建自定义镜像。以下为 Java SDK 示例:
Dockerfile(Java SDK 示例)
dockerfileFROM maven:3.8.5-openjdk-11 AS builder WORKDIR /app COPY pom.xml . # 缓存依赖 RUN mvn dependency:go-offline COPY src ./src # 构建 Beam 管道应用 RUN mvn package -DskipTests FROM openjdk:11-jre-slim WORKDIR /app COPY --from=builder /app/target/*.jar app.jar # 运行 DirectRunner(本地调试) ENTRYPOINT ["java", "-jar", "app.jar", "--runner=DirectRunner"]
构建并运行镜像
bash# 构建镜像 docker build -t beam-java-app:latest . # 运行(使用 DirectRunner 本地执行) docker run --rm beam-java-app:latest
若需提交至 Flink 集群执行,需在运行时指定 Flink 集群地址:
bashdocker run --rm \ -e FLINK_MASTER=flink-jobmanager:8081 \ beam-java-app:latest \ --runner=FlinkRunner \ --flink-master=${FLINK_MASTER} \ --streaming=true # 若为流处理任务
yamlversion: "3.8" services: flink-jobmanager: image: flink:1.17-scala_2.12 ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager flink-taskmanager: image: flink:1.17-scala_2.12 depends_on: - flink-jobmanager command: taskmanager environment: - JOB_MANAGER_RPC_ADDRESS=flink-jobmanager beam-app: build: . depends_on: - flink-jobmanager environment: - FLINK_MASTER=flink-jobmanager:8081 command: > java -jar app.jar --runner=FlinkRunner --flink-master=${FLINK_MASTER} --jobName=beam-flink-demo
| 参数名 | 说明 | 示例值 |
|---|---|---|
--runner | 指定运行器类型 | DirectRunner/FlinkRunner |
--jobName | 任务名称 | beam-wordcount-demo |
--inputFile | 输入文件路径(批处理) | /data/input.txt |
--output | 输出路径 | /data/output |
FlinkRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--flink-master | Flink 集群 JobManager 地址 | flink-jobmanager:8081 |
--streaming | 是否启用流处理模式 | true/false |
--parallelism | 任务并行度 | 4 |
SparkRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--spark-master | Spark 集群 Master 地址 | spark://spark-master:7077 |
--spark-submit | Spark 提交命令路径 | /opt/spark/bin/spark-submit |
DataflowRunner
| 参数名 | 说明 | 示例值 |
|---|---|---|
--project | GCP 项目 ID | my-gcp-project |
--region | 区域 | us-central1 |
--tempLocation | GCS 临时文件路径 | gs://my-bucket/temp |
| 环境变量名 | 说明 | 示例值 |
|---|---|---|
BEAM_HOME | Beam 安装路径(可选) | /opt/apache-beam |
FLINK_CONF_DIR | Flink 配置文件目录 | /etc/flink |
SPARK_HOME | Spark 安装路径 | /opt/spark |
javaimport org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptors; import java.util.Arrays; public class WordCount { public static void main(String[] args) { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create(); Pipeline p = Pipeline.create(options); p.apply(TextIO.read().from(options.as(WordCountOptions.class).getInputFile())) .apply(FlatMapElements.into(TypeDescriptors.strings()) .via(line -> Arrays.asList(line.split(" ")))) .apply(Count.perElement()) .apply(MapElements.into(TypeDescriptors.strings()) .via(wordCount -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to(options.as(WordCountOptions.class).getOutput())); p.run().waitUntilFinish(); } public interface WordCountOptions extends PipelineOptions { String getInputFile(); void setInputFile(String value); String getOutput(); void setOutput(String value); } }
bashdocker run --rm \ -v $(pwd)/input.txt:/data/input.txt \ -v $(pwd)/output:/data/output \ beam-java-app:latest \ --runner=DirectRunner \ --inputFile=/data/input.txt \ --output=/data/output/result

manifest unknown 错误
TLS 证书验证失败
DNS 解析超时
410 错误:版本过低
402 错误:流量耗尽
身份认证失败错误
429 限流错误
凭证保存错误
来自真实用户的反馈,见证轩辕镜像的优质服务