Apache Beam Operators¶
Apache Beam is an open-source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open-source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam's supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
Note
When the Apache Beam pipeline runs on the Dataflow service, this operator requires that the gcloud command (Google Cloud SDK <https://cloud.google.com/sdk/docs/install>) is installed on the Airflow worker.
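The examples below use operators, sensors, and helpers from the Apache Beam and Google providers without repeating their imports. A minimal import block, assuming recent versions of the apache-airflow-providers-apache-beam and apache-airflow-providers-google packages (paths may differ in older provider releases):

# Imports assumed by the example snippets in this document.
from airflow.providers.apache.beam.operators.beam import (
    BeamRunGoPipelineOperator,
    BeamRunJavaPipelineOperator,
    BeamRunPythonPipelineOperator,
)
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator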
Run Python Pipelines in Apache Beam¶
The py_file argument must be specified for BeamRunPythonPipelineOperator, as it contains the pipeline to be executed by Beam. The Python file can live on GCS, from which Airflow is able to download it, or on the local filesystem (in which case provide its absolute path).
The py_interpreter argument specifies the Python version to use when executing the pipeline; the default is python3. If your Airflow instance runs on Python 2, specify python2 and make sure your py_file is written in Python 2. For best results, use Python 3.
If the py_requirements argument is specified, a temporary Python virtual environment with the specified requirements is created, and the pipeline runs inside it.
The py_system_site_packages argument specifies whether all Python packages from your Airflow instance should be accessible within the virtual environment (when py_requirements is specified). Avoid this unless the Dataflow job requires it.
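The snippets in this document also reference module-level constants (GCS_PYTHON, GCS_OUTPUT, GCS_TMP, GCS_STAGING, and similar) that are defined in the surrounding example DAG rather than shown here. They are ordinary strings; a hypothetical sketch of what they might look like:

# Hypothetical placeholder values; replace with your own project and bucket paths.
GCP_PROJECT_ID = "my-gcp-project"
GCS_TMP = "gs://my-bucket/tmp/"
GCS_STAGING = "gs://my-bucket/staging/"
GCS_OUTPUT = "gs://my-bucket/output"
GCS_INPUT = "gs://my-bucket/input/kinglear.txt"
GCS_PYTHON = "gs://my-bucket/wordcount_debugging.py"
GCS_GO = "gs://my-bucket/wordcount.go"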
Python Pipelines with DirectRunner¶
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_local_direct_runner",
py_file="apache_beam.examples.wordcount",
py_options=["-m"],
py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
)
start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_direct_runner",
py_file=GCS_PYTHON,
py_options=[],
pipeline_options={"output": GCS_OUTPUT},
py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
)
You can use deferrable mode for this action in order to run the operator asynchronously. When the operator knows it has to wait, it can free up the worker and hand the job of resuming itself over to a trigger. As a result, while it is suspended (deferred) it does not occupy a worker slot, and your cluster wastes far fewer resources on idle operators and sensors.
start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_local_direct_runner",
py_file="apache_beam.examples.wordcount",
py_options=["-m"],
py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
deferrable=True,
)
start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_direct_runner",
py_file=GCS_PYTHON,
py_options=[],
pipeline_options={"output": GCS_OUTPUT},
py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
deferrable=True,
)
Python Pipelines with DataflowRunner¶
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_dataflow_runner",
runner="DataflowRunner",
py_file=GCS_PYTHON,
pipeline_options={
"tempLocation": GCS_TMP,
"stagingLocation": GCS_STAGING,
"output": GCS_OUTPUT,
},
py_options=[],
py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config=DataflowConfiguration(
job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
),
)
start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
task_id="start_python_job_dataflow_runner_async",
runner="DataflowRunner",
py_file=GCS_PYTHON_DATAFLOW_ASYNC,
pipeline_options={
"tempLocation": GCS_TMP,
"stagingLocation": GCS_STAGING,
"output": GCS_OUTPUT,
},
py_options=[],
py_requirements=["apache-beam[gcp]==2.26.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config=DataflowConfiguration(
job_name="{{task.task_id}}",
project_id=GCP_PROJECT_ID,
location="us-central1",
wait_until_finished=False,  # return immediately; the sensor below waits for completion
),
)
wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",
job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
project_id=GCP_PROJECT_ID,
location="us-central1",
)
start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
As with the DirectRunner examples above, deferrable mode lets you run this operator asynchronously: while the task is suspended (deferred) it does not occupy a worker slot, and a trigger handles resuming it, so far fewer cluster resources are wasted on idle operators and sensors.
start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
task_id="start_python_pipeline_dataflow_runner",
runner="DataflowRunner",
py_file=GCS_PYTHON,
pipeline_options={
"tempLocation": GCS_TMP,
"stagingLocation": GCS_STAGING,
"output": GCS_OUTPUT,
},
py_options=[],
py_requirements=["apache-beam[gcp]==2.46.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config=DataflowConfiguration(
job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
),
deferrable=True,
)
Run Java Pipelines in Apache Beam¶
For Java pipelines, the jar argument must be specified for BeamRunJavaPipelineOperator, as it contains the pipeline to be executed by Apache Beam. The JAR can live on GCS, from which Airflow is able to download it, or on the local filesystem (in which case provide its absolute path).
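The examples below first copy the JAR from GCS to the worker with GCSToLocalFilesystemOperator and then point jar at the local path. Since the operator can also download the JAR itself, you could instead pass a gs:// URI directly; a minimal sketch, assuming a hypothetical GCS_JAR constant such as "gs://my-bucket/wordcount.jar":

start_java_pipeline_from_gcs = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_from_gcs",
    jar=GCS_JAR,  # hypothetical constant pointing at the JAR object in GCS
    pipeline_options={"output": "/tmp/start_java_pipeline_from_gcs"},
    job_class="org.apache.beam.examples.WordCount",
)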
Java Pipelines with DirectRunner¶
jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
task_id="jar_to_local_direct_runner",
bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
)
start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
task_id="start_java_pipeline_direct_runner",
jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
pipeline_options={
"output": "/tmp/start_java_pipeline_direct_runner",
"inputFile": GCS_INPUT,
},
job_class="org.apache.beam.examples.WordCount",
)
jar_to_local_direct_runner >> start_java_pipeline_direct_runner
Java Pipelines with DataflowRunner¶
jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
task_id="jar_to_local_dataflow_runner",
bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
)
start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
task_id="start_java_pipeline_dataflow",
runner="DataflowRunner",
jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
pipeline_options={
"tempLocation": GCS_TMP,
"stagingLocation": GCS_STAGING,
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
)
jar_to_local_dataflow_runner >> start_java_pipeline_dataflow
Run Go Pipelines in Apache Beam¶
The go_file argument must be specified for BeamRunGoPipelineOperator, as it contains the pipeline to be executed by Beam. The Go file can live on GCS, from which Airflow is able to download it, or on the local filesystem (in which case provide its absolute path). When run from the local filesystem, this is equivalent to go run <go_file>. When the file is pulled from a GCS bucket, the module is first initialized with go mod init example.com/main and its dependencies installed with go mod tidy.
Go Pipelines with DirectRunner¶
start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_local_direct_runner",
go_file="files/apache_beam/examples/wordcount.go",
)
start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_direct_runner",
go_file=GCS_GO,
pipeline_options={"output": GCS_OUTPUT},
)
Go Pipelines with DataflowRunner¶
start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
task_id="start_go_pipeline_dataflow_runner",
runner="DataflowRunner",
go_file=GCS_GO,
pipeline_options={
"tempLocation": GCS_TMP,
"stagingLocation": GCS_STAGING,
"output": GCS_OUTPUT,
"WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
},
dataflow_config=DataflowConfiguration(
job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
),
)
start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
task_id="start_go_job_dataflow_runner_async",
runner="DataflowRunner",
go_file=GCS_GO_DATAFLOW_ASYNC,
pipeline_options={
"tempLocation": GCS_TMP,
"stagingLocation": GCS_STAGING,
"output": GCS_OUTPUT,
"WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
},
dataflow_config=DataflowConfiguration(
job_name="{{task.task_id}}",
project_id=GCP_PROJECT_ID,
location="us-central1",
wait_until_finished=False,  # return immediately; the sensor below waits for completion
),
)
wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
task_id="wait-for-go-job-async-done",
job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
project_id=GCP_PROJECT_ID,
location="us-central1",
)
start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done