Google Cloud Dataflow 算子¶
Dataflow 是一种托管服务,用于执行各种数据处理模式。这些管道是使用 Apache Beam 编程模型创建的,该模型允许批处理和流处理。
先决条件任务¶
要使用这些算子,您必须执行一些操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为您的项目启用计费,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请参阅。
运行数据管道的途径¶
根据您的环境和源文件,有几种运行 Dataflow 管道的方法
非模板化管道:如果您有 Java 的
*.jar
文件或 Python 的*.py
文件,开发人员可以在 Airflow 工作器上以本地进程的形式运行管道。这也意味着必须在工作器上安装必要的系统依赖项。对于 Java,工作器必须安装 JRE 运行时。对于 Python,必须安装 Python 解释器。运行时版本必须与管道版本兼容。这是启动管道的最快方式,但由于其经常出现系统依赖项问题,因此可能会导致问题。请参阅:Java SDK 管道、Python SDK 管道,以获取更详细的信息。开发人员还可以通过以 JSON 格式传递其结构来创建管道,然后运行它来创建作业。请参阅:JSON 格式管道和 JSON 格式管道,以获取更详细的信息。模板化管道:程序员可以通过准备一个将在 Google 管理的机器上运行的模板,使管道独立于环境。通过这种方式,对环境的更改不会影响您的管道。模板有两种类型
SQL 管道:开发人员可以将管道编写为 SQL 语句,然后在 Dataflow 中执行它。请参阅:Dataflow SQL
最好使用非模板化管道测试您的管道,然后使用模板在生产中运行管道。
有关管道类型之间差异的详细信息,请参阅 Google Cloud 文档中的Dataflow 模板。
启动非模板化管道¶
JSON 格式管道¶
可以通过 JSON 格式传递管道结构来创建新的管道。请参阅 DataflowCreatePipelineOperator
这将创建一个新的管道,该管道将在 Dataflow Pipelines UI 中显示。
以下是通过运行 DataflowCreatePipelineOperator 创建 Dataflow 管道的一个示例
create_pipeline = DataflowCreatePipelineOperator(
task_id="create_pipeline",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body={
"name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
"type": PIPELINE_TYPE,
"workload": {
"dataflowFlexTemplateRequest": {
"launchParameter": {
"containerSpecGcsPath": GCS_PATH,
"jobName": PIPELINE_JOB_NAME,
"environment": {"tempLocation": TEMP_LOCATION},
"parameters": {
"inputFile": INPUT_FILE,
"output": OUTPUT,
},
},
"projectId": GCP_PROJECT_ID,
"location": GCP_LOCATION,
}
},
},
)
要运行新创建的管道,可以使用 DataflowRunPipelineOperator
run_pipeline = DataflowRunPipelineOperator(
task_id="run_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
)
调用后,DataflowRunPipelineOperator 将返回运行给定管道创建的 Google Cloud Dataflow 作业。
有关 API 用法的更多信息,请参阅 Google Cloud 文档中的 数据管道 API REST 资源。
要使用源文件(Java 中的 JAR 或 Python 文件)创建新管道,请使用创建作业操作符。源文件可以位于 GCS 或本地文件系统上。 BeamRunJavaPipelineOperator
或 BeamRunPythonPipelineOperator
Java SDK 管道¶
对于 Java 管道,必须为 BeamRunJavaPipelineOperator
指定 jar
参数,因为它包含要在 Dataflow 上执行的管道。JAR 可以位于 GCS 上,Airflow 可以下载,也可以位于本地文件系统上(提供其绝对路径)。
以下是如何在 Java 中创建和运行一个将 jar 存储在 GCS 上的管道的示例
start_java_job = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start-java-job",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"check_if_running": CheckJobRunning.IgnoreJob,
"location": LOCATION,
"poll_sleep": 10,
},
)
以下是在 GCS 上存储 jar 的情况下,使用 Java 创建和运行管道的一个示例,该模式可延迟执行
start_java_deferrable = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start-java-job-deferrable",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
deferrable=True,
)
以下是在本地文件系统上存储 jar 的情况下,使用 Java 创建和运行管道的一个示例
start_java_job_local = BeamRunJavaPipelineOperator(
task_id="start_java_job_local",
jar=JAR_FILE_NAME,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
},
)
Python SDK 管道¶
py_file
参数必须为 BeamRunPythonPipelineOperator
指定,因为它包含要在 Dataflow 上执行的管道。Python 文件可以位于 GCS 上,Airflow 可以下载该文件,也可以位于本地文件系统上(提供其绝对路径)。
py_interpreter
参数指定执行管道时要使用的 Python 版本,默认值为 python3
。如果您的 Airflow 实例在 Python 2 上运行,请指定 python2
,并确保您的 py_file
采用 Python 2。为获得最佳结果,请使用 Python 3。
如果指定了 py_requirements
参数,将创建一个具有指定要求的临时 Python 虚拟环境,并在其中运行管道。
py_system_site_packages
参数指定您的 Airflow 实例中的所有 Python 包是否可以在虚拟环境中访问(如果指定了 py_requirements
参数),除非 Dataflow 作业需要,否则建议避免。
start_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_python_job",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.47.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)
执行模型¶
Dataflow 具有多种执行管道的方式。它可以在以下模式下完成:异步批处理(即时执行),阻塞批处理(等待完成)或流式处理(无限期运行)。在 Airflow 中,最佳做法是使用异步批处理管道或流,并使用传感器侦听预期的作业状态。
默认情况下,BeamRunJavaPipelineOperator
、BeamRunPythonPipelineOperator
、DataflowTemplatedJobStartOperator
和 DataflowStartFlexTemplateOperator
的参数 wait_until_finished
设置为 None
,这会导致不同的行为,具体取决于管道类型
对于流管道,等待作业启动,
对于批处理管道,等待作业完成。
如果 wait_until_finished
设置为 True
,则操作符将始终等待管道执行结束。如果设置为 False
,则只提交作业。
请参阅:在 Cloud Dataflow 服务上执行 PipelineOptions 的配置
异步执行¶
Dataflow 批处理作业默认情况下是异步的;但是,这取决于应用程序代码(包含在 JAR 或 Python 文件中)以及它的编写方式。为了使 Dataflow 作业异步执行,请确保管道对象没有被等待(不在应用程序代码中调用 waitUntilFinish
或 wait_until_finish
在 PipelineResult
上)。
start_python_job_async = BeamRunPythonPipelineOperator(
task_id="start_python_job_async",
runner=BeamRunnerType.DataflowRunner,
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.47.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={
"job_name": "start_python_job_async",
"location": LOCATION,
"wait_until_finished": False,
},
)
阻塞执行¶
为了让 Dataflow 作业执行并等待完成,请确保在应用程序代码中等待管道对象。对于 Java SDK,可以通过对 pipeline.run()
返回的 PipelineResult
调用 waitUntilFinish
来实现此目的,对于 Python SDK,可以通过对 pipeline.run()
返回的 PipelineResult
调用 wait_until_finish
来实现此目的。
应避免阻塞作业,因为在 Airflow 上运行时会发生后台进程。此进程会持续运行以等待 Dataflow 作业完成,并在此过程中增加 Airflow 的资源消耗。
流式执行¶
要执行流式 Dataflow 作业,请确保设置流式选项(对于 Python)或从无界数据源(如 Pub/Sub)读取您的管道(对于 Java)。
start_streaming_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_streaming_python_job",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"temp_location": GCS_TMP,
"input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
"output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
"streaming": True,
},
py_requirements=["apache-beam[gcp]==2.47.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)
将参数 drain_pipeline
设置为 True
允许通过排空作业来停止流式作业,而不是在终止任务实例期间取消作业。
请参阅 停止正在运行的管道。
模板化作业¶
模板提供了在 Cloud Storage 上暂存管道并从那里运行管道的功能。这为开发工作流提供了灵活性,因为它将管道的开发与暂存和执行步骤分离开来。Dataflow 有两种类型的模板:经典模板和 Flex 模板。有关更多信息,请参阅 Dataflow 模板的官方文档。
以下是一个使用经典模板和 DataflowTemplatedJobStartOperator
运行 Dataflow 作业的示例
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start_template_job",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
wait_until_finished=True,
)
对于此操作,您还可以使用可延迟模式下的操作符
start_template_job_deferrable = DataflowTemplatedJobStartOperator(
task_id="start_template_job_deferrable",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
deferrable=True,
)
请参阅 可与此操作符配合使用的 Google 提供的模板列表。
以下是如何使用 DataflowStartFlexTemplateOperator
运行 Dataflow 作业的示例,其中包含 Flex 模板
start_flex_template_job = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
wait_until_finished=True,
)
对于此操作,您还可以使用可延迟模式下的操作符
start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job_deferrable",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
deferrable=True,
)
Dataflow SQL¶
Dataflow SQL 支持 ZetaSQL 查询语法的一个变体,并包含用于运行 Dataflow 流式作业的其他流式扩展。
以下是如何使用 DataflowStartSqlJobOperator
运行 Dataflow SQL 作业的示例
start_sql = DataflowStartSqlJobOperator(
task_id="start_sql_query",
job_name=DATAFLOW_SQL_JOB_NAME,
query=f"""
SELECT
emp_name as employee,
salary as employee_salary
FROM
bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
WHERE salary >= 1000;
""",
options={
"bigquery-project": PROJECT_ID,
"bigquery-dataset": BQ_SQL_DATASET,
"bigquery-table": BQ_SQL_TABLE_OUTPUT,
},
location=LOCATION,
do_xcom_push=True,
)
警告
此操作需要在 Airflow 工作进程上安装 gcloud
命令(Google Cloud SDK)<https://cloud.google.com/sdk/docs/install>`__
请参阅 Dataflow SQL 参考。
停止管道¶
要停止一个或多个 Dataflow 管道,可以使用 DataflowStopJobOperator
。默认情况下,流式管道会被耗尽,将 drain_pipeline
设置为 False
将取消它们。提供 job_id
以停止特定作业,或提供 job_name_prefix
以停止所有具有提供名称前缀的作业。
stop_dataflow_job = DataflowStopJobOperator(
task_id="stop_dataflow_job",
location=LOCATION,
job_name_prefix="start-python-pipeline",
)
请参阅:停止正在运行的管道。
删除管道¶
要删除 Dataflow 管道,可以使用 DataflowDeletePipelineOperator
。以下是如何使用此操作的示例
delete_pipeline = DataflowDeletePipelineOperator(
task_id="delete_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
传感器¶
当作业被异步触发时,可以使用传感器来运行特定作业属性的检查。
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait_for_python_job_async_done",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location=LOCATION,
)
此操作可以通过将 deferrable=True
作为参数传递来以可延迟模式运行。
wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
task_id="wait_for_beam_python_pipeline_job_status_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
location=LOCATION,
deferrable=True,
)
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait_for_python_job_async_metric",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
)
此操作可以通过将 deferrable=True
作为参数传递来以可延迟模式运行。
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
task_id="wait_for_beam_python_pipeline_job_metric_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
deferrable=True,
)
def check_message(messages: list[dict]) -> bool:
"""Check message"""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait_for_python_job_async_message",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_message,
fail_on_terminal_state=False,
)
此操作可以通过将 deferrable=True
作为参数传递来以可延迟模式运行。
def check_job_message(messages: list[dict]) -> bool:
"""Check job message."""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
task_id="wait_for_beam_python_pipeline_job_message_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_job_message,
fail_on_terminal_state=False,
deferrable=True,
)
DataflowJobAutoScalingEventsSensor
.
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event"""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_python_job_async_autoscaling_event",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
)
此操作可以通过将 deferrable=True
作为参数传递来以可延迟模式运行。
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event."""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
deferrable=True,
)