Google Cloud Dataproc Metastore 操作符

Dataproc Metastore 是一个完全托管的、高可用性的、自动修复的无服务器 Apache Hive 元数据存储 (HMS),可在 Google Cloud 上运行。它支持 HMS,作为管理关系实体元数据的重要组件,并在开源数据生态系统中的数据处理应用程序之间提供互操作性。

有关该服务的更多信息,请访问 Dataproc Metastore 产品文档 <产品文档

创建服务

在创建 Dataproc Metastore 服务之前,您需要定义该服务。有关在创建服务时可传递的可用字段的更多信息,请访问 Dataproc Metastore 创建服务 API。

一个简单的服务配置可能如下所示

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

SERVICE = {
    "name": "test-service",
}

使用此配置,我们可以创建服务: DataprocMetastoreCreateServiceOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

create_service = DataprocMetastoreCreateServiceOperator(
    task_id="create_service",
    region=REGION,
    project_id=PROJECT_ID,
    service=SERVICE,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

获取服务

要获取服务,可以使用

DataprocMetastoreGetServiceOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

get_service = DataprocMetastoreGetServiceOperator(
    task_id="get_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
)

更新服务

可以通过提供服务配置和 updateMask 来更新服务。在 updateMask 参数中,指定相对于 Service 的要更新的字段的路径。有关 updateMask 和其他参数的更多信息,请参阅 Dataproc Metastore 更新服务 API。

新服务配置和 updateMask 的示例

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

SERVICE_TO_UPDATE = {
    "labels": {
        "mylocalmachine": "mylocalmachine",
        "systemtest": "systemtest",
    }
}
UPDATE_MASK = FieldMask(paths=["labels"])

要更新服务,可以使用: DataprocMetastoreUpdateServiceOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

update_service = DataprocMetastoreUpdateServiceOperator(
    task_id="update_service",
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    region=REGION,
    service=SERVICE_TO_UPDATE,
    update_mask=UPDATE_MASK,
    timeout=TIMEOUT,
)

删除服务

要删除服务,可以使用

DataprocMetastoreDeleteServiceOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

delete_service = DataprocMetastoreDeleteServiceOperator(
    task_id="delete_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

导出服务元数据

要导出元数据,可以使用

DataprocMetastoreExportMetadataOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

export_metadata = DataprocMetastoreExportMetadataOperator(
    task_id="export_metadata",
    destination_gcs_folder=DESTINATION_GCS_FOLDER,
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

还原服务

要还原服务,可以使用

DataprocMetastoreRestoreServiceOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

restore_service = DataprocMetastoreRestoreServiceOperator(
    task_id="restore_metastore",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    backup_region=REGION,
    backup_project_id=PROJECT_ID,
    backup_service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

创建元数据导入

在创建 Dataproc Metastore 元数据导入之前,需要定义元数据导入。有关在创建元数据导入时传递的可用字段的更多信息,请访问 Dataproc Metastore 创建元数据导入 API。

简单的元数据导入配置如下所示

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

METADATA_IMPORT = {
    "name": "test-metadata-import",
    "database_dump": {
        "gcs_uri": GCS_URI,
        "database_type": DB_TYPE,
    },
}

要创建元数据导入,可以使用: DataprocMetastoreCreateMetadataImportOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore.py[源代码]

import_metadata = DataprocMetastoreCreateMetadataImportOperator(
    task_id="import_metadata",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    metadata_import=METADATA_IMPORT,
    metadata_import_id=METADATA_IMPORT_ID,
    timeout=TIMEOUT,
)

创建备份

在为服务创建 Dataproc Metastore 备份之前,需要定义备份。有关在创建备份时可传递的可用字段的更多信息,请访问 Dataproc Metastore 创建备份 API。

简单的备份配置可以如下所示

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

BACKUP = {
    "name": "test-backup",
}

使用此配置,我们可以创建备份: DataprocMetastoreCreateBackupOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

backup_service = DataprocMetastoreCreateBackupOperator(
    task_id="create_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup=BACKUP,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

删除备份

要删除备份,可以使用

DataprocMetastoreDeleteBackupOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

delete_backup = DataprocMetastoreDeleteBackupOperator(
    task_id="delete_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

列出备份

要列出备份,可以使用

DataprocMetastoreListBackupsOperator

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_backup.py[源代码]

list_backups = DataprocMetastoreListBackupsOperator(
    task_id="list_backups",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
)

检查 Hive 分区是否存在

要检查特定表的 Metastore 中是否已创建 Hive 分区,可以使用: MetastoreHivePartitionSensor

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py[源代码]

hive_partition_sensor = MetastoreHivePartitionSensor(
    task_id="hive_partition_sensor",
    service_id=METASTORE_SERVICE_ID,
    region=REGION,
    table=TABLE_NAME,
    partitions=[PARTITION_1, PARTITION_2],
)

此条目是否有用?