Google Cloud Dataflow 运算符¶
Dataflow 是一项用于执行各种数据处理模式的托管服务。 这些管道是使用 Apache Beam 编程模型创建的,该模型允许批量和流式处理。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
为您的项目启用结算,如 Google Cloud 文档中所述。
启用 API,如 Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
运行数据管道的方式¶
根据您的环境、源文件,有多种方法可以运行 Dataflow 管道
非模板化管道:如果您有 Java 的
*.jar
文件或 Python 的*.py
文件,开发人员可以在 Airflow 工作器上以本地进程的方式运行管道。 这也意味着必须在工作器上安装必要的系统依赖项。 对于 Java,工作器必须安装 JRE 运行时。 对于 Python,则为 Python 解释器。运行时版本必须与管道版本兼容。 这是启动管道的最快方法,但由于其系统依赖项的频繁问题,可能会导致问题。 有关更多详细信息,请参阅: Java SDK 管道, Python SDK 管道。 开发人员还可以通过 JSON 格式传递管道的结构,然后运行它来创建作业。 有关更多详细信息,请参阅: JSON 格式的管道 和 JSON 格式的管道。模板化管道:程序员可以通过准备一个模板来使管道独立于环境,然后该模板将在 Google 管理的机器上运行。 这样,对环境的更改就不会影响您的管道。 模板有两种类型
SQL 管道:开发人员可以将管道编写为 SQL 语句,然后在 Dataflow 中执行。 请参阅: Dataflow SQL
最好使用非模板化管道测试您的管道,然后使用模板在生产环境中运行管道。
有关管道类型之间差异的详细信息,请参阅 Google Cloud 文档中的 Dataflow 模板。
启动非模板化管道¶
JSON 格式的管道¶
可以通过 JSON 格式传递管道的结构来创建新的管道。 请参阅 DataflowCreatePipelineOperator
这将创建一个新的管道,该管道将在 Dataflow Pipelines UI 上可见。
以下是如何通过运行 DataflowCreatePipelineOperator 创建 Dataflow 管道的示例
create_pipeline = DataflowCreatePipelineOperator(
task_id="create_pipeline",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body={
"name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
"type": PIPELINE_TYPE,
"workload": {
"dataflowFlexTemplateRequest": {
"launchParameter": {
"containerSpecGcsPath": GCS_PATH,
"jobName": PIPELINE_JOB_NAME,
"environment": {"tempLocation": TEMP_LOCATION},
"parameters": {
"inputFile": INPUT_FILE,
"output": OUTPUT,
},
},
"projectId": GCP_PROJECT_ID,
"location": GCP_LOCATION,
}
},
},
)
要运行新创建的管道,您可以使用 DataflowRunPipelineOperator
run_pipeline = DataflowRunPipelineOperator(
task_id="run_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
)
调用后,DataflowRunPipelineOperator 将返回通过运行给定管道创建的 Google Cloud Dataflow 作业。
有关 API 用法的更多信息,请参阅 Google Cloud 文档中的 Data Pipelines API REST 资源。
要使用源文件(Java 中的 JAR 或 Python 文件)创建新管道,请使用创建作业运算符。 源文件可以位于 GCS 上或本地文件系统上。 BeamRunJavaPipelineOperator
或 BeamRunPythonPipelineOperator
Java SDK 管道¶
对于 Java 管道,必须为 BeamRunJavaPipelineOperator
指定 jar
参数,因为它包含要在 Dataflow 上执行的管道。 JAR 可以位于 Airflow 可以下载的 GCS 上,也可以位于本地文件系统上(提供其绝对路径)。
以下是使用存储在 GCS 上的 jar 在 Java 中创建和运行管道的示例
start_java_job = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_java_job",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": f"java-pipeline-job-{ENV_ID}",
"check_if_running": CheckJobRunning.IgnoreJob,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
)
以下是在可延迟模式下使用存储在 GCS 上的 jar 在 Java 中创建和运行管道的示例
start_java_deferrable = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_java_job_deferrable",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": f"deferrable-java-pipeline-job-{ENV_ID}",
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
deferrable=True,
)
以下是使用存储在本地文件系统上的 jar 在 Java 中创建和运行管道的示例
start_java_job_local = BeamRunJavaPipelineOperator(
task_id="start_java_job_local",
jar=LOCAL_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
},
)
Python SDK 管道¶
必须为 BeamRunPythonPipelineOperator
指定 py_file
参数,因为它包含要在 Dataflow 上执行的管道。 Python 文件可以位于 Airflow 可以下载的 GCS 上,也可以位于本地文件系统上(提供其绝对路径)。
py_interpreter
参数指定执行管道时要使用的 Python 版本,默认为 python3
。如果您的 Airflow 实例在 Python 2 上运行,请指定 python2
并确保您的 py_file
在 Python 2 中。 为了获得最佳结果,请使用 Python 3。
如果指定了 py_requirements
参数,将创建一个带有指定要求的临时 Python 虚拟环境,并且将在其中运行管道。
py_system_site_packages
参数指定是否可以从您的 Airflow 实例访问虚拟环境中的所有 Python 包(如果指定了 py_requirements
参数),建议避免,除非 Dataflow 作业需要它。
start_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_python_job",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)
执行模型¶
Dataflow 具有多种执行管道的选项。 它可以在以下模式下完成:批处理异步(触发后忘记)、批处理阻塞(等待完成)或流式处理(无限期运行)。 在 Airflow 中,最佳实践是使用异步批处理管道或流,并使用传感器来侦听预期的作业状态。
默认情况下,BeamRunJavaPipelineOperator
、BeamRunPythonPipelineOperator
、DataflowTemplatedJobStartOperator
和 DataflowStartFlexTemplateOperator
的参数 wait_until_finished
设置为 None
,这会导致不同的行为取决于管道的类型
对于流式管道,等待作业启动,
对于批处理管道,等待作业完成。
如果 wait_until_finished
设置为 True
,操作符将始终等待管道执行结束。如果设置为 False
,则仅提交作业。
参见:在 Cloud Dataflow 服务上执行的 PipelineOptions 配置
异步执行¶
Dataflow 批处理作业默认是异步的;但是,这取决于应用程序代码(包含在 JAR 或 Python 文件中)以及它的编写方式。为了使 Dataflow 作业异步执行,请确保不等待管道对象(不在应用程序代码中的 PipelineResult
上调用 waitUntilFinish
或 wait_until_finish
)。
start_python_job_async = BeamRunPythonPipelineOperator(
task_id="start_python_job_async",
runner=BeamRunnerType.DataflowRunner,
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={
"job_name": "start_python_job_async",
"location": LOCATION,
"wait_until_finished": False,
},
)
阻塞执行¶
为了使 Dataflow 作业执行并等待完成,请确保在应用程序代码中等待管道对象。对于 Java SDK,可以通过在从 pipeline.run()
返回的 PipelineResult
上调用 waitUntilFinish
来实现,对于 Python SDK,可以通过在从 pipeline.run()
返回的 PipelineResult
上调用 wait_until_finish
来实现。
应该避免阻塞作业,因为在 Airflow 上运行时会发生后台进程。此进程会持续运行以等待 Dataflow 作业完成,并且会增加 Airflow 在此过程中对资源的消耗。
流式执行¶
要执行流式 Dataflow 作业,请确保设置了流式选项(对于 Python),或者在您的管道中从无界数据源(例如 Pub/Sub)读取数据(对于 Java)。
start_streaming_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_streaming_python_job",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"temp_location": GCS_TMP,
"input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
"output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
"streaming": True,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)
将参数 drain_pipeline
设置为 True
允许通过排空而不是在杀死任务实例期间取消来停止流式作业。
参见:停止正在运行的管道。
模板化作业¶
模板提供了在 Cloud Storage 上暂存管道并从那里运行它的能力。这在开发工作流程中提供了灵活性,因为它将管道的开发与暂存和执行步骤分离开来。Dataflow 有两种类型的模板:经典模板和灵活模板。有关更多信息,请参阅 Dataflow 模板的官方文档。
以下是使用经典模板和 DataflowTemplatedJobStartOperator
运行 Dataflow 作业的示例
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start_template_job",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
wait_until_finished=True,
)
对于此操作,您还可以使用可延迟模式的操作符
start_template_job_deferrable = DataflowTemplatedJobStartOperator(
task_id="start_template_job_deferrable",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
deferrable=True,
)
请参阅 可与此操作符一起使用的 Google 提供的模板列表。
以下是使用灵活模板和 DataflowStartFlexTemplateOperator
运行 Dataflow 作业的示例
start_flex_template_job = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
wait_until_finished=True,
)
对于此操作,您还可以使用可延迟模式的操作符
start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job_deferrable",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
deferrable=True,
)
Dataflow SQL¶
Dataflow SQL 支持 ZetaSQL 查询语法的变体,并包括用于运行 Dataflow 流式作业的其他流式扩展。
以下是使用 DataflowStartSqlJobOperator
运行 Dataflow SQL 作业的示例
start_sql = DataflowStartSqlJobOperator(
task_id="start_sql_query",
job_name=DATAFLOW_SQL_JOB_NAME,
query=f"""
SELECT
emp_name as employee,
salary as employee_salary
FROM
bigquery.table.`{PROJECT_ID}`.`{BQ_SQL_DATASET}`.`{BQ_SQL_TABLE_INPUT}`
WHERE salary >= 1000;
""",
options={
"bigquery-project": PROJECT_ID,
"bigquery-dataset": BQ_SQL_DATASET,
"bigquery-table": BQ_SQL_TABLE_OUTPUT,
},
location=LOCATION,
do_xcom_push=True,
)
警告
此操作符需要 Airflow 工作节点上安装 gcloud
命令(Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>`__
请参阅 Dataflow SQL 参考。
Dataflow YAML¶
Beam YAML 是一个无代码 SDK,用于通过使用 YAML 文件配置 Apache Beam 管道。您可以使用 Beam YAML 来编写和运行 Beam 管道,而无需编写任何代码。此 API 可用于定义流式和批处理管道。
以下是使用 DataflowStartYamlJobOperator
运行 Dataflow YAML 作业的示例
start_dataflow_yaml_job = DataflowStartYamlJobOperator(
task_id="start_dataflow_yaml_job",
job_name=DATAFLOW_YAML_JOB_NAME,
yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
append_job_name=True,
deferrable=False,
region=REGION,
project_id=PROJECT_ID,
jinja_variables=BQ_VARIABLES,
)
可以通过传递 deferrable=True
作为参数,以可延迟模式运行此操作符。
start_dataflow_yaml_job_def = DataflowStartYamlJobOperator(
task_id="start_dataflow_yaml_job_def",
job_name=DATAFLOW_YAML_JOB_NAME,
yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
append_job_name=True,
deferrable=True,
region=REGION,
project_id=PROJECT_ID,
jinja_variables=BQ_VARIABLES_DEF,
expected_terminal_state=DataflowJobStatus.JOB_STATE_DONE,
)
警告
此操作符需要 Airflow 工作节点上安装 gcloud
命令(Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>`__
请参阅 Dataflow YAML 参考。
停止管道¶
要停止一个或多个 Dataflow 管道,可以使用 DataflowStopJobOperator
。默认情况下,会排空流式管道,将 drain_pipeline
设置为 False
将改为取消它们。提供 job_id
以停止特定作业,或提供 job_name_prefix
以停止所有具有指定名称前缀的作业。
stop_dataflow_job = DataflowStopJobOperator(
task_id="stop_dataflow_job",
location=LOCATION,
job_name_prefix="start-python-pipeline",
)
参见:停止正在运行的管道。
删除管道¶
要删除 Dataflow 管道,可以使用 DataflowDeletePipelineOperator
。以下是如何使用此操作符的示例
delete_pipeline = DataflowDeletePipelineOperator(
task_id="delete_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
传感器¶
当异步触发作业时,可以使用传感器来运行特定作业属性的检查。
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait_for_python_job_async_done",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location=LOCATION,
)
可以通过传递 deferrable=True
作为参数,以可延迟模式运行此操作符。
wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
task_id="wait_for_beam_python_pipeline_job_status_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
location=LOCATION,
deferrable=True,
)
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait_for_python_job_async_metric",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
)
可以通过传递 deferrable=True
作为参数,以可延迟模式运行此操作符。
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
task_id="wait_for_beam_python_pipeline_job_metric_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
deferrable=True,
)
def check_message(messages: list[dict]) -> bool:
"""Check message"""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait_for_python_job_async_message",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_message,
fail_on_terminal_state=False,
)
可以通过传递 deferrable=True
作为参数,以可延迟模式运行此操作符。
def check_job_message(messages: list[dict]) -> bool:
"""Check job message."""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
task_id="wait_for_beam_python_pipeline_job_message_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_job_message,
fail_on_terminal_state=False,
deferrable=True,
)
DataflowJobAutoScalingEventsSensor
.
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event"""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_python_job_async_autoscaling_event",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
)
可以通过传递 deferrable=True
作为参数,以可延迟模式运行此操作符。
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event."""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
deferrable=True,
)