Google Cloud Datastore 操作符¶
Firestore 在 Datastore 模式下是一个 NoSQL 文档数据库,专为自动扩展、高性能和易于应用程序开发而构建。
有关该服务的更多信息,请访问 Datastore 产品文档
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档中的描述,为您的项目启用结算。
按照 Cloud Console 文档中的描述,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
导出实体¶
要将实体从 Google Cloud Datastore 导出到 Cloud Storage,请使用 CloudDatastoreExportEntitiesOperator
export_task = CloudDatastoreExportEntitiesOperator(
task_id="export_task",
bucket=BUCKET_NAME,
project_id=PROJECT_ID,
overwrite_existing=True,
)
导入实体¶
要将实体从 Cloud Storage 导入到 Google Cloud Datastore,请使用 CloudDatastoreImportEntitiesOperator
import_task = CloudDatastoreImportEntitiesOperator(
task_id="import_task",
bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
project_id=PROJECT_ID,
)
分配 ID¶
要为不完整的键分配 ID,请使用 CloudDatastoreAllocateIdsOperator
allocate_ids = CloudDatastoreAllocateIdsOperator(
task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)
操作符所需的部分键的示例
KEYS = [
{
"partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
"path": {"kind": "airflow"},
}
]
开始事务¶
要开始新事务,请使用 CloudDatastoreBeginTransactionOperator
begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
task_id="begin_transaction_commit",
transaction_options=TRANSACTION_OPTIONS,
project_id=PROJECT_ID,
)
操作符所需的事务选项的示例
TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}
提交事务¶
要提交事务,可以选择创建、删除或修改某些实体,请使用 CloudDatastoreCommitOperator
commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)
操作符所需的提交信息的示例
COMMIT_BODY = {
"mode": "TRANSACTIONAL",
"mutations": [
{
"insert": {
"key": KEYS[0],
"properties": {"string": {"stringValue": "airflow is awesome!"}},
}
}
],
"singleUseTransaction": {"readWrite": {}},
}
运行查询¶
要运行实体查询,请使用 CloudDatastoreRunQueryOperator
run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)
操作符所需的查询示例
QUERY = {
"partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
"readOptions": {"transaction": begin_transaction_query.output},
"query": {},
}
回滚事务¶
要回滚事务,请使用 CloudDatastoreRollbackOperator
rollback_transaction = CloudDatastoreRollbackOperator(
task_id="rollback_transaction",
transaction=begin_transaction_to_rollback.output,
)
获取操作状态¶
要获取长时间运行操作的当前状态,请使用 CloudDatastoreGetOperationOperator
get_operation = CloudDatastoreGetOperationOperator(
task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)
删除操作¶
要删除操作,请使用 CloudDatastoreDeleteOperationOperator
delete_export_operation = CloudDatastoreDeleteOperationOperator(
task_id="delete_export_operation",
name="{{ task_instance.xcom_pull('export_task')['name'] }}",
trigger_rule=TriggerRule.ALL_DONE,
)