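Google Dataform Operators

Dataform is a service for data analysts to develop, test, version control, and schedule complex SQL workflows for data transformation in BigQuery.

Dataform lets you manage the transformation step of the Extract, Load, and Transform (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.

For more information about the service, see the Dataform product documentation.

Configuration

Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform product documentation.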

Create Repository

Creates a repository in the Dataform service to track your code. An example of usage is shown below.

DataformCreateRepositoryOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    make_repository = DataformCreateRepositoryOperator(
        task_id="make-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
    )

Create Workspace

Creates a workspace in the Dataform service to store your code. An example of usage is shown below.

DataformCreateWorkspaceOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    make_workspace = DataformCreateWorkspaceOperator(
        task_id="make-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
    )

Create Compilation Result

A simple configuration that creates a compilation result looks like this:

DataformCreateCompilationResultOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create-compilation-result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": "main",
            "workspace": (
                f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
                f"workspaces/{WORKSPACE_ID}"
            ),
        },
    )
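
The `workspace` value above is a fully qualified resource name assembled from the project, region, repository, and workspace identifiers. A minimal sketch of a helper that builds it, assuming the path layout shown in the example; `workspace_path` is a hypothetical name and not part of the provider:

```python
def workspace_path(project_id: str, region: str, repository_id: str, workspace_id: str) -> str:
    """Build the workspace resource name in the layout used by the example above."""
    return (
        f"projects/{project_id}/locations/{region}/"
        f"repositories/{repository_id}/workspaces/{workspace_id}"
    )


# Hypothetical identifiers, for illustration only.
print(workspace_path("my-project", "us-central1", "my-repo", "my-workspace"))
```

Keeping the path in one function avoids repeating the f-string wherever a workspace has to be referenced.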

Get Compilation Result

To get a compilation result, you can use:

DataformGetCompilationResultOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
    ),
)
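
The templated `compilation_result_id` above keeps only the last path segment of the `name` that the create task pushed to XCom. The string handling can be sketched in plain Python; `resource_id` is a hypothetical helper, and the sample name is a made-up value that follows the resource layout, not output from a real run:

```python
def resource_id(resource_name: str) -> str:
    """Return the trailing segment of a slash-separated resource name."""
    return resource_name.split("/")[-1]


# Hypothetical resource name; real values come from the create task's XCom.
sample_name = (
    "projects/my-project/locations/us-central1/"
    "repositories/my-repo/compilationResults/abc123"
)
print(resource_id(sample_name))  # abc123
```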

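Create Workflow Invocation

To create a workflow invocation, you can use:

DataformCreateWorkflowInvocationOperator

This operation can run in synchronous or asynchronous mode. For asynchronous runs, the DataformWorkflowInvocationStateSensor can wait until the invocation reaches an expected state.

tests/system/providers/google/cloud/dataform/example_dataform.py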

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

tests/system/providers/google/cloud/dataform/example_dataform.py

create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation-async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is-workflow-invocation-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # Poll the invocation started by the asynchronous task above.
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
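
In asynchronous mode the create task returns immediately and a separate task waits for the invocation to finish. The waiting pattern can be sketched in plain Python as below. This is an illustration of polling in general, not the sensor's actual implementation; `wait_for_state`, its parameters, and the plain-string state names are assumptions made for this example:

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    expected: Iterable[str],
    failure: Iterable[str] = ("FAILED", "CANCELLED"),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> str:
    """Poll get_state until it returns one of the expected states.

    Raises RuntimeError on a failure state and TimeoutError when
    max_pokes is exhausted.
    """
    expected, failure = set(expected), set(failure)
    for _ in range(max_pokes):
        state = get_state()
        if state in expected:
            return state
        if state in failure:
            raise RuntimeError(f"workflow invocation ended in state {state}")
        time.sleep(poke_interval)
    raise TimeoutError("workflow invocation did not reach an expected state")


# Simulated state sequence standing in for repeated API calls.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_state(lambda: next(states), expected={"SUCCEEDED"}))
```

Separating the trigger from the wait lets other tasks run while the invocation is in progress.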

Get Workflow Invocation

To get a workflow invocation, you can use:

DataformGetWorkflowInvocationOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)

Query Workflow Invocation Actions

To query the actions of a workflow invocation, you can use:

DataformQueryWorkflowInvocationActionsOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
    task_id="query-workflow-invocation-actions",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
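
A common follow-up to querying actions is to summarize them by state. The sketch below assumes each action is available as a dictionary with a `state` key holding a string. That shape, the field names, and the `count_by_state` helper are assumptions for illustration; check the actual task output before relying on them:

```python
from collections import Counter


def count_by_state(actions: list[dict]) -> dict[str, int]:
    """Count actions per state; actions missing a state are grouped as UNKNOWN."""
    return dict(Counter(action.get("state", "UNKNOWN") for action in actions))


# Hypothetical records; the field names are assumptions for illustration.
sample_actions = [
    {"target": "first_view", "state": "SUCCEEDED"},
    {"target": "second_view", "state": "SUCCEEDED"},
    {"target": "third_table", "state": "FAILED"},
]
print(count_by_state(sample_actions))
```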

Cancel Workflow Invocation

To cancel a workflow invocation, you can use:

DataformCancelWorkflowInvocationOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
    ),
)

Delete Workspace

Deletes a workspace. An example of usage is shown below.

DataformDeleteWorkspaceOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    delete_workspace = DataformDeleteWorkspaceOperator(
        task_id="delete-workspace",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Delete Repository

Deletes a repository. An example of usage is shown below.

DataformDeleteRepositoryOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    delete_repository = DataformDeleteRepositoryOperator(
        task_id="delete-repository",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        trigger_rule=TriggerRule.ALL_DONE,
    )

Remove File

Removes a file from the workspace. An example of usage is shown below.

DataformRemoveFileOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    remove_test_file = DataformRemoveFileOperator(
        task_id="remove-test-file",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        filepath="test/test.txt",
    )

Remove Directory

Removes a directory from the workspace. An example of usage is shown below.

DataformRemoveDirectoryOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

    remove_test_directory = DataformRemoveDirectoryOperator(
        task_id="remove-test-directory",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        directory_path="test",
    )

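Initialize Workspace

Creates the default project structure in the given repository and workspace. The repository and the workspace must already exist before this runs. An example of usage is shown below.

make_initialization_workspace_flow

tests/system/providers/google/cloud/dataform/example_dataform.py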

    first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workspace_id=WORKSPACE_ID,
        package_name=f"dataform_package_{ENV_ID}",
        without_installation=True,
        dataform_schema_name=DATAFORM_SCHEMA_NAME,
    )

Write File to Workspace

Writes a file with the given contents to the specified workspace.

DataformWriteFileOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

test_file_content = b"""
test test for test file
"""
write_test_file = DataformWriteFileOperator(
    task_id="make-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
    contents=test_file_content,
)
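
The example passes `contents` as a bytes literal. When the file body starts out as text, it has to be encoded first. A minimal sketch, assuming UTF-8 is the right encoding for the file; `to_file_contents` is a hypothetical helper and not part of the provider:

```python
def to_file_contents(text: str) -> bytes:
    """Encode text as UTF-8 bytes, ensuring exactly one trailing newline."""
    return (text.rstrip("\n") + "\n").encode("utf-8")


# Hypothetical file body, for illustration only.
print(to_file_contents("test test for test file"))
```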

Make Directory in Workspace

Creates a directory at the given path in the specified workspace.

DataformMakeDirectoryOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

make_test_directory = DataformMakeDirectoryOperator(
    task_id="make-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)

Install NPM Packages

Installs npm packages for the specified workspace.

DataformInstallNpmPackagesOperator

tests/system/providers/google/cloud/dataform/example_dataform.py

install_npm_packages = DataformInstallNpmPackagesOperator(
    task_id="install-npm-packages",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)
