Google Cloud Data Catalog 运算符¶
Data Catalog 是一项完全托管且可扩展的元数据管理服务,允许组织在 Google Cloud 中快速发现、管理和了解其所有数据。它提供
一个简单易用的数据发现搜索界面,由支持 Gmail 和云端硬盘的同一 Google 搜索技术提供支持
一个灵活而强大的编目系统,用于捕获技术和业务元数据
一个用于使用 DLP API 集成标记敏感数据的自动标记机制
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档 中的说明,为您的项目启用结算。
按照 Cloud Console 文档 中的说明启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
管理条目¶
运算符使用 Entry
来表示条目
获取条目¶
获取条目是通过 CloudDataCatalogGetEntryOperator
和 CloudDataCatalogLookupEntryOperator
运算符执行的。
CloudDataCatalogGetEntryOperator
使用项目 ID、条目组 ID、条目 ID 来获取条目。
get_entry = CloudDataCatalogGetEntryOperator(
task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以将 Jinja 模板 与 location
、entry_group
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
CloudDataCatalogLookupEntryOperator
使用资源名称来获取条目。
current_entry_template = (
"//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
"entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
task_id="lookup_entry",
linked_resource=current_entry_template.format(
project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
),
)
您可以将 Jinja 模板 与 linked_resource
、sql_resource
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
创建条目¶
CloudDataCatalogCreateEntryOperator
运算符创建条目。
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
task_id="create_entry_gcs",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
entry={
"display_name": ENTRY_NAME,
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
},
)
您可以将 Jinja 模板 与 location
、entry_group
、entry_id
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
新创建的条目 ID 可以通过 entry_id
键读取。
create_entry_gcs_result = BashOperator(
task_id="create_entry_gcs_result",
bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)
更新条目¶
CloudDataCatalogUpdateEntryOperator
运算符更新条目。
update_entry = CloudDataCatalogUpdateEntryOperator(
task_id="update_entry",
entry={"display_name": f"{ENTRY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
)
您可以将 Jinja 模板 与 entry
、update_mask
、location
、entry_group
、entry_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,这允许您动态确定值。
删除条目¶
CloudDataCatalogDeleteEntryOperator
运算符删除条目。
delete_entry = CloudDataCatalogDeleteEntryOperator(
task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以将 Jinja 模板 与 location
、entry_group
、entry
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数一起使用,这允许您动态确定值。
管理条目组¶
运算符使用 Entry
来表示条目组。
创建条目组¶
CloudDataCatalogCreateEntryGroupOperator
运算符创建条目组。
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
location=LOCATION,
entry_group_id=ENTRY_GROUP_ID,
entry_group={"display_name": ENTRY_GROUP_NAME},
)
您可以使用 Jinja 模板 来处理 location
, entry_group_id
, entry_group
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
新创建的条目组 ID 可以使用 entry_group_id
键读取。
create_entry_group_result = BashOperator(
task_id="create_entry_group_result",
bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)
获取条目组¶
CloudDataCatalogGetEntryGroupOperator
操作符获取条目组。
get_entry_group = CloudDataCatalogGetEntryGroupOperator(
task_id="get_entry_group",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
read_mask=FieldMask(paths=["name", "display_name"]),
)
您可以使用 Jinja 模板 来处理 location
, entry_group
, read_mask
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
get_entry_group_result = BashOperator(
task_id="get_entry_group_result",
bash_command=f"echo {get_entry_group.output}",
)
删除条目组¶
CloudDataCatalogDeleteEntryGroupOperator
操作符删除条目组。
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
您可以使用 Jinja 模板 来处理 location
, entry_group
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
管理标签模板¶
操作符使用 TagTemplate
来表示标签模板。
创建标签模板¶
CloudDataCatalogCreateTagTemplateOperator
操作符获取标签模板。
create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
task_id="create_tag_template",
location=LOCATION,
tag_template_id=TEMPLATE_ID,
tag_template={
"display_name": TAG_TEMPLATE_DISPLAY_NAME,
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type_=dict(primitive_type="STRING")
)
},
},
)
您可以使用 Jinja 模板 来处理 location
, tag_template_id
, tag_template
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
新创建的标签模板 ID 可以使用 tag_template_id
键读取。
create_tag_template_result = BashOperator(
task_id="create_tag_template_result",
bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)
删除标签模板¶
CloudDataCatalogDeleteTagTemplateOperator
操作符删除标签模板。
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
您可以使用 Jinja 模板 来处理 location
, tag_template
, force
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
获取标签模板¶
CloudDataCatalogGetTagTemplateOperator
操作符获取标签模板。
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
您可以使用 Jinja 模板 来处理 location
, tag_template
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
get_tag_template_result = BashOperator(
task_id="get_tag_template_result",
bash_command=f"echo {get_tag_template.output}",
)
更新标签模板¶
CloudDataCatalogUpdateTagTemplateOperator
操作符更新标签模板。
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
task_id="update_tag_template",
tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template_id=TEMPLATE_ID,
)
您可以使用 Jinja 模板 来处理 tag_template
, update_mask
, location
, tag_template_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
管理标签¶
操作符使用 Tag
来表示标签。
在条目上创建标签¶
CloudDataCatalogCreateTagOperator
操作符获取标签模板。
create_tag = CloudDataCatalogCreateTagOperator(
task_id="create_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
template_id=TEMPLATE_ID,
tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)
您可以使用 Jinja 模板 来处理 location
, entry_group
, entry
, tag
, template_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
新创建的标签 ID 可以使用 tag_id
键读取。
create_tag_result = BashOperator(
task_id="create_tag_result",
bash_command=f"echo {tag_id}",
)
更新标签¶
CloudDataCatalogUpdateTagOperator
操作符更新标签模板。
update_tag = CloudDataCatalogUpdateTagOperator(
task_id="update_tag",
tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
update_mask={"paths": ["fields"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag_id=tag_id,
)
您可以使用 Jinja 模板 来处理 tag
, update_mask
, location
, entry_group
, entry
, tag_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数,这允许您动态确定值。
删除标签¶
CloudDataCatalogDeleteTagOperator
操作符删除标签模板。
delete_tag = CloudDataCatalogDeleteTagOperator(
task_id="delete_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag=tag_id,
)
您可以使用Jinja 模板,通过 location
, entry_group
, entry
, tag
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数来动态确定值。
列出条目上的标签¶
CloudDataCatalogListTagsOperator
操作符获取条目上的标签列表。
list_tags = CloudDataCatalogListTagsOperator(
task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
您可以使用Jinja 模板,通过 location
, entry_group
, entry
, page_size
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数来动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
管理标签模板字段¶
操作符使用 TagTemplateField
来表示标签模板字段。
创建字段¶
CloudDataCatalogCreateTagTemplateFieldOperator
操作符获取标签模板字段。
create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="create_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type_=FieldType(primitive_type="STRING")
),
)
您可以使用Jinja 模板,通过 location
, tag_template
, tag_template_field_id
, tag_template_field
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数来动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
新创建的字段 ID 可以使用 tag_template_field_id
键读取。
create_tag_template_field_result = BashOperator(
task_id="create_tag_template_field_result",
bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)
重命名字段¶
CloudDataCatalogRenameTagTemplateFieldOperator
操作符重命名标签模板字段。
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
task_id="rename_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_1,
new_tag_template_field_id=FIELD_NAME_3,
)
您可以使用Jinja 模板,通过 location
, tag_template
, field
, new_tag_template_field_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数来动态确定值。
更新字段¶
CloudDataCatalogUpdateTagTemplateFieldOperator
操作符获取标签模板字段。
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="update_tag_template_field",
tag_template_field={"display_name": "Updated template field"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_1,
)
您可以使用Jinja 模板,通过 tag_template_field
, update_mask
, tag_template_field_name
, location
, tag_template
, tag_template_field_id
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数来动态确定值。
删除字段¶
CloudDataCatalogDeleteTagTemplateFieldOperator
操作符删除标签模板字段。
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
task_id="delete_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_2,
force=True,
)
您可以使用Jinja 模板,通过 location
, tag_template
, field
, force
, project_id
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数来动态确定值。
搜索资源¶
CloudDataCatalogSearchCatalogOperator
操作符在 Data Catalog 中搜索与查询匹配的多个资源,例如条目、标签。
query
参数应使用 搜索语法定义。
search_catalog = CloudDataCatalogSearchCatalogOperator(
task_id="search_catalog",
scope={"include_project_ids": [PROJECT_ID]},
query=f"name:{ENTRY_GROUP_NAME}",
)
您可以使用Jinja 模板,通过 scope
, query
, page_size
, order_by
, retry
, timeout
, metadata
, gcp_conn_id
, impersonation_chain
参数来动态确定值。
结果将保存到 XCom,从而允许其他运算符使用它。
search_catalog_result = BashOperator(
task_id="search_catalog_result",
bash_command=f"echo {search_catalog.output}",
)