Google Cloud Datastore 算子¶
Datastore 模式下的 Firestore 是一个 NoSQL 文档数据库,专为自动扩展、高性能和简化应用程序开发而构建。
有关该服务的详细信息,请访问 Datastore 产品文档
先决任务¶
要使用这些算子,您必须执行以下操作
使用 Cloud 控制台 选择或创建一个 Cloud Platform 项目。
为您的项目启用帐单,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请访问。
导出实体¶
要将实体从 Google Cloud Datastore 导出到 Cloud Storage,请使用 CloudDatastoreExportEntitiesOperator
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
bucket=BUCKET_NAME,
project_id=PROJECT_ID,
overwrite_existing=True,
)
导入实体¶
要将实体从 Cloud Storage 导入 Google Cloud Datastore,请使用 CloudDatastoreImportEntitiesOperator
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
project_id=PROJECT_ID,
)
分配 ID¶
要为不完整的键分配 ID,请使用 CloudDatastoreAllocateIdsOperator
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
操作员所需的局部键示例
KEYS = [
{
"partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
"path": {"kind": "airflow"},
}
]
开始事务¶
要开始新事务,请使用 CloudDatastoreBeginTransactionOperator
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
transaction_options=TRANSACTION_OPTIONS,
project_id=PROJECT_ID,
)
操作员所需事务选项示例
TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
提交事务¶
要提交事务(可以选择创建、删除或修改一些实体),请使用 CloudDatastoreCommitOperator
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
操作员所需的提交信息示例
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
"mutations": [
{
"insert": {
"key": KEYS[0],
"properties": {"string": {"stringValue": "airflow is awesome!"}},
}
}
],
"singleUseTransaction": {"readWrite": {}},
}
运行查询¶
要针对实体运行查询,请使用 CloudDatastoreRunQueryOperator
run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
操作员所需的查询示例
QUERY = {
"partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
"readOptions": {"transaction": begin_transaction_query.output},
"query": {},
}
回滚事务¶
要回滚事务,请使用 CloudDatastoreRollbackOperator
rollback_transaction = CloudDatastoreRollbackOperator(
task_id="rollback_transaction",
transaction=begin_transaction_to_rollback.output,
)
获取操作状态¶
要获取长时间运行的操作的当前状态,请使用 CloudDatastoreGetOperationOperator
get_operation = CloudDatastoreGetOperationOperator(
task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
删除操作¶
要删除操作,请使用 CloudDatastoreDeleteOperationOperator
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
name="{{ task_instance.xcom_pull('export_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)