Google Dataprep 算子

Dataprep 是一项智能云数据服务,可用于以可视化方式浏览、清理和准备数据,以便进行分析和机器学习。该服务可用于浏览和转换不同且/或大型数据集中的原始数据,使其成为可用于进一步分析和处理的干净且结构化的数据。Dataprep 作业是一个内部对象,用于对运行 Cloud Dataprep 作业组的一部分所需的信息进行编码。有关该服务的详细信息,请访问Google Dataprep API 文档

开始之前

在 Airflow 中使用 Dataprep 之前,您需要使用令牌对您的帐户进行身份验证。要使用 Airflow 连接 Dataprep,您需要一个 Dataprep 令牌。请按照 Dataprep 说明进行操作。

令牌应以 JSON 格式添加到 Airflow 中的连接。您可以查看管理连接

DataprepRunJobGroupOperator 将运行指定作业。操作员需要一个配方 ID。要识别配方 ID,请使用 runJobGroup 的 API 文档,例如,如果 URL 是 /flows/10?recipe=7,则配方 ID 为 7。无法通过此操作员创建配方。它只能通过 此处 提供的 UI 创建。一些参数可以通过 DAG 的正文请求覆盖。如何在示例 dag 中显示。

请参阅以下示例:为这些字段设置值:.. code-block

Connection Id: "your_conn_id"
Extra: {"token": "TOKEN", "base_url": "https://api.clouddataprep.com"}

先决条件任务

要使用这些操作员,您必须执行以下操作

运行作业组

操作员任务是创建作业组,该作业组以经过身份验证的用户身份启动指定作业。这执行与在应用程序中单击“运行作业”按钮相同的操作。

要获取 Cloud Dataprep 作业中作业的信息,请使用: DataprepRunJobGroupOperator

示例用法

tests/system/providers/google/cloud/dataprep/example_dataprep.py[源代码]

run_job_group_task = DataprepRunJobGroupOperator(
    task_id="run_job_group",
    dataprep_conn_id=CONNECTION_ID,
    project_id=GCP_PROJECT_ID,
    body_request={
        "wrangledDataset": {"id": DATASET_WRANGLED_ID},
        "overrides": WRITE_SETTINGS,
    },
)

获取作业组的作业

操作员任务是获取 Cloud Dataprep 作业中批量作业的信息。

要获取 Cloud Dataprep 作业中作业的信息,请使用: DataprepGetJobsForJobGroupOperator

示例用法

tests/system/providers/google/cloud/dataprep/example_dataprep.py[源代码]

get_jobs_for_job_group_task = DataprepGetJobsForJobGroupOperator(
    task_id="get_jobs_for_job_group",
    dataprep_conn_id=CONNECTION_ID,
    job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)

获取作业组

操作员任务是获取指定的作业组。作业组是从流程中的特定节点执行的作业。

要获取 Cloud Dataprep 作业中作业的信息,请使用:DataprepGetJobGroupOperator

示例用法

tests/system/providers/google/cloud/dataprep/example_dataprep.py[源代码]

get_job_group_task = DataprepGetJobGroupOperator(
    task_id="get_job_group",
    dataprep_conn_id=CONNECTION_ID,
    project_id=GCP_PROJECT_ID,
    job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
    embed="",
    include_deleted=False,
)

复制流程

操作员任务是复制流程。

要获取 Cloud Dataprep 作业中作业的信息,请使用:DataprepCopyFlowOperator

示例用法

tests/system/providers/google/cloud/dataprep/example_dataprep.py[源代码]

copy_task = DataprepCopyFlowOperator(
    task_id="copy_flow",
    dataprep_conn_id=CONNECTION_ID,
    project_id=GCP_PROJECT_ID,
    flow_id=FLOW_ID,
    name=f"copy_{DATASET_NAME}",
)

运行流程

操作员任务是运行流程。流程是用于整理逻辑的容器,其中包含导入的数据集、配方、输出对象和引用。

要获取 Cloud Dataprep 作业中作业的信息,请使用:DataprepRunFlowOperator

示例用法

tests/system/providers/google/cloud/dataprep/example_dataprep.py[源代码]

run_flow_task = DataprepRunFlowOperator(
    task_id="run_flow",
    dataprep_conn_id=CONNECTION_ID,
    project_id=GCP_PROJECT_ID,
    flow_id=FLOW_COPY_ID,
    body_request={},
)

删除流程

操作员任务是删除流程。流程是用于整理逻辑的容器,其中包含导入的数据集、配方、输出对象和引用。

要获取 Cloud Dataprep 作业中作业的信息,请使用:DataprepDeleteFlowOperator

示例用法

tests/system/providers/google/cloud/dataprep/example_dataprep.py[源代码]

delete_flow_task = DataprepDeleteFlowOperator(
    task_id="delete_flow",
    dataprep_conn_id=CONNECTION_ID,
    flow_id="{{ task_instance.xcom_pull('copy_flow')['id'] }}",
)

检查作业组是否完成

传感器任务是告诉系统启动的作业组是否完成,无论是否成功。作业组是从流程中的特定节点执行的作业。

要获取 Cloud Dataprep 作业中作业的信息,请使用:DataprepJobGroupIsFinishedSensor

示例用法

tests/system/providers/google/cloud/dataprep/example_dataprep.py[源代码]

check_flow_status_sensor = DataprepJobGroupIsFinishedSensor(
    task_id="check_flow_status",
    dataprep_conn_id=CONNECTION_ID,
    job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)

此条目有帮助吗?