Google Cloud Datastore 算子

Datastore 模式下的 Firestore 是一个 NoSQL 文档数据库,专为自动扩展、高性能和简化应用程序开发而构建。

有关该服务的详细信息,请访问 Datastore 产品文档

先决任务

要使用这些算子,您必须执行以下操作

导出实体

要将实体从 Google Cloud Datastore 导出到 Cloud Storage,请使用 CloudDatastoreExportEntitiesOperator

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

export_task = CloudDatastoreExportEntitiesOperator(
    task_id="export_task",
    bucket=BUCKET_NAME,
    project_id=PROJECT_ID,
    overwrite_existing=True,
)

导入实体

要将实体从 Cloud Storage 导入 Google Cloud Datastore,请使用 CloudDatastoreImportEntitiesOperator

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

import_task = CloudDatastoreImportEntitiesOperator(
    task_id="import_task",
    bucket="{{ task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[2] }}",
    file="{{ '/'.join(task_instance.xcom_pull('export_task')['response']['outputUrl'].split('/')[3:]) }}",
    project_id=PROJECT_ID,
)

分配 ID

要为不完整的键分配 ID,请使用 CloudDatastoreAllocateIdsOperator

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

allocate_ids = CloudDatastoreAllocateIdsOperator(
    task_id="allocate_ids", partial_keys=KEYS, project_id=PROJECT_ID
)

操作员所需的局部键示例

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

KEYS = [
    {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": ""},
        "path": {"kind": "airflow"},
    }
]

开始事务

要开始新事务,请使用 CloudDatastoreBeginTransactionOperator

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

begin_transaction_commit = CloudDatastoreBeginTransactionOperator(
    task_id="begin_transaction_commit",
    transaction_options=TRANSACTION_OPTIONS,
    project_id=PROJECT_ID,
)

操作员所需事务选项示例

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

TRANSACTION_OPTIONS: dict[str, Any] = {"readWrite": {}}

提交事务

要提交事务(可以选择创建、删除或修改一些实体),请使用 CloudDatastoreCommitOperator

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

commit_task = CloudDatastoreCommitOperator(task_id="commit_task", body=COMMIT_BODY, project_id=PROJECT_ID)

操作员所需的提交信息示例

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

    COMMIT_BODY = {
        "mode": "TRANSACTIONAL",
        "mutations": [
            {
                "insert": {
                    "key": KEYS[0],
                    "properties": {"string": {"stringValue": "airflow is awesome!"}},
                }
            }
        ],
        "singleUseTransaction": {"readWrite": {}},
    }

运行查询

要针对实体运行查询,请使用 CloudDatastoreRunQueryOperator

tests/system/providers/google/cloud/datastore/example_datastore_query.py[源代码]

run_query = CloudDatastoreRunQueryOperator(task_id="run_query", body=QUERY, project_id=PROJECT_ID)

操作员所需的查询示例

tests/system/providers/google/cloud/datastore/example_datastore_query.py[源代码]

    QUERY = {
        "partitionId": {"projectId": PROJECT_ID, "namespaceId": "query"},
        "readOptions": {"transaction": begin_transaction_query.output},
        "query": {},
    }

回滚事务

要回滚事务,请使用 CloudDatastoreRollbackOperator

tests/system/providers/google/cloud/datastore/example_datastore_rollback.py[源代码]

rollback_transaction = CloudDatastoreRollbackOperator(
    task_id="rollback_transaction",
    transaction=begin_transaction_to_rollback.output,
)

获取操作状态

要获取长时间运行的操作的当前状态,请使用 CloudDatastoreGetOperationOperator

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

get_operation = CloudDatastoreGetOperationOperator(
    task_id="get_operation", name="{{ task_instance.xcom_pull('export_task')['name'] }}"
)

删除操作

要删除操作,请使用 CloudDatastoreDeleteOperationOperator

tests/system/providers/google/cloud/datastore/example_datastore_commit.py[源代码]

delete_export_operation = CloudDatastoreDeleteOperationOperator(
    task_id="delete_export_operation",
    name="{{ task_instance.xcom_pull('export_task')['name'] }}",
    trigger_rule=TriggerRule.ALL_DONE,
)

参考资料

如需了解更多信息,请参阅

此条目是否有用?