Google Cloud Data Catalog 算子¶
Data Catalog 是一项完全托管且可扩展的元数据管理服务,可让组织快速发现、管理和了解 Google Cloud 中的所有数据。它提供
一个简单易用的搜索界面,用于数据发现,由支持 Gmail 和 Drive 的 Google 搜索技术提供支持
一个灵活且强大的编目系统,用于捕获技术和业务元数据
一种与 DLP API 集成的自动标记机制,用于敏感数据
先决条件任务¶
要使用这些算子,您必须执行以下操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请访问此处。
管理条目¶
操作员使用 Entry
表示条目
获取条目¶
获取条目使用 CloudDataCatalogGetEntryOperator
和 CloudDataCatalogLookupEntryOperator
操作员。
CloudDataCatalogGetEntryOperator
使用项目 ID、条目组 ID、条目 ID 来获取条目。
get_entry = CloudDataCatalogGetEntryOperator(
task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以将 Jinja 模板 与 location
、entry_group
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
CloudDataCatalogLookupEntryOperator
使用资源名称来获取条目。
current_entry_template = (
"//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
"entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
task_id="lookup_entry",
linked_resource=current_entry_template.format(
project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
),
)
您可以将 Jinja 模板 与 linked_resource
、sql_resource
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,以动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
创建条目¶
CloudDataCatalogCreateEntryOperator
运算符创建条目。
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
task_id="create_entry_gcs",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
entry={
"display_name": ENTRY_NAME,
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
},
)
您可以将 Jinja 模板 与 location
、entry_group
、entry_id
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,以动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
可以使用 entry_id
键读取新创建的条目 ID。
create_entry_gcs_result = BashOperator(
task_id="create_entry_gcs_result",
bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)
更新条目¶
该 CloudDataCatalogUpdateEntryOperator
运算符更新条目。
update_entry = CloudDataCatalogUpdateEntryOperator(
task_id="update_entry",
entry={"display_name": f"{ENTRY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
)
您可以将 Jinja 模板 与 entry
, update_mask
, location
, entry_group
, entry_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数一起使用,这些参数允许您动态确定值。
删除条目¶
该 CloudDataCatalogDeleteEntryOperator
运算符删除条目。
delete_entry = CloudDataCatalogDeleteEntryOperator(
task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以将 Jinja 模板 与 location
、entry_group
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
管理条目组¶
运算符使用 Entry
来表示条目组。
创建条目组¶
操作符 CloudDataCatalogCreateEntryGroupOperator
创建条目组。
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
location=LOCATION,
entry_group_id=ENTRY_GROUP_ID,
entry_group={"display_name": ENTRY_GROUP_NAME},
)
你可以使用 Jinja 模板 和 location
、entry_group_id
、entry_group
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数,它们允许你动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
新创建的条目组 ID 可以通过 entry_group_id
键读取。
create_entry_group_result = BashOperator(
task_id="create_entry_group_result",
bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)
获取条目组¶
操作符 CloudDataCatalogGetEntryGroupOperator
获取条目组。
get_entry_group = CloudDataCatalogGetEntryGroupOperator(
task_id="get_entry_group",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
read_mask=FieldMask(paths=["name", "display_name"]),
)
你可以使用 Jinja 模板 和 location
、entry_group
、read_mask
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数,它们允许你动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
get_entry_group_result = BashOperator(
task_id="get_entry_group_result",
bash_command=f"echo {get_entry_group.output}",
)
删除条目组¶
CloudDataCatalogDeleteEntryGroupOperator
操作符删除条目组。
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
你可以将 Jinja 模板 与 location
, entry_group
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数一起使用,这些参数允许你动态确定值。
管理标签模板¶
操作符使用 TagTemplate
来表示标签模板。
创建标签模板¶
CloudDataCatalogCreateTagTemplateOperator
操作符获取标签模板。
create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
task_id="create_tag_template",
location=LOCATION,
tag_template_id=TEMPLATE_ID,
tag_template={
"display_name": TAG_TEMPLATE_DISPLAY_NAME,
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type_=dict(primitive_type="STRING")
)
},
},
)
您可以将 Jinja 模板 与 location
、tag_template_id
、tag_template
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
可以使用 tag_template_id
键读取新创建的标签模板 ID。
create_tag_template_result = BashOperator(
task_id="create_tag_template_result",
bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)
删除标签模板¶
CloudDataCatalogDeleteTagTemplateOperator
运算符删除标签模板。
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
您可以将 Jinja 模板 与 location
、tag_template
、force
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
获取标签模板¶
CloudDataCatalogGetTagTemplateOperator
运算符获取标签模板。
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
您可以将 Jinja 模板 与 location
、tag_template
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
get_tag_template_result = BashOperator(
task_id="get_tag_template_result",
bash_command=f"echo {get_tag_template.output}",
)
更新标签模板¶
CloudDataCatalogUpdateTagTemplateOperator
运算符更新标签模板。
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
task_id="update_tag_template",
tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template_id=TEMPLATE_ID,
)
您可以将 Jinja 模板 与 tag_template
、update_mask
、location
、tag_template_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
管理标签¶
运算符使用 Tag
来表示标签。
在条目上创建标签¶
操作符 CloudDataCatalogCreateTagOperator
获取标签模板。
create_tag = CloudDataCatalogCreateTagOperator(
task_id="create_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
template_id=TEMPLATE_ID,
tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)
你可以将 Jinja 模板 与 location
, entry_group
, entry
, tag
, template_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数结合使用,这允许你动态地确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
新创建的标签 ID 可通过 tag_id
键进行读取。
create_tag_result = BashOperator(
task_id="create_tag_result",
bash_command=f"echo {tag_id}",
)
更新标签¶
操作符 CloudDataCatalogUpdateTagOperator
更新标签模板。
update_tag = CloudDataCatalogUpdateTagOperator(
task_id="update_tag",
tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
update_mask={"paths": ["fields"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag_id=tag_id,
)
您可以将 Jinja 模板 与 tag
、update_mask
、location
、entry_group
、entry
、tag_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
删除标签¶
该 CloudDataCatalogDeleteTagOperator
运算符删除标签模板。
delete_tag = CloudDataCatalogDeleteTagOperator(
task_id="delete_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag=tag_id,
)
您可以将 Jinja 模板 与 location
、entry_group
、entry
、tag
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
列出条目上的标签¶
CloudDataCatalogListTagsOperator
运算符获取条目上的标签列表。
list_tags = CloudDataCatalogListTagsOperator(
task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
你可以使用 Jinja 模板 与 location
, entry_group
, entry
, page_size
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数一起使用,这些参数允许你动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
管理标签模板字段¶
运算符使用 TagTemplateField
来表示标签模板字段。
创建字段¶
CloudDataCatalogCreateTagTemplateFieldOperator
运算符获取标签模板字段。
create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="create_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type_=FieldType(primitive_type="STRING")
),
)
你可以将 Jinja 模板 与 location
、tag_template
、tag_template_field_id
、tag_template_field
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,这些参数允许你动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
可以使用 tag_template_field_id
键读取新创建的字段 ID。
create_tag_template_field_result = BashOperator(
task_id="create_tag_template_field_result",
bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)
重命名字段¶
CloudDataCatalogRenameTagTemplateFieldOperator
运算符重命名标签模板字段。
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
task_id="rename_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_1,
new_tag_template_field_id=FIELD_NAME_3,
)
你可以将 Jinja 模板 与 location
、tag_template
、field
、new_tag_template_field_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,这些参数允许你动态确定值。
更新字段¶
CloudDataCatalogUpdateTagTemplateFieldOperator
运算符获取标签模板字段。
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="update_tag_template_field",
tag_template_field={"display_name": "Updated template field"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_1,
)
你可以使用 Jinja 模板 与 tag_template_field
, update_mask
, tag_template_field_name
, location
, tag_template
, tag_template_field_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这些参数允许你动态确定值。
删除字段¶
该 CloudDataCatalogDeleteTagTemplateFieldOperator
运算符删除标签模板字段。
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
task_id="delete_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_2,
force=True,
)
你可以使用 Jinja 模板 与 location
, tag_template
, field
, force
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这些参数允许你动态确定值。
搜索资源¶
该 CloudDataCatalogSearchCatalogOperator
运算符在 Data Catalog 中搜索多个资源,例如与查询匹配的条目、标签。
该 query
参数应使用 搜索语法 进行定义。
search_catalog = CloudDataCatalogSearchCatalogOperator(
task_id="search_catalog",
scope={"include_project_ids": [PROJECT_ID]},
query=f"name:{ENTRY_GROUP_NAME}",
)
你可以将 Jinja 模板 与 scope
, query
, page_size
, order_by
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数配合使用,以便动态确定值。
结果保存到 XCom 中,这允许其他操作员使用它。
search_catalog_result = BashOperator(
task_id="search_catalog_result",
bash_command=f"echo {search_catalog.output}",
)