Google Cloud Vision 操作符¶
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
按照 Google Cloud 文档 中的说明,为您的项目启用结算。
按照 Cloud Console 文档 中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'安装提供了详细信息。
CloudVisionAddProductToProductSetOperator¶
创建一个新的 ReferenceImage
资源。
有关参数定义,请查看 CloudVisionAddProductToProductSetOperator
使用操作符¶
我们正在使用 Google 库中的 Product
、ProductSet
和 Retry
对象
from google.api_core.retry import Retry # isort:skip
from google.cloud.vision_v1.types import ProductSet # isort:skip
from google.cloud.vision_v1.types import Product # isort:skip
如果 product_set_id
和 product_id
是由 API 生成的,则可以从 XCOM 中提取
add_product_to_product_set = CloudVisionAddProductToProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
product_id="{{ task_instance.xcom_pull('product_create') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="add_product_to_product_set",
)
否则,可以显式指定
add_product_to_product_set_2 = CloudVisionAddProductToProductSetOperator(
location=LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="add_product_to_product_set_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"product_set_id",
"product_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionImageAnnotateOperator¶
运行图像的图像检测和注释。
有关参数定义,请查看 CloudVisionImageAnnotateOperator
使用操作符¶
我们正在使用 Google 库中的 enums
和 Retry
对象
from google.api_core.retry import Retry # isort:skip
from google.cloud.vision_v1 import Feature # isort:skip
from providers.tests.system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
annotate_image = CloudVisionImageAnnotateOperator(
request=annotate_image_request,
retry=Retry(maximum=10.0),
timeout=5,
task_id="annotate_image",
)
结果可以从 XCOM 中提取
annotate_image_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('annotate_image')"
"['logoAnnotations'][0]['description'] }}",
task_id="annotate_image_result",
)
模板化¶
template_fields: Sequence[str] = (
"request",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionCreateProductOperator¶
创建并返回一个新的产品资源。
关于提供的 Product
对象的可能错误
如果
display_name
缺失或长度超过 4096 个字符,则返回 INVALID_ARGUMENT。如果
description
长度超过 4096 个字符,则返回 INVALID_ARGUMENT。如果
product_category
缺失或无效,则返回 INVALID_ARGUMENT。
有关参数定义,请查看 CloudVisionCreateProductOperator
使用操作符¶
我们正在使用 Google 库中的 Product
和 Retry
对象
from google.cloud.vision_v1.types import Product # isort:skip
from google.api_core.retry import Retry # isort:skip
product = Product(display_name="My Product 1", product_category="toys")
可以省略 product_id
参数(它将由 API 生成)
product_create = CloudVisionCreateProductOperator(
location=LOCATION,
product=product,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_create",
)
或者可以显式指定
product_create_2 = CloudVisionCreateProductOperator(
product_id=GCP_VISION_PRODUCT_ID,
location=LOCATION,
product=product,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_create_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionDeleteProductOperator¶
永久删除产品及其参考图像。
产品及其所有图像的元数据将立即删除,但针对包含该产品的 ProductSets
的搜索查询在所有相关缓存刷新之前可能仍然有效。
可能出现的错误
如果产品不存在,则返回 NOT_FOUND。
有关参数定义,请查看 CloudVisionDeleteProductOperator
使用操作符¶
如果 product_id
是由 API 生成的,则可以从 XCOM 中提取
product_delete = CloudVisionDeleteProductOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
task_id="product_delete",
)
否则,可以显式指定
product_delete_2 = CloudVisionDeleteProductOperator(
location=LOCATION, product_id=GCP_VISION_PRODUCT_ID, task_id="product_delete_2"
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionGetProductOperator¶
获取与 Product
关联的信息。
可能出现的错误
如果
Product
不存在,则返回 NOT_FOUND。
有关参数定义,请查看 CloudVisionGetProductOperator
使用操作符¶
如果 product_id
是由 API 生成的,则可以从 XCOM 中提取
product_get = CloudVisionGetProductOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
task_id="product_get",
)
否则,可以显式指定
product_get_2 = CloudVisionGetProductOperator(
location=LOCATION, product_id=GCP_VISION_PRODUCT_ID, task_id="product_get_2"
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionProductSetCreateOperator¶
创建一个新的 ProductSet
资源。
有关参数定义,请查看 CloudVisionCreateProductSetOperator
使用操作符¶
我们正在使用 Google 库中的 ProductSet
和 Retry
对象
from google.cloud.vision_v1.types import ProductSet # isort:skip
from google.api_core.retry import Retry # isort:skip
product_set = ProductSet(display_name="My Product Set")
可以省略 product_set_id
参数(它将由 API 生成)
product_set_create = CloudVisionCreateProductSetOperator(
location=LOCATION,
product_set=product_set,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_set_create",
)
或者可以显式指定
product_set_create_2 = CloudVisionCreateProductSetOperator(
product_set_id=GCP_VISION_PRODUCT_SET_ID,
location=LOCATION,
product_set=product_set,
retry=Retry(maximum=10.0),
timeout=5,
task_id="product_set_create_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionDeleteProductSetOperator¶
永久删除一个 ProductSet
。ProductSet
中的 Products
和 ReferenceImages
不会被删除。实际的图像文件不会从 Google Cloud Storage 中删除。
有关参数定义,请查看 CloudVisionDeleteProductSetOperator
使用该运算符¶
如果 product_set_id
是由 API 生成的,则可以从 XCOM 中提取。
product_set_delete = CloudVisionDeleteProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
task_id="product_set_delete",
)
否则,可以显式指定
product_set_delete_2 = CloudVisionDeleteProductSetOperator(
location=LOCATION, product_set_id=GCP_VISION_PRODUCT_SET_ID, task_id="product_set_delete_2"
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionGetProductSetOperator¶
获取与 ProductSet
关联的信息。
有关参数定义,请查看 CloudVisionGetProductSetOperator
使用该运算符¶
如果 product_set_id
是由 API 生成的,则可以从 XCOM 中提取。
product_set_get = CloudVisionGetProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
task_id="product_set_get",
)
否则,可以显式指定
product_set_get_2 = CloudVisionGetProductSetOperator(
location=LOCATION, product_set_id=GCP_VISION_PRODUCT_SET_ID, task_id="product_set_get_2"
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionUpdateProductSetOperator¶
更改 ProductSet
资源。目前只能更新 display_name
。
注意
要定位 ProductSet
资源,其 name
必须采用 projects/PROJECT_ID/locations/LOC_ID/productSets/PRODUCT_SET_ID
形式。
您可以直接将 name
作为 product_set
对象的属性提供。但是,您可以将其留空,并提供 location
和 product_set_id
(以及可选的 project_id
- 如果未提供,则将使用连接默认值),并且 name
将由运算符本身创建。
此机制的存在是为了方便您,允许将 project_id
留空,并让 Airflow 使用连接默认的 project_id
。
有关参数定义,请查看 CloudVisionUpdateProductSetOperator
使用该运算符¶
我们正在使用 Google Cloud Vision 库中的 ProductSet
对象
from google.cloud.vision_v1.types import ProductSet # isort:skip
product_set = ProductSet(display_name="My Product Set")
任务初始化
如果 product_set_id
是由 API 生成的,则可以从 XCOM 中提取。
product_set_update = CloudVisionUpdateProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
product_set=ProductSet(display_name="My Product Set 2"),
task_id="product_set_update",
)
否则,可以显式指定
product_set_update_2 = CloudVisionUpdateProductSetOperator(
location=LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_set=ProductSet(display_name="My Product Set 2"),
task_id="product_set_update_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_set_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionUpdateProductOperator¶
更改 Product
资源。目前只能更新 display_name
、description
和 labels
字段。如果更新了标签,则更改不会反映在查询中,直到下次索引时。
注意
要定位 Product
资源,其 name
必须采用 projects/PROJECT_ID/locations/LOC_ID/products/PRODUCT_ID
形式。
您可以直接将 name
作为 product
对象的属性提供。但是,您可以将其留空,并提供 location
和 product_id
(以及可选的 project_id
- 如果未提供,则将使用连接默认值),并且 name
将由运算符本身创建。
此机制的存在是为了方便您,允许将 project_id
留空,并让 Airflow 使用连接默认的 project_id
。
可能出现的错误
如果
Product
不存在,则返回 NOT_FOUND。如果
display_name
存在于update_mask
中,但请求中缺少或长度超过 4096 个字符,则返回 INVALID_ARGUMENT。如果
description
存在于update_mask
中,但长度超过 4096 个字符,则返回 INVALID_ARGUMENT。如果
product_category
存在于update_mask
中,则返回 INVALID_ARGUMENT。
有关参数定义,请查看 CloudVisionUpdateProductOperator
使用该运算符¶
我们正在使用 Google Cloud Vision 库中的 Product
对象
from google.cloud.vision_v1.types import Product # isort:skip
product = Product(display_name="My Product 1", product_category="toys")
如果 product_id
是由 API 生成的,则可以从 XCOM 中提取
product_update = CloudVisionUpdateProductOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
product=Product(display_name="My Product 2", description="My updated description"),
task_id="product_update",
)
否则,可以显式指定
product_update_2 = CloudVisionUpdateProductOperator(
location=LOCATION,
product_id=GCP_VISION_PRODUCT_ID,
product=Product(display_name="My Product 2", description="My updated description"),
task_id="product_update_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"project_id",
"product_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionCreateReferenceImageOperator¶
创建一个新的 ReferenceImage
资源。
有关参数定义,请查看 CloudVisionCreateReferenceImageOperator
使用该运算符¶
我们正在使用 Google 库中的 ReferenceImage
和 Retry
对象
from google.cloud.vision_v1.types import ReferenceImage # isort:skip
from google.api_core.retry import Retry # isort:skip
reference_image = ReferenceImage(uri=VISION_IMAGE_URL)
可以省略 product_set_id
参数(它将由 API 生成)
reference_image_create = CloudVisionCreateReferenceImageOperator(
location=LOCATION,
reference_image=reference_image,
product_id="{{ task_instance.xcom_pull('product_create') }}",
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_create",
)
或者可以显式指定
reference_image_create_2 = CloudVisionCreateReferenceImageOperator(
location=LOCATION,
reference_image=reference_image,
product_id=GCP_VISION_PRODUCT_ID,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_create_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"reference_image",
"product_id",
"reference_image_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionDeleteReferenceImageOperator¶
删除一个 ReferenceImage
资源。
有关参数定义,请查看 CloudVisionDeleteReferenceImageOperator
使用该运算符¶
我们正在使用 Google 库中的 ReferenceImage
和 Retry
对象
from google.cloud.vision_v1.types import ReferenceImage # isort:skip
from google.api_core.retry import Retry # isort:skip
reference_image = ReferenceImage(uri=VISION_IMAGE_URL)
可以省略 product_set_id
参数(它将由 API 生成)
reference_image_delete = CloudVisionDeleteReferenceImageOperator(
location=LOCATION,
product_id="{{ task_instance.xcom_pull('product_create') }}",
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_delete",
)
或者可以显式指定
reference_image_delete_2 = CloudVisionDeleteReferenceImageOperator(
location=LOCATION,
reference_image_id=GCP_VISION_REFERENCE_IMAGE_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="reference_image_delete_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"reference_image",
"product_id",
"reference_image_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
CloudVisionRemoveProductFromProductSetOperator¶
创建一个新的 ReferenceImage
资源。
有关参数定义,请查看 CloudVisionRemoveProductFromProductSetOperator
使用该运算符¶
我们正在使用 Google 库中的 Product
、ProductSet
和 Retry
对象
from google.api_core.retry import Retry # isort:skip
from google.cloud.vision_v1.types import ProductSet # isort:skip
from google.cloud.vision_v1.types import Product # isort:skip
如果 product_set_id
和 product_id
是由 API 生成的,则可以从 XCOM 中提取
remove_product_from_product_set = CloudVisionRemoveProductFromProductSetOperator(
location=LOCATION,
product_set_id=product_set_create_output,
product_id="{{ task_instance.xcom_pull('product_create') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="remove_product_from_product_set",
)
否则,可以显式指定
remove_product_from_product_set_2 = CloudVisionRemoveProductFromProductSetOperator(
location=LOCATION,
product_set_id=GCP_VISION_PRODUCT_SET_ID,
product_id=GCP_VISION_PRODUCT_ID,
retry=Retry(maximum=10.0),
timeout=5,
task_id="remove_product_from_product_set_2",
)
模板化¶
template_fields: Sequence[str] = (
"location",
"product_set_id",
"product_id",
"project_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
使用该运算符¶
我们正在使用 Google 库中的 Retry
对象
from google.api_core.retry import Retry # isort:skip
detect_text = CloudVisionDetectTextOperator(
image=DETECT_IMAGE,
retry=Retry(maximum=10.0),
timeout=5,
task_id="detect_text",
language_hints="en",
web_detection_params={"include_geo_results": True},
)
结果可以从 XCOM 中提取
detect_text_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_text')['textAnnotations'][0] }}",
task_id="detect_text_result",
)
模板化¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
使用该运算符¶
我们正在使用 Google 库中的 Retry
对象
from google.api_core.retry import Retry # isort:skip
document_detect_text = CloudVisionTextDetectOperator(
image=DETECT_IMAGE, retry=Retry(maximum=10.0), timeout=5, task_id="document_detect_text"
)
结果可以从 XCOM 中提取
document_detect_text_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('document_detect_text')['textAnnotations'][0] }}",
task_id="document_detect_text_result",
)
模板化¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
) # Iterable[str]
更多信息¶
使用操作符¶
我们正在使用 Google 库中的 Retry
对象
from google.api_core.retry import Retry # isort:skip
detect_labels = CloudVisionDetectImageLabelsOperator(
image=DETECT_IMAGE, retry=Retry(maximum=10.0), timeout=5, task_id="detect_labels"
)
结果可以从 XCOM 中提取
detect_labels_result = BashOperator(
bash_command="echo {{ task_instance.xcom_pull('detect_labels')['labelAnnotations'][0] }}",
task_id="detect_labels_result",
)
模板化¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
请参阅 Google Cloud Vision 标签检测文档。
CloudVisionDetectImageSafeSearchOperator¶
运行图像标签检测。
有关参数定义,请查看 CloudVisionDetectImageSafeSearchOperator
使用操作符¶
我们正在使用 Google 库中的 Retry
对象
from google.api_core.retry import Retry # isort:skip
detect_safe_search = CloudVisionDetectImageSafeSearchOperator(
image=DETECT_IMAGE, retry=Retry(maximum=10.0), timeout=5, task_id="detect_safe_search"
)
结果可以从 XCOM 中提取
detect_safe_search_result = BashOperator(
bash_command=f"echo {detect_safe_search.output}",
task_id="detect_safe_search_result",
)
模板化¶
template_fields: Sequence[str] = (
"image",
"max_results",
"timeout",
"gcp_conn_id",
"impersonation_chain",
)