Google Dataform Operators

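Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.

Dataform lets you manage the transformation step of the Extract, Load, and Transform (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.

For more information about the service, see the Dataform documentation.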

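Configuration

Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform documentation.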
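The snippets on this page refer to constants such as PROJECT_ID, REGION, REPOSITORY_ID, and WORKSPACE_ID without defining them. A minimal sketch of how they might be defined is shown here; the environment variable names and all values are hypothetical placeholders, not settings required by the operators.

```python
import os

# Hypothetical placeholders: replace these with values for your own project.
# The environment variable names are assumptions made for this sketch.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "my-gcp-project")
REGION = "us-central1"

# Suffixing the IDs with ENV_ID keeps parallel environments from colliding.
REPOSITORY_ID = f"example_dataform_repository_{ENV_ID}"
WORKSPACE_ID = f"example_dataform_workspace_{ENV_ID}"
DATAFORM_SCHEMA_NAME = f"schema_dataform_{ENV_ID}"
```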

Create repository

To create a repository for tracking your code in the Dataform service, use the DataformCreateRepositoryOperator. An example of usage is shown below.

tests/system/google/cloud/dataform/example_dataform.py

    make_repository = DataformCreateRepositoryOperator(
        task_id="make-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
    )

Create workspace

To create a workspace for storing your code in the Dataform service, use the DataformCreateWorkspaceOperator. An example of usage is shown below.

tests/system/google/cloud/dataform/example_dataform.py

    make_workspace = DataformCreateWorkspaceOperator(
        task_id="make-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )

Create compilation result

To create a compilation result, use the DataformCreateCompilationResultOperator. A simple configuration is shown below.

tests/system/google/cloud/dataform/example_dataform.py

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )
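The "workspace" value in the example above is a full resource path rather than a bare workspace ID. As a small sketch, the path can be assembled with a helper; workspace_path is a hypothetical name introduced here for illustration, and the argument values are placeholders.

```python
def workspace_path(project_id: str, region: str, repository_id: str, workspace_id: str) -> str:
    """Build the workspace resource path in the same shape as the example above."""
    return (
        f"projects/{project_id}/locations/{region}/repositories/{repository_id}/"
        f"workspaces/{workspace_id}"
    )


# Placeholder identifiers, for illustration only.
compilation_result = {
    "git_commitish": "main",
    "workspace": workspace_path("my-project", "us-central1", "my-repo", "my-workspace"),
}
print(compilation_result["workspace"])
```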

Get compilation result

To get a compilation result, use the DataformGetCompilationResultOperator.

tests/system/google/cloud/dataform/example_dataform.py

get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
    ),
)
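The templated expression above pulls the result pushed to XCom by the create task and keeps only the last segment of its 'name' field. The plain Python equivalent of that string handling is sketched below; resource_id is a hypothetical helper and the resource name is a made-up example.

```python
def resource_id(resource_name: str) -> str:
    """Return the trailing ID segment of a slash-separated resource name."""
    return resource_name.split("/")[-1]


# Made-up resource name in the same shape as the 'name' field used above.
name = (
    "projects/my-project/locations/us-central1/"
    "repositories/my-repo/compilationResults/abc123"
)
print(resource_id(name))  # -> abc123
```

Operators that take a full resource name (such as the "compilation_result" field of a workflow invocation) use the 'name' value unchanged, while parameters ending in _id expect only this trailing segment.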

Create workflow invocation

To create a workflow invocation, use the DataformCreateWorkflowInvocationOperator.

tests/system/google/cloud/dataform/example_dataform.py

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

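This operation can run in either synchronous or asynchronous mode. For asynchronous runs, set asynchronous=True and use the DataformWorkflowInvocationStateSensor to wait until the workflow invocation reaches an expected state.

tests/system/google/cloud/dataform/example_dataform.py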

create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation-async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is-workflow-invocation-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)

The DataformWorkflowInvocationActionStateSensor checks the state of a particular action within a workflow invocation that was triggered asynchronously.

tests/system/google/cloud/dataform/example_dataform.py

is_workflow_invocation_action_done = DataformWorkflowInvocationActionStateSensor(
    task_id="is-workflow-invocation-action-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    target_name="first_view",
    expected_statuses={WorkflowInvocationAction.State.SUCCEEDED},
    failure_statuses={
        WorkflowInvocationAction.State.SKIPPED,
        WorkflowInvocationAction.State.DISABLED,
        WorkflowInvocationAction.State.CANCELLED,
        WorkflowInvocationAction.State.FAILED,
    },
)
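The sensors above distinguish between expected_statuses (states that end the wait successfully) and failure_statuses (states that should fail the task). The following is a conceptual sketch of that decision only. It is not the provider's implementation: the State enum and the poke function are stand-ins defined here so the logic can be run without Airflow.

```python
from enum import Enum


class State(Enum):
    """Stand-in for the action states referenced in the example above."""
    RUNNING = "RUNNING"
    SUCCEEDED = "SUCCEEDED"
    SKIPPED = "SKIPPED"
    DISABLED = "DISABLED"
    CANCELLED = "CANCELLED"
    FAILED = "FAILED"


def poke(current: State, expected_statuses: set, failure_statuses: frozenset = frozenset()) -> bool:
    """Return True when waiting is over, False to keep waiting, raise on failure."""
    if current in failure_statuses:
        raise RuntimeError(f"Action ended in failure state: {current.name}")
    return current in expected_statuses


expected = {State.SUCCEEDED}
failures = frozenset({State.SKIPPED, State.DISABLED, State.CANCELLED, State.FAILED})

print(poke(State.RUNNING, expected, failures))    # still waiting
print(poke(State.SUCCEEDED, expected, failures))  # done
```

Listing failure states explicitly lets a task fail promptly instead of waiting for a timeout when the action can no longer reach an expected state.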

Get workflow invocation

To get a workflow invocation, use the DataformGetWorkflowInvocationOperator.

tests/system/google/cloud/dataform/example_dataform.py

get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)

Query workflow invocation actions

To query the actions of a workflow invocation, use the DataformQueryWorkflowInvocationActionsOperator.

tests/system/google/cloud/dataform/example_dataform.py

query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
    task_id="query-workflow-invocation-actions",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)

Cancel workflow invocation

To cancel a workflow invocation, use the DataformCancelWorkflowInvocationOperator.

tests/system/google/cloud/dataform/example_dataform.py

cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
    ),
)

Delete repository

To delete a repository, use the DataformDeleteRepositoryOperator. An example of usage is shown below.

tests/system/google/cloud/dataform/example_dataform.py

    delete_repository = DataformDeleteRepositoryOperator(
        task_id="delete-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Delete workspace

To delete a workspace, use the DataformDeleteWorkspaceOperator. An example of usage is shown below.

tests/system/google/cloud/dataform/example_dataform.py

    delete_workspace = DataformDeleteWorkspaceOperator(
        task_id="delete-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Remove file

To remove a file from a workspace, use the DataformRemoveFileOperator. An example of usage is shown below.

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_file = DataformRemoveFileOperator(
        task_id="remove-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
    )

Remove directory

To remove a directory from a workspace, use the DataformRemoveDirectoryOperator. An example of usage is shown below.

tests/system/google/cloud/dataform/example_dataform.py

    remove_test_directory = DataformRemoveDirectoryOperator(
        task_id="remove-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

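Initialize workspace

The make_initialization_workspace_flow helper creates the default project structure for the provided workspace. The repository and the workspace must be created beforehand. An example of usage is shown below.

tests/system/google/cloud/dataform/example_dataform.py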

    first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        package_name=f"dataform_package_{ENV_ID}",
        without_installation=True,
        dataform_schema_name=DATAFORM_SCHEMA_NAME,
    )

Write file to workspace

To write a file with the given contents to the specified workspace, use the DataformWriteFileOperator.

tests/system/google/cloud/dataform/example_dataform.py

test_file_content = b"""
test test for test file
"""
write_test_file = DataformWriteFileOperator(
    task_id="make-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
    contents=test_file_content,
)
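The example above passes the file contents as a bytes literal. When the contents start out as a text string, they can be encoded first. The sketch below assumes UTF-8 encoding, and both the SQLX view definition and the file path are hypothetical illustrations rather than part of the original example.

```python
# Hypothetical SQLX definition of a view; adjust it to your own project.
sqlx_definition = """
config { type: "view" }
SELECT 1 AS id
"""

# Encode the text so it can be passed as bytes, as in the example above.
contents = sqlx_definition.encode("utf-8")

# Hypothetical target path inside the workspace.
filepath = "definitions/first_view.sqlx"

print(type(contents).__name__, len(contents))
```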

Make directory in workspace

To create a directory at the given path in the specified workspace, use the DataformMakeDirectoryOperator.

tests/system/google/cloud/dataform/example_dataform.py

make_test_directory = DataformMakeDirectoryOperator(
    task_id="make-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)

Install NPM packages

To install npm packages for the specified workspace, use the DataformInstallNpmPackagesOperator.

tests/system/google/cloud/dataform/example_dataform.py

install_npm_packages = DataformInstallNpmPackagesOperator(
    task_id="install-npm-packages",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)
