Google Cloud Spanner 算子¶
先决任务¶
要使用这些算子,您必须执行以下操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如Google Cloud 文档中所述。
启用 API,如Cloud 控制台文档中所述。
通过pip安装 API 库。
pip install 'apache-airflow[google]'安装提供了详细信息。
SpannerDeployInstanceOperator¶
创建新的 Cloud Spanner 实例,或者如果指定项目中存在具有相同实例 ID 的实例,则更新 Cloud Spanner 实例。
有关参数定义,请参阅 SpannerDeployInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 来创建运算符。如果缺少项目 ID,它将从所用的 Google Cloud 连接中检索。两种变体都已显示
spanner_instance_create_task = SpannerDeployInstanceOperator(
project_id=PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
configuration_name=GCP_SPANNER_CONFIG_NAME,
node_count=GCP_SPANNER_NODE_COUNT,
display_name=GCP_SPANNER_DISPLAY_NAME,
task_id="spanner_instance_create_task",
)
spanner_instance_update_task = SpannerDeployInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
configuration_name=GCP_SPANNER_CONFIG_NAME,
node_count=GCP_SPANNER_NODE_COUNT + 1,
display_name=GCP_SPANNER_DISPLAY_NAME + "_updated",
task_id="spanner_instance_update_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"configuration_name",
"display_name",
"gcp_conn_id",
"impersonation_chain",
)
SpannerDeleteDatabaseInstanceOperator¶
从指定的 Cloud Spanner 实例中删除数据库。如果数据库不存在,则不执行任何操作,并且运算符会成功。
有关参数定义,请参阅 SpannerDeleteDatabaseInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 来创建运算符。如果缺少项目 ID,它将从所用的 Google Cloud 连接中检索。两种变体都已显示
spanner_database_delete_task = SpannerDeleteDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
task_id="spanner_database_delete_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"gcp_conn_id",
"impersonation_chain",
)
更多信息¶
SpannerDeployDatabaseInstanceOperator¶
在指定的实例中创建新的 Cloud Spanner 数据库,或者如果所需数据库存在,则假定成功,且不对数据库配置应用任何更改。不会验证数据库的任何结构 - 如果数据库以相同名称存在,则已足够。
有关参数定义,请参阅 SpannerDeployDatabaseInstanceOperator
。
使用操作符¶
您可以使用或不使用项目 ID 来创建运算符。如果缺少项目 ID,它将从所用的 Google Cloud 连接中检索。两种变体都已显示
spanner_database_deploy_task = SpannerDeployDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table1 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
"CREATE TABLE my_table2 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_deploy_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"ddl_statements",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"ddl_statements": "sql"}
SpannerUpdateDatabaseInstanceOperator¶
在 Cloud Spanner 数据库中运行 DDL 查询,允许修改现有数据库的结构。
您可以选择指定一个 operation_id 参数,该参数可简化在 update_database 调用重播(幂等性检查)的情况下确定语句是否已执行。operation_id 在数据库中应是唯一的,并且必须是有效的标识符:[a-z][a-z0-9_]*
。更多信息,请参阅 updateDdl API 文档
有关参数定义,请参阅 SpannerUpdateDatabaseInstanceOperator
。
使用操作符¶
您可以使用或不使用项目 ID 来创建运算符。如果缺少项目 ID,它将从所用的 Google Cloud 连接中检索。两种变体都已显示
spanner_database_update_task = SpannerUpdateDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
ddl_statements=[
"CREATE TABLE my_table3 (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_task",
)
spanner_database_update_idempotent1_task = SpannerUpdateDatabaseInstanceOperator(
project_id=PROJECT_ID,
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_idempotent1_task",
)
spanner_database_update_idempotent2_task = SpannerUpdateDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
operation_id=OPERATION_ID,
ddl_statements=[
"CREATE TABLE my_table_unique (id INT64, name STRING(MAX)) PRIMARY KEY (id)",
],
task_id="spanner_database_update_idempotent2_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"ddl_statements",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"ddl_statements": "sql"}
更多信息¶
有关 数据库 update_ddl,请参阅 Google Cloud Spanner API 文档。
SpannerQueryDatabaseInstanceOperator¶
执行任意 DML 查询(INSERT、UPDATE、DELETE)。
有关参数定义,请参阅 SpannerQueryDatabaseInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 来创建运算符。如果缺少项目 ID,它将从所用的 Google Cloud 连接中检索。两种变体都已显示
spanner_instance_query_task = SpannerQueryDatabaseInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID,
database_id=GCP_SPANNER_DATABASE_ID,
query=["DELETE FROM my_table2 WHERE true"],
task_id="spanner_instance_query_task",
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"database_id",
"query",
"gcp_conn_id",
"impersonation_chain",
)
template_ext: Sequence[str] = (".sql",)
template_fields_renderers = {"query": "sql"}
SpannerDeleteInstanceOperator¶
删除 Cloud Spanner 实例。如果实例不存在,则不执行任何操作,并且运算符成功。
有关参数定义,请参阅 SpannerDeleteInstanceOperator
。
使用运算符¶
您可以使用或不使用项目 ID 来创建运算符。如果缺少项目 ID,它将从所用的 Google Cloud 连接中检索。两种变体都已显示
spanner_instance_delete_task = SpannerDeleteInstanceOperator(
instance_id=GCP_SPANNER_INSTANCE_ID, task_id="spanner_instance_delete_task"
)
模板化¶
template_fields: Sequence[str] = (
"project_id",
"instance_id",
"gcp_conn_id",
"impersonation_chain",
)