Google Cloud Bigtable 算子¶
先决条件任务¶
要使用这些算子,您必须执行以下操作:
使用 Cloud Console 选择或创建 Cloud Platform 项目。
为您的项目启用结算,如 Google Cloud 文档所述。
启用 API,如 Cloud Console 文档所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
BigtableCreateInstanceOperator¶
使用 BigtableCreateInstanceOperator
创建 Google Cloud Bigtable 实例。
如果具有给定 ID 的 Cloud Bigtable 实例存在,则该算子不会比较其配置并立即成功。不会对现有实例进行任何更改。
使用算子¶
您可以在有或没有项目 ID 的情况下创建算子。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下
create_instance_task = BigtableCreateInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
main_cluster_id=CBT_CLUSTER_ID,
main_cluster_zone=CBT_CLUSTER_ZONE,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME,
instance_type=CBT_INSTANCE_TYPE, # type: ignore[arg-type]
instance_labels=CBT_INSTANCE_LABELS,
cluster_nodes=None,
cluster_storage_type=CBT_CLUSTER_STORAGE_TYPE, # type: ignore[arg-type]
task_id="create_instance_task",
)
BigtableUpdateInstanceOperator¶
使用 BigtableUpdateInstanceOperator
更新现有 Google Cloud Bigtable 实例。
仅可以更新现有实例的以下配置:instance_display_name、instance_type 和 instance_labels。
使用算子¶
您可以在有或没有项目 ID 的情况下创建算子。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下
update_instance_task = BigtableUpdateInstanceOperator(
instance_id=CBT_INSTANCE_ID_1,
instance_display_name=CBT_INSTANCE_DISPLAY_NAME_UPDATED,
instance_type=CBT_INSTANCE_TYPE_PROD,
instance_labels=CBT_INSTANCE_LABELS_UPDATED,
task_id="update_instance_task",
)
BigtableDeleteInstanceOperator¶
使用 BigtableDeleteInstanceOperator
删除 Google Cloud Bigtable 实例。
使用算子¶
您可以在有或没有项目 ID 的情况下创建算子。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下
delete_instance_task = BigtableDeleteInstanceOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
task_id="delete_instance_task",
)
BigtableUpdateClusterOperator¶
使用 BigtableUpdateClusterOperator
修改 Cloud Bigtable 集群中的节点数。
使用算子¶
您可以在有或没有项目 ID 的情况下创建算子。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下
cluster_update_task = BigtableUpdateClusterOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
cluster_id=CBT_CLUSTER_ID,
nodes=CBT_CLUSTER_NODES_UPDATED,
task_id="update_cluster_task",
)
BigtableCreateTableOperator¶
在 Cloud Bigtable 实例中创建表。
如果具有给定 ID 的表存在于 Cloud Bigtable 实例中,则该算子会比较列族。如果列族相同,则算子成功。否则,算子会失败并显示相应的错误消息。
使用算子¶
您可以在有或没有项目 ID 的情况下创建算子。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下
create_table_task = BigtableCreateTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="create_table",
)
BigtableDeleteTableOperator¶
使用 BigtableDeleteTableOperator
删除 Google Cloud Bigtable 中的表。
使用算子¶
您可以在有或没有项目 ID 的情况下创建算子。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下
delete_table_task = BigtableDeleteTableOperator(
project_id=PROJECT_ID,
instance_id=CBT_INSTANCE_ID_1,
table_id=CBT_TABLE_ID,
task_id="delete_table_task",
)
BigtableTableReplicationCompletedSensor¶
您可以在有或没有项目 ID 的情况下创建算子。如果缺少项目 ID,将从使用的 Google Cloud 连接中检索。两种变体都显示如下
使用 BigtableTableReplicationCompletedSensor
等待表完全复制。
与 BigtableCreateTableOperator 相同的参数适用于此传感器。
注意:如果表或 Cloud Bigtable 实例不存在,则此传感器将等待该表直到超时,并且不会引发任何异常。
使用算子¶
wait_for_table_replication_task = BigtableTableReplicationCompletedSensor(
instance_id=CBT_INSTANCE_ID_2,
table_id=CBT_TABLE_ID,
poke_interval=CBT_POKE_INTERVAL,
timeout=180,
task_id="wait_for_table_replication_task2",
)