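Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about tasks, see the Dataplex product documentation.

Create a Task

Before you create a Dataplex task, you need to define its body. For more information about the fields you can pass when creating a task, see the Dataplex create task API.

A simple task configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]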

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}

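With this configuration, we can create the task either synchronously or asynchronously with DataplexCreateTaskOperator. The first example waits for creation to finish; the second passes asynchronous=True and returns without waiting:

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]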

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)
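
The constants referenced by EXAMPLE_TASK_BODY are defined elsewhere in the example file. As a minimal sketch, a body of the same shape could be assembled and sanity-checked by a small helper before it is passed to the operator. The helper name `build_task_body` and the sample values are hypothetical, and `ON_DEMAND` is assumed to be an accepted trigger type.

```python
def build_task_body(trigger_type: str, service_account: str, script_uri: str) -> dict:
    """Return a dict shaped like EXAMPLE_TASK_BODY (hypothetical helper)."""
    # Reject empty values early, before the body reaches the operator.
    for name, value in (
        ("trigger_type", trigger_type),
        ("service_account", service_account),
        ("script_uri", script_uri),
    ):
        if not value:
            raise ValueError(f"{name} must not be empty")
    return {
        "trigger_spec": {"type_": trigger_type},
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": script_uri},
    }


# Sample values are placeholders, not real resources.
body = build_task_body(
    trigger_type="ON_DEMAND",
    service_account="my-sa@my-project.iam.gserviceaccount.com",
    script_uri="gs://my-bucket/spark_job.py",
)
```

The resulting dict could then be passed as the `body` argument in place of EXAMPLE_TASK_BODY.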

Delete a Task

To delete a task, you can use:

DataplexDeleteTaskOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)

List Tasks

To list tasks, you can use:

DataplexListTasksOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a Task

To get a task, you can use:

DataplexGetTaskOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a Task

To wait for a task that was created asynchronously, you can use:

DataplexTaskStateSensor

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
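
Conceptually, a state sensor polls a resource until it reaches a target state or a timeout expires. The following is a simplified sketch of that pattern in plain Python, not the provider's implementation; `wait_for_state`, its parameters, and the `ACTIVE` target state are assumptions made for illustration.

```python
import time
from typing import Callable


def wait_for_state(
    get_state: Callable[[], str],
    target: str = "ACTIVE",
    poke_interval: float = 1.0,
    timeout: float = 60.0,
) -> bool:
    """Poll get_state() until it returns target; return False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        if get_state() == target:
            return True
        if time.monotonic() >= deadline:
            return False
        # Wait before the next check, as a sensor does between pokes.
        time.sleep(poke_interval)


# Simulated state sequence standing in for real API responses.
states = iter(["CREATING", "CREATING", "ACTIVE"])
reached = wait_for_state(lambda: next(states), poke_interval=0, timeout=5)
```

In a DAG, the sensor plays this role, so downstream tasks start only once the task has reached the expected state.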

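Create a Lake

Before you create a Dataplex lake, you need to define its body.

For more information about the fields you can pass when creating a lake, see the Dataplex create lake API.

A simple lake configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]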

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}

With this configuration, we can create the lake:

DataplexCreateLakeOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a Lake

To delete a lake, you can use:

DataplexDeleteLakeOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)

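Create or Update a Data Quality Scan

Before you create a Dataplex Data Quality scan, you need to define its body. For more information about the fields you can pass when creating a Data Quality scan, see the Dataplex create data quality API.

A simple Data Quality scan configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]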

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)

With this configuration, we can create or update the Data Quality scan:

DataplexCreateOrUpdateDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)
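
The `entity` and `resource` fields of the scan configuration are plain resource-name strings. As a sketch, they could be produced by two small helpers so that each format lives in one place. The function names and sample IDs are hypothetical; the string formats simply mirror the f-strings in the configuration above.

```python
def dataplex_entity_path(
    project_id: str, region: str, lake_id: str, zone_id: str, entity_id: str
) -> str:
    """Build the Dataplex entity name used for DataScan.data.entity."""
    return (
        f"projects/{project_id}/locations/{region}"
        f"/lakes/{lake_id}/zones/{zone_id}/entities/{entity_id}"
    )


def bigquery_table_resource(project_id: str, dataset_id: str, table_id: str) -> str:
    """Build the BigQuery table resource name used for DataScan.data.resource."""
    return (
        f"//bigquery.googleapis.com/projects/{project_id}"
        f"/datasets/{dataset_id}/tables/{table_id}"
    )


# Placeholder identifiers for illustration only.
entity = dataplex_entity_path("my-project", "us-central1", "my-lake", "my-zone", "table_one")
resource = bigquery_table_resource("my-project", "my_dataset", "table_one")
```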

Get a Data Quality Scan

To get a Data Quality scan, you can use:

DataplexGetDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Quality Scan

To delete a Data Quality scan, you can use:

DataplexDeleteDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Quality Scan

You can run a Dataplex Data Quality scan in asynchronous mode so that its status can be checked later with a sensor:

DataplexRunDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check that a Dataplex Data Quality scan run succeeded, you can use:

DataplexDataQualityJobStatusSensor

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also use this operator in deferrable mode:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Quality Scan Job

To get a Data Quality scan job, you can use:

DataplexGetDataQualityScanResultOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

You can also use this operator in deferrable mode:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

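Create a Zone

Before you create a Dataplex zone, you need to define its body.

For more information about the fields you can pass when creating a zone, see the Dataplex create zone API.

A simple zone configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]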

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}

With this configuration, we can create a zone:

DataplexCreateZoneOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)
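
A zone body is small, so typos in its enum-like string fields are easy to miss. The sketch below builds the same dict as EXAMPLE_ZONE and validates the two strings first. The helper `build_zone_body` is hypothetical, and the allowed values (`RAW`/`CURATED` and `SINGLE_REGION`/`MULTI_REGION`) are assumptions based on the Dataplex zone API; check the API reference before relying on them.

```python
# Assumed allowed values; verify against the Dataplex zone API reference.
ZONE_TYPES = {"RAW", "CURATED"}
LOCATION_TYPES = {"SINGLE_REGION", "MULTI_REGION"}


def build_zone_body(zone_type: str, location_type: str) -> dict:
    """Return a dict shaped like EXAMPLE_ZONE after validating its fields."""
    if zone_type not in ZONE_TYPES:
        raise ValueError(f"unknown zone type: {zone_type!r}")
    if location_type not in LOCATION_TYPES:
        raise ValueError(f"unknown location type: {location_type!r}")
    return {
        "type_": zone_type,
        "resource_spec": {"location_type": location_type},
    }


zone_body = build_zone_body("RAW", "SINGLE_REGION")
```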

Delete a Zone

To delete a zone, you can use:

DataplexDeleteZoneOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

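Create an Asset

Before you create a Dataplex asset, you need to define its body.

For more information about the fields you can pass when creating an asset, see the Dataplex create asset API.

A simple asset configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]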

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}

With this configuration, we can create the asset:

DataplexCreateAssetOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an Asset

To delete an asset, you can use:

DataplexDeleteAssetOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

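Create or Update a Data Profile Scan

Before you create a Dataplex Data Profile scan, you need to define its body. For more information about the fields you can pass when creating a Data Profile scan, see the Dataplex create data profile API.

A simple Data Profile scan configuration can look as follows: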

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})

With this configuration, we can create or update the Data Profile scan:

DataplexCreateOrUpdateDataProfileScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Profile Scan

To get a Data Profile scan, you can use:

DataplexGetDataProfileScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Profile Scan

To delete a Data Profile scan, you can use:

DataplexDeleteDataProfileScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Profile Scan

You can run a Dataplex Data Profile scan in asynchronous mode so that its status can be checked later with a sensor:

DataplexRunDataProfileScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check that a Dataplex Data Profile scan run succeeded, you can use:

DataplexDataProfileJobStatusSensor

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also use this operator in deferrable mode:

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Profile Scan Job

To get a Data Profile scan job, you can use:

DataplexGetDataProfileScanResultOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
