Google Cloud Data Pipelines 算子¶
Data Pipelines 是 Dataflow 的一项功能,它允许客户创建和计划经常性作业、查看聚合作业指标以及定义和管理作业 SLO。管道由作业集合组成,包括管理它们的方法。管道可能与 Dataflow 模板(经典/灵活)相关联,并包括使用关联模板启动的所有作业。
先决条件任务¶
要使用这些算子,您必须执行以下操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
为您的项目启用结算,如Google Cloud 文档中所述。
启用 API,如Cloud 控制台文档中所述。
通过pip安装 API 库。
pip install 'apache-airflow[google]'详细的信息可从 安装 中获取。
创建数据管道¶
此操作符已弃用。请使用 DataflowCreatePipelineOperator
。
要使用请求主体和父名称创建新的数据管道实例,请使用 CreateDataPipelineOperator
。该操作符访问 Google Cloud 的数据管道 API,并调用 create 方法 来运行给定的管道。
CreateDataPipelineOperator
接受四个参数body:管道实例,project_id:拥有作业的 GCP 项目 ID,location:管道的目标位置,gcp_conn_id:连接到 Google Cloud 的 ID。
每次都需要传递请求主体和项目 ID,而 GCP 连接 ID 和位置具有默认值。项目 ID 和位置将用于构建创建操作符所需的父名称。
以下是如何使用 CreateDataPipelineOperator 通过运行上述参数来创建数据管道实例的示例
create_data_pipeline = CreateDataPipelineOperator(
task_id="create_data_pipeline",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body={
"name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
"type": PIPELINE_TYPE,
"workload": {
"dataflowFlexTemplateRequest": {
"launchParameter": {
"containerSpecGcsPath": GCS_PATH,
"jobName": PIPELINE_JOB_NAME,
"environment": {"tempLocation": TEMP_LOCATION},
"parameters": {
"inputFile": INPUT_FILE,
"output": OUTPUT,
},
},
"projectId": GCP_PROJECT_ID,
"location": GCP_LOCATION,
}
},
},
)
运行数据管道¶
此操作符已弃用。请使用 DataflowRunPipelineOperator
。
要运行数据管道实例,请使用 RunDataPipelineOperator
。该运算符访问 Google Cloud 的数据管道 API,并调用 run 方法 来运行给定的管道。
RunDataPipelineOperator
可以采用四个参数
data_pipeline_name
:数据管道实例的名称project_id
:拥有该作业的 GCP 项目的 IDlocation
:数据管道实例的位置gcp_conn_id
:连接到 Google Cloud Platform 的连接 ID
只有数据管道名称和项目 ID 是必需的参数,因为位置和 GCP 连接 ID 具有默认值。项目 ID 和位置将用于构建父名称,这是给定数据管道应位于的位置。
您可以通过使用 RunDataPipelineOperator 运行上述参数来运行数据管道实例
run_data_pipeline = RunDataPipelineOperator(
task_id="run_data_pipeline",
data_pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
)
调用后,RunDataPipelineOperator 将返回 Google Cloud Dataflow 作业,该作业是通过运行给定的管道创建的。
有关 API 用法的更多信息,请参阅 Google Cloud 文档中的 数据管道 API REST 资源。