Google Cloud Data Pipelines 算子

Data Pipelines 是 Dataflow 的一项功能,它允许客户创建和计划经常性作业、查看聚合作业指标以及定义和管理作业 SLO。管道由作业集合组成,包括管理它们的方法。管道可能与 Dataflow 模板(经典/灵活)相关联,并包括使用关联模板启动的所有作业。

先决条件任务

要使用这些算子,您必须执行以下操作

创建数据管道

此操作符已弃用。请使用 DataflowCreatePipelineOperator

要使用请求主体和父名称创建新的数据管道实例,请使用 CreateDataPipelineOperator。该操作符访问 Google Cloud 的数据管道 API,并调用 create 方法 来运行给定的管道。

CreateDataPipelineOperator 接受四个参数

body:管道实例,project_id:拥有作业的 GCP 项目 ID,location:管道的目标位置,gcp_conn_id:连接到 Google Cloud 的 ID。

每次都需要传递请求主体和项目 ID,而 GCP 连接 ID 和位置具有默认值。项目 ID 和位置将用于构建创建操作符所需的父名称。

以下是如何使用 CreateDataPipelineOperator 通过运行上述参数来创建数据管道实例的示例

tests/system/providers/google/cloud/datapipelines/example_datapipeline.py[源代码]

create_data_pipeline = CreateDataPipelineOperator(
    task_id="create_data_pipeline",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body={
        "name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
        "type": PIPELINE_TYPE,
        "workload": {
            "dataflowFlexTemplateRequest": {
                "launchParameter": {
                    "containerSpecGcsPath": GCS_PATH,
                    "jobName": PIPELINE_JOB_NAME,
                    "environment": {"tempLocation": TEMP_LOCATION},
                    "parameters": {
                        "inputFile": INPUT_FILE,
                        "output": OUTPUT,
                    },
                },
                "projectId": GCP_PROJECT_ID,
                "location": GCP_LOCATION,
            }
        },
    },
)

运行数据管道

此操作符已弃用。请使用 DataflowRunPipelineOperator

要运行数据管道实例,请使用 RunDataPipelineOperator。该运算符访问 Google Cloud 的数据管道 API,并调用 run 方法 来运行给定的管道。

RunDataPipelineOperator 可以采用四个参数

  • data_pipeline_name:数据管道实例的名称

  • project_id:拥有该作业的 GCP 项目的 ID

  • location:数据管道实例的位置

  • gcp_conn_id:连接到 Google Cloud Platform 的连接 ID

只有数据管道名称和项目 ID 是必需的参数,因为位置和 GCP 连接 ID 具有默认值。项目 ID 和位置将用于构建父名称,这是给定数据管道应位于的位置。

您可以通过使用 RunDataPipelineOperator 运行上述参数来运行数据管道实例

tests/system/providers/google/cloud/datapipelines/example_datapipeline.py[源代码]

run_data_pipeline = RunDataPipelineOperator(
    task_id="run_data_pipeline",
    data_pipeline_name=PIPELINE_NAME,
    project_id=GCP_PROJECT_ID,
)

调用后,RunDataPipelineOperator 将返回 Google Cloud Dataflow 作业,该作业是通过运行给定的管道创建的。

有关 API 用法的更多信息,请参阅 Google Cloud 文档中的 数据管道 API REST 资源

此条目是否有用?