Google Cloud Data Catalog 算子

Data Catalog 是一项完全托管且可扩展的元数据管理服务,可让组织快速发现、管理和了解 Google Cloud 中的所有数据。它提供

  • 一个简单易用的搜索界面,用于数据发现,由支持 Gmail 和 Drive 的 Google 搜索技术提供支持

  • 一个灵活且强大的编目系统,用于捕获技术和业务元数据

  • 一种与 DLP API 集成的自动标记机制,用于敏感数据

先决条件任务

要使用这些算子,您必须执行以下操作

管理条目

操作员使用 Entry 表示条目

获取条目

获取条目使用 CloudDataCatalogGetEntryOperatorCloudDataCatalogLookupEntryOperator 操作员。

CloudDataCatalogGetEntryOperator 使用项目 ID、条目组 ID、条目 ID 来获取条目。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

get_entry = CloudDataCatalogGetEntryOperator(
    task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

您可以将 Jinja 模板locationentry_groupentryproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")

CloudDataCatalogLookupEntryOperator 使用资源名称来获取条目。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

current_entry_template = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
    task_id="lookup_entry",
    linked_resource=current_entry_template.format(
        project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
    ),
)

您可以将 Jinja 模板linked_resourcesql_resourceproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,以动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

lookup_entry_result = BashOperator(
    task_id="lookup_entry_result",
    bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)

创建条目

CloudDataCatalogCreateEntryOperator 运算符创建条目。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

create_entry_gcs = CloudDataCatalogCreateEntryOperator(
    task_id="create_entry_gcs",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
    entry={
        "display_name": ENTRY_NAME,
        "type_": "FILESET",
        "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
    },
)

您可以将 Jinja 模板locationentry_groupentry_identryproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,以动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

可以使用 entry_id 键读取新创建的条目 ID。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

create_entry_gcs_result = BashOperator(
    task_id="create_entry_gcs_result",
    bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)

更新条目

CloudDataCatalogUpdateEntryOperator 运算符更新条目。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

update_entry = CloudDataCatalogUpdateEntryOperator(
    task_id="update_entry",
    entry={"display_name": f"{ENTRY_NAME} UPDATED"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
)

您可以将 Jinja 模板entry, update_mask, location, entry_group, entry_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 参数一起使用,这些参数允许您动态确定值。

删除条目

CloudDataCatalogDeleteEntryOperator 运算符删除条目。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

delete_entry = CloudDataCatalogDeleteEntryOperator(
    task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

您可以将 Jinja 模板locationentry_groupentryproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

管理条目组

运算符使用 Entry 来表示条目组。

创建条目组

操作符 CloudDataCatalogCreateEntryGroupOperator 创建条目组。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    location=LOCATION,
    entry_group_id=ENTRY_GROUP_ID,
    entry_group={"display_name": ENTRY_GROUP_NAME},
)

你可以使用 Jinja 模板locationentry_group_identry_groupproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数,它们允许你动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

新创建的条目组 ID 可以通过 entry_group_id 键读取。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

create_entry_group_result = BashOperator(
    task_id="create_entry_group_result",
    bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)

获取条目组

操作符 CloudDataCatalogGetEntryGroupOperator 获取条目组。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

get_entry_group = CloudDataCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    read_mask=FieldMask(paths=["name", "display_name"]),
)

你可以使用 Jinja 模板locationentry_groupread_maskproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数,它们允许你动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

get_entry_group_result = BashOperator(
    task_id="get_entry_group_result",
    bash_command=f"echo {get_entry_group.output}",
)

删除条目组

CloudDataCatalogDeleteEntryGroupOperator 操作符删除条目组。

tests/system/providers/google/datacatalog/example_datacatalog_entries.py[源代码]

delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)

你可以将 Jinja 模板location, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 参数一起使用,这些参数允许你动态确定值。

管理标签模板

操作符使用 TagTemplate 来表示标签模板。

创建标签模板

CloudDataCatalogCreateTagTemplateOperator 操作符获取标签模板。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
    task_id="create_tag_template",
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
    tag_template={
        "display_name": TAG_TEMPLATE_DISPLAY_NAME,
        "fields": {
            FIELD_NAME_1: TagTemplateField(
                display_name="first-field", type_=dict(primitive_type="STRING")
            )
        },
    },
)

您可以将 Jinja 模板locationtag_template_idtag_templateproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

可以使用 tag_template_id 键读取新创建的标签模板 ID。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

create_tag_template_result = BashOperator(
    task_id="create_tag_template_result",
    bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)

删除标签模板

CloudDataCatalogDeleteTagTemplateOperator 运算符删除标签模板。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
    task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)

您可以将 Jinja 模板locationtag_templateforceproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

获取标签模板

CloudDataCatalogGetTagTemplateOperator 运算符获取标签模板。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

get_tag_template = CloudDataCatalogGetTagTemplateOperator(
    task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)

您可以将 Jinja 模板locationtag_templateproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

get_tag_template_result = BashOperator(
    task_id="get_tag_template_result",
    bash_command=f"echo {get_tag_template.output}",
)

更新标签模板

CloudDataCatalogUpdateTagTemplateOperator 运算符更新标签模板。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
    task_id="update_tag_template",
    tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
)

您可以将 Jinja 模板tag_templateupdate_masklocationtag_template_idproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

管理标签

运算符使用 Tag 来表示标签。

在条目上创建标签

操作符 CloudDataCatalogCreateTagOperator 获取标签模板。

tests/system/providers/google/datacatalog/example_datacatalog_tags.py[源代码]

create_tag = CloudDataCatalogCreateTagOperator(
    task_id="create_tag",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    template_id=TEMPLATE_ID,
    tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)

你可以将 Jinja 模板location, entry_group, entry, tag, template_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 参数结合使用,这允许你动态地确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

新创建的标签 ID 可通过 tag_id 键进行读取。

tests/system/providers/google/datacatalog/example_datacatalog_tags.py[源代码]

create_tag_result = BashOperator(
    task_id="create_tag_result",
    bash_command=f"echo {tag_id}",
)

更新标签

操作符 CloudDataCatalogUpdateTagOperator 更新标签模板。

tests/system/providers/google/datacatalog/example_datacatalog_tags.py[源代码]

update_tag = CloudDataCatalogUpdateTagOperator(
    task_id="update_tag",
    tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
    update_mask={"paths": ["fields"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    tag_id=tag_id,
)

您可以将 Jinja 模板tagupdate_masklocationentry_groupentrytag_idproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

删除标签

CloudDataCatalogDeleteTagOperator 运算符删除标签模板。

tests/system/providers/google/datacatalog/example_datacatalog_tags.py[源代码]

delete_tag = CloudDataCatalogDeleteTagOperator(
    task_id="delete_tag",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    tag=tag_id,
)

您可以将 Jinja 模板locationentry_groupentrytagproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数结合使用,这些参数允许您动态确定值。

列出条目上的标签

CloudDataCatalogListTagsOperator 运算符获取条目上的标签列表。

tests/system/providers/google/datacatalog/example_datacatalog_tags.py[源代码]

list_tags = CloudDataCatalogListTagsOperator(
    task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

你可以使用 Jinja 模板location, entry_group, entry, page_size, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 参数一起使用,这些参数允许你动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

tests/system/providers/google/datacatalog/example_datacatalog_tags.py[源代码]

list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")

管理标签模板字段

运算符使用 TagTemplateField 来表示标签模板字段。

创建字段

CloudDataCatalogCreateTagTemplateFieldOperator 运算符获取标签模板字段。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
    task_id="create_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_2,
    tag_template_field=TagTemplateField(
        display_name="second-field", type_=FieldType(primitive_type="STRING")
    ),
)

你可以将 Jinja 模板locationtag_templatetag_template_field_idtag_template_fieldproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数一起使用,这些参数允许你动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

可以使用 tag_template_field_id 键读取新创建的字段 ID。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

create_tag_template_field_result = BashOperator(
    task_id="create_tag_template_field_result",
    bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)

重命名字段

CloudDataCatalogRenameTagTemplateFieldOperator 运算符重命名标签模板字段。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
    task_id="rename_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_1,
    new_tag_template_field_id=FIELD_NAME_3,
)

你可以将 Jinja 模板locationtag_templatefieldnew_tag_template_field_idproject_idretrytimeoutmetadatagcp_conn_idimpersonation_chain 参数一起使用,这些参数允许你动态确定值。

更新字段

CloudDataCatalogUpdateTagTemplateFieldOperator 运算符获取标签模板字段。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
    task_id="update_tag_template_field",
    tag_template_field={"display_name": "Updated template field"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_1,
)

你可以使用 Jinja 模板tag_template_field, update_mask, tag_template_field_name, location, tag_template, tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 参数,这些参数允许你动态确定值。

删除字段

CloudDataCatalogDeleteTagTemplateFieldOperator 运算符删除标签模板字段。

tests/system/providers/google/datacatalog/example_datacatalog_tag_templates.py[源代码]

delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
    task_id="delete_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_2,
    force=True,
)

你可以使用 Jinja 模板location, tag_template, field, force, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain 参数,这些参数允许你动态确定值。

搜索资源

CloudDataCatalogSearchCatalogOperator 运算符在 Data Catalog 中搜索多个资源,例如与查询匹配的条目、标签。

query 参数应使用 搜索语法 进行定义。

tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py[源代码]

search_catalog = CloudDataCatalogSearchCatalogOperator(
    task_id="search_catalog",
    scope={"include_project_ids": [PROJECT_ID]},
    query=f"name:{ENTRY_GROUP_NAME}",
)

你可以将 Jinja 模板scope, query, page_size, order_by, retry, timeout, metadata, gcp_conn_id, impersonation_chain 参数配合使用,以便动态确定值。

结果保存到 XCom 中,这允许其他操作员使用它。

tests/system/providers/google/datacatalog/example_datacatalog_search_catalog.py[源代码]

search_catalog_result = BashOperator(
    task_id="search_catalog_result",
    bash_command=f"echo {search_catalog.output}",
)

参考

有关更多信息,请查看

此条目有帮助吗?