Google Dataplex 算子¶
Dataplex 是一种智能数据架构,可在您的数据湖、数据仓库和数据市集中提供统一的分析和数据管理。
有关任务的更多信息,请访问 Dataplex 生产文档 <产品文档
创建任务¶
在创建 Dataplex 任务之前,您需要定义其主体。有关在创建任务时可传递的可用字段的更多信息,请访问 Dataplex 创建任务 API。
简单的任务配置可能如下所示
EXAMPLE_TASK_BODY = {
"trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
"execution_spec": {"service_account": SERVICE_ACC},
"spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
使用此配置,我们可以同步和异步创建任务: DataplexCreateTaskOperator
create_dataplex_task = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="create_dataplex_task",
)
create_dataplex_task_async = DataplexCreateTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_TASK_BODY,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
asynchronous=True,
task_id="create_dataplex_task_async",
)
删除任务¶
要删除任务,可以使用
delete_dataplex_task_async = DataplexDeleteTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
task_id="delete_dataplex_task_async",
)
列出任务¶
要列出任务,可以使用
list_dataplex_task = DataplexListTasksOperator(
project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
获取任务¶
要获取任务,可以使用
get_dataplex_task = DataplexGetTaskOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="get_dataplex_task",
)
等待任务¶
要等待异步创建的任务,可以使用
dataplex_task_state = DataplexTaskStateSensor(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="dataplex_task_state",
)
创建湖¶
在创建 Dataplex 湖之前,需要定义其主体。
有关在创建湖时可传递的可用字段的更多信息,请访问 Dataplex 创建湖 API。
简单的任务配置可能如下所示
EXAMPLE_LAKE_BODY = {
"display_name": "test_display_name",
"labels": [],
"description": "test_description",
"metastore": {"service": ""},
}
使用此配置,我们可以创建湖
create_lake = DataplexCreateLakeOperator(
project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)
删除湖¶
要删除湖,可以使用
delete_lake = DataplexDeleteLakeOperator(
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
task_id="delete_lake",
trigger_rule=TriggerRule.ALL_DONE,
)
创建或更新数据质量扫描¶
在创建 Dataplex 数据质量扫描之前,需要定义其主体。有关在创建数据质量扫描时可传递的可用字段的更多信息,请访问 Dataplex 创建数据质量 API。
简单的数据质量扫描配置如下所示
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
{
"rules": [
{
"range_expectation": {
"min_value": "0",
"max_value": "10000",
},
"column": "value",
"dimension": "VALIDITY",
}
],
}
)
使用此配置,我们可以创建或更新数据质量扫描
DataplexCreateOrUpdateDataQualityScanOperator
create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
获取数据质量扫描¶
要获取数据质量扫描,可以使用
DataplexGetDataQualityScanOperator
get_data_scan = DataplexGetDataQualityScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
删除数据质量扫描¶
要删除数据质量扫描,可以使用
DataplexDeleteDataQualityScanOperator
delete_data_scan = DataplexDeleteDataQualityScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
运行数据质量扫描¶
可以在异步模式下运行 Dataplex 数据质量扫描,以便稍后使用传感器检查其状态
DataplexRunDataQualityScanOperator
run_data_scan_async = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要检查运行 Dataplex 数据质量扫描是否成功,可以使用
DataplexDataQualityJobStatusSensor
.
get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
此外,对于此操作,可以在可延迟模式下使用运算符
run_data_scan_def = DataplexRunDataQualityScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
获取数据质量扫描作业¶
要获取数据质量扫描作业,可以使用
DataplexGetDataQualityScanResultOperator
get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
此外,对于此操作,可以在可延迟模式下使用运算符
get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
task_id="get_data_scan_job_result_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
创建区域¶
在创建 Dataplex 区域之前,需要定义其主体。
有关在创建区域时可传递的可用字段的更多信息,请访问 Dataplex 创建区域 API。
简单的区域配置如下所示
EXAMPLE_ZONE = {
"type_": "RAW",
"resource_spec": {"location_type": "SINGLE_REGION"},
}
使用此配置,我们可以创建一个区域
create_zone = DataplexCreateZoneOperator(
task_id="create_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
body=EXAMPLE_ZONE,
zone_id=ZONE_ID,
)
删除区域¶
要删除区域,可以使用
delete_zone = DataplexDeleteZoneOperator(
task_id="delete_zone",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
创建资产¶
在创建 Dataplex 资产之前,您需要定义其主体。
有关在创建资产时可传递的可用字段的更多信息,请访问 Dataplex 创建资产 API。
简单的资产配置可以如下所示
EXAMPLE_ASSET = {
"resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
"discovery_spec": {"enabled": True},
}
使用此配置,我们可以创建资产
create_asset = DataplexCreateAssetOperator(
task_id="create_asset",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_ASSET,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
)
删除资产¶
要删除资产,可以使用
delete_asset = DataplexDeleteAssetOperator(
task_id="delete_asset",
project_id=PROJECT_ID,
region=REGION,
lake_id=LAKE_ID,
zone_id=ZONE_ID,
asset_id=ASSET_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
创建或更新数据分析扫描¶
在创建 Dataplex 数据分析扫描之前,您需要定义其主体。有关在创建数据分析扫描时可传递的可用字段的更多信息,请访问 Dataplex 创建数据分析 API。
简单的数据分析扫描配置可以如下所示
EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})
使用此配置,我们可以创建或更新数据分析扫描
DataplexCreateOrUpdateDataProfileScanOperator
create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
task_id="create_data_scan",
project_id=PROJECT_ID,
region=REGION,
body=EXAMPLE_DATA_SCAN,
data_scan_id=DATA_SCAN_ID,
)
获取数据分析扫描¶
要获取数据分析扫描,可以使用
DataplexGetDataProfileScanOperator
get_data_scan = DataplexGetDataProfileScanOperator(
task_id="get_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)
删除数据分析扫描¶
要删除数据分析扫描,可以使用
DataplexDeleteDataProfileScanOperator
delete_data_scan = DataplexDeleteDataProfileScanOperator(
task_id="delete_data_scan",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
运行数据分析扫描¶
您可以在异步模式下运行 Dataplex 数据分析扫描,以便稍后使用传感器检查其状态
DataplexRunDataProfileScanOperator
run_data_scan_async = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_async",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
asynchronous=True,
)
要检查运行 Dataplex 数据分析扫描是否成功,可以使用
DataplexDataProfileJobStatusSensor
.
get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
task_id="get_data_scan_job_status",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
此外,对于此操作,可以在可延迟模式下使用运算符
run_data_scan_def = DataplexRunDataProfileScanOperator(
task_id="run_data_scan_def",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
deferrable=True,
)
获取数据分析扫描作业¶
要获取数据分析扫描作业,可以使用
DataplexGetDataProfileScanResultOperator
get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
task_id="get_data_scan_job_result_2",
project_id=PROJECT_ID,
region=REGION,
data_scan_id=DATA_SCAN_ID,
)