Google Cloud Dataproc Metastore 操作符

Dataproc Metastore 是一项完全托管、高可用、自动修复的无服务器 Apache Hive 元存储 (HMS),运行在 Google Cloud 上。它支持 HMS,是管理关系实体元数据的关键组件,并为开源数据生态系统中的数据处理应用程序提供互操作性。

有关该服务的更多信息,请访问 Dataproc Metastore 产品文档 <产品文档

创建服务

在创建 dataproc 元存储服务之前,您需要定义该服务。有关创建服务时要传递的可用字段的更多信息,请访问 Dataproc Metastore 创建服务 API。

一个简单的服务配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

SERVICE = {
    "name": "test-service",
}

使用此配置,我们可以创建服务:DataprocMetastoreCreateServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

create_service = DataprocMetastoreCreateServiceOperator(
    task_id="create_service",
    region=REGION,
    project_id=PROJECT_ID,
    service=SERVICE,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

获取服务

要获取服务,您可以使用

DataprocMetastoreGetServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

get_service = DataprocMetastoreGetServiceOperator(
    task_id="get_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
)

更新服务

您可以通过提供服务配置和 updateMask 来更新服务。在 updateMask 参数中,您指定要更新的字段的路径,该路径相对于 Service。有关 updateMask 和其他参数的更多信息,请查看 Dataproc Metastore 更新服务 API。

一个新的服务配置和 updateMask 的示例

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

SERVICE_TO_UPDATE = {
    "labels": {
        "mylocalmachine": "mylocalmachine",
        "systemtest": "systemtest",
    }
}
UPDATE_MASK = FieldMask(paths=["labels"])

要更新服务,您可以使用:DataprocMetastoreUpdateServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

update_service = DataprocMetastoreUpdateServiceOperator(
    task_id="update_service",
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    region=REGION,
    service=SERVICE_TO_UPDATE,
    update_mask=UPDATE_MASK,
    timeout=TIMEOUT,
)

删除服务

要删除服务,您可以使用

DataprocMetastoreDeleteServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

delete_service = DataprocMetastoreDeleteServiceOperator(
    task_id="delete_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

导出服务元数据

要导出元数据,您可以使用

DataprocMetastoreExportMetadataOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

export_metadata = DataprocMetastoreExportMetadataOperator(
    task_id="export_metadata",
    destination_gcs_folder=DESTINATION_GCS_FOLDER,
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

恢复服务

要恢复服务,您可以使用

DataprocMetastoreRestoreServiceOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

restore_service = DataprocMetastoreRestoreServiceOperator(
    task_id="restore_metastore",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    backup_region=REGION,
    backup_project_id=PROJECT_ID,
    backup_service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

创建元数据导入

在创建 dataproc 元存储元数据导入之前,您需要定义该元数据导入。有关创建元数据导入时要传递的可用字段的更多信息,请访问 Dataproc Metastore 创建元数据导入 API。

一个简单的元数据导入配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

METADATA_IMPORT = {
    "name": "test-metadata-import",
    "database_dump": {
        "gcs_uri": GCS_URI,
        "database_type": DB_TYPE,
    },
}

要创建元数据导入,您可以使用:DataprocMetastoreCreateMetadataImportOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

import_metadata = DataprocMetastoreCreateMetadataImportOperator(
    task_id="import_metadata",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    metadata_import=METADATA_IMPORT,
    metadata_import_id=METADATA_IMPORT_ID,
    timeout=TIMEOUT,
)

创建备份

在为服务创建 dataproc 元存储备份之前,您需要定义该备份。有关创建备份时要传递的可用字段的更多信息,请访问 Dataproc Metastore 创建备份 API。

一个简单的备份配置如下所示

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

BACKUP = {
    "name": "test-backup",
}

使用此配置,我们可以创建备份:DataprocMetastoreCreateBackupOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

backup_service = DataprocMetastoreCreateBackupOperator(
    task_id="create_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup=BACKUP,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

删除备份

要删除备份,您可以使用

DataprocMetastoreDeleteBackupOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

delete_backup = DataprocMetastoreDeleteBackupOperator(
    task_id="delete_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

列出备份

要列出备份,您可以使用

DataprocMetastoreListBackupsOperator

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

list_backups = DataprocMetastoreListBackupsOperator(
    task_id="list_backups",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
)

检查 Hive 分区是否存在

要检查 Hive 分区是否已在 Metastore 中为给定表创建,您可以使用:MetastoreHivePartitionSensor

tests/system/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py[源代码]

hive_partition_sensor = MetastoreHivePartitionSensor(
    task_id="hive_partition_sensor",
    service_id=METASTORE_SERVICE_ID,
    region=REGION,
    table=TABLE_NAME,
    partitions=[PARTITION_1, PARTITION_2],
)

此条目是否有帮助?