Google Cloud Spanner 运算符¶
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
按照 Google Cloud 文档中的说明,为您的项目启用结算。
按照 Cloud Console 文档中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'安装 提供了详细信息。
SpannerDeployInstanceOperator¶
创建一个新的 Cloud Spanner 实例,或者如果指定的项目中存在具有相同实例 ID 的实例,则更新该 Cloud Spanner 实例。
有关参数定义,请查看 SpannerDeployInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 创建运算符。 如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。 两种变体都显示出来
spanner_instance_create_task = SpannerDeployInstanceOperator(
project_id=PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
configuration_name=GCP_SPANNER_CONFIG_NAME,
node_count=GCP_SPANNER_NODE_COUNT,
display_name=GCP_SPANNER_DISPLAY_NAME,
task_id="spanner_instance_create_task",
)
spanner_instance_update_task = SpannerDeployInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
configuration_name=GCP_SPANNER_CONFIG_NAME,
node_count=GCP_SPANNER_NODE_COUNT + 1,
display_name=GCP_SPANNER_DISPLAY_NAME + "_updated",
task_id="spanner_instance_update_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"configuration_name",
"display_name",
"gcp_conn_id",
"impersonation_chain",
)
SpannerDeleteDatabaseInstanceOperator¶
从指定的 Cloud Spanner 实例中删除数据库。如果数据库不存在,则不执行任何操作,并且运算符成功。
有关参数定义,请查看 SpannerDeleteDatabaseInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 创建运算符。 如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。 两种变体都显示出来
spanner_database_delete_task = SpannerDeleteDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
task_id="spanner_database_delete_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
SpannerDeployDatabaseInstanceOperator¶
在指定的实例中创建一个新的 Cloud Spanner 数据库,或者如果所需的数据库存在,则假定成功,并且不对数据库配置应用任何更改。不验证数据库的结构 - 如果数据库存在且名称相同,则足够。
有关参数定义,请查看 SpannerDeployDatabaseInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 创建运算符。 如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。 两种变体都显示出来
spanner_database_deploy_task = SpannerDeployDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
"CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_deploy_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"ddl_statements",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"ddl_statements": "sql"}
SpannerUpdateDatabaseInstanceOperator¶
在 Cloud Spanner 数据库中运行 DDL 查询,并允许您修改现有数据库的结构。
您可以选择指定一个 operation_id 参数,这简化了在重新执行 update_database 调用时确定语句是否已执行的过程(幂等性检查)。operation_id 在数据库中应该是唯一的,并且必须是有效的标识符:[a-z][a-z0-9_]*
。有关更多信息,请参阅 updateDdl API 的文档
有关参数定义,请查看 SpannerUpdateDatabaseInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 创建运算符。 如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。 两种变体都显示出来
spanner_database_update_task = SpannerUpdateDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_task",
)
spanner_database_update_idempotent1_task = SpannerUpdateDatabaseInstanceOperator(
project_id=PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_idempotent1_task",
)
spanner_database_update_idempotent2_task = SpannerUpdateDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_idempotent2_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"ddl_statements",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"ddl_statements": "sql"}
更多信息¶
请参阅 Google Cloud Spanner API 文档,了解 数据库 update_ddl。
SpannerQueryDatabaseInstanceOperator¶
执行任意 DML 查询 (INSERT、UPDATE、DELETE)。
有关参数定义,请查看 SpannerQueryDatabaseInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 创建运算符。 如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。 两种变体都显示出来
spanner_instance_query_task = SpannerQueryDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
query=["DELETE FROM my_table2 WHERE true"],
task_id="spanner_instance_query_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"query",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"query": "sql"}
SpannerDeleteInstanceOperator¶
删除 Cloud Spanner 实例。如果实例不存在,则不执行任何操作,并且运算符成功。
有关参数定义,请查看 SpannerDeleteInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 创建运算符。 如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。 两种变体都显示出来
spanner_instance_delete_task = SpannerDeleteInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID, task_id="spanner_instance_delete_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"gcp_conn_id",
"impersonation_chain",
)