Google Cloud Bigtable 算子¶
先决条件任务¶
要使用这些算子,您必须执行以下操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如Google Cloud 文档中所述。
启用 API,如Cloud 控制台文档中所述。
通过pip安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息,请访问此处。
BigtableCreateInstanceOperator¶
使用BigtableCreateInstanceOperator
创建 Google Cloud Bigtable 实例。
如果具有给定 ID 的 Cloud Bigtable 实例存在,则该操作符不会比较其配置并立即成功。不会对现有实例进行任何更改。
使用操作符¶
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。两种变体均已显示
create_instance_task = BigtableCreateInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=CBT_INSTANCE_TYPE, # type: ignore[arg-type]
instance_labels=CBT_INSTANCE_LABELS,
cluster_nodes=None,
cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type]
task_id="create_instance_task",
)
create_instance_task2 = BigtableCreateInstanceOperator(
instance_id=CBT_INSTANCE_ID,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=CBT_INSTANCE_TYPE, # type: ignore[arg-type]
instance_labels=CBT_INSTANCE_LABELS,
cluster_nodes=CBT_CLUSTER_NODES,
cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type]
task_id="create_instance_task2",
)
BigtableUpdateInstanceOperator¶
使用 BigtableUpdateInstanceOperator
更新现有的 Google Cloud Bigtable 实例。
对于现有实例,只能更新以下配置:instance_display_name、instance_type 和 instance_labels。
使用操作符¶
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。两种变体均已显示
update_instance_task = BigtableUpdateInstanceOperator(
instance_id=CBT_INSTANCE_ID,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
instance_type=CBT_INSTANCE_TYPE_PROD,
instance_labels=CBT_INSTANCE_LABELS_UPDATED,
task_id="update_instance_task",
)
BigtableDeleteInstanceOperator¶
使用 BigtableDeleteInstanceOperator
删除 Google Cloud Bigtable 实例。
使用操作符¶
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。两种变体均已显示
delete_instance_task = BigtableDeleteInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
task_id="delete_instance_task",
)
delete_instance_task2 = BigtableDeleteInstanceOperator(
instance_id=CBT_INSTANCE_ID,
task_id="delete_instance_task2",
)
BigtableUpdateClusterOperator¶
使用 BigtableUpdateClusterOperator
修改 Cloud Bigtable 集群中的节点数。
使用运算符¶
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。两种变体均已显示
cluster_update_task = BigtableUpdateClusterOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
cluster_id=CBT_CLUSTER_ID,
nodes=CBT_CLUSTER_NODES_UPDATED,
task_id="update_cluster_task",
)
cluster_update_task2 = BigtableUpdateClusterOperator(
instance_id=CBT_INSTANCE_ID,
cluster_id=CBT_CLUSTER_ID,
nodes=CBT_CLUSTER_NODES_UPDATED,
task_id="update_cluster_task2",
)
BigtableCreateTableOperator¶
在 Cloud Bigtable 实例中创建表。
如果给定 ID 的表存在于 Cloud Bigtable 实例中,则运算符会比较列族。如果列族相同,则运算符会成功。否则,运算符会失败并显示相应的错误消息。
使用运算符¶
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。两种变体均已显示
create_table_task = BigtableCreateTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id="create_table",
)
create_table_task2 = BigtableCreateTableOperator(
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id="create_table_task2",
)
BigtableDeleteTableOperator¶
使用 BigtableDeleteTableOperator
在 Google Cloud Bigtable 中删除表。
使用运算符¶
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。两种变体均已显示
delete_table_task = BigtableDeleteTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id="delete_table_task",
)
delete_table_task2 = BigtableDeleteTableOperator(
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
task_id="delete_table_task2",
)
BigtableTableReplicationCompletedSensor¶
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从所使用的 Google Cloud 连接中检索。两种变体均已显示
使用 BigtableTableReplicationCompletedSensor
等待表完全复制。
此传感器与 BigtableCreateTableOperator 适用相同的参数。
注意:如果表或 Cloud Bigtable 实例不存在,此传感器会一直等到超时,并且不会引发任何异常。
使用运算符¶
wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
instance_id=CBT_INSTANCE_ID,
table_id=CBT_TABLE_ID,
poke_interval=CBT_POKE_INTERVAL,
timeout=180,
task_id="wait_for_table_replication_task2",
)