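Google Dataform Operators¶
Dataform is a service that lets data analysts develop, test, version-control, and schedule complex SQL workflows for data transformation in BigQuery.
Dataform manages the transformation step of an extract, load, and transform (ELT) data integration process. After raw data has been extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.
For more information about the service, see the Dataform product documentation.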
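Configuration¶
Before you can use the Dataform operators, you need to initialize a repository and a workspace. For more information, see the Dataform product documentation.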
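Create Repository¶
Creates a repository in the Dataform service to track your code. An example is shown below:
DataformCreateRepositoryOperator
from airflow.providers.google.cloud.operators.dataform import DataformCreateRepositoryOperator

# PROJECT_ID, REGION, and REPOSITORY_ID are constants defined elsewhere in the DAG file.
make_repository = DataformCreateRepositoryOperator(
    task_id="make-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
)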
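Create Workspace¶
Creates a workspace in the Dataform service to store your code. An example is shown below:
DataformCreateWorkspaceOperator
from airflow.providers.google.cloud.operators.dataform import DataformCreateWorkspaceOperator

make_workspace = DataformCreateWorkspaceOperator(
    task_id="make-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)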
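Create Compilation Result¶
A simple configuration for creating a compilation result looks like this:
DataformCreateCompilationResultOperator
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
)

create_compilation_result = DataformCreateCompilationResultOperator(
    task_id="create-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": "main",
        # Fully qualified resource name of the workspace to compile.
        "workspace": (
            f"projects/{PROJECT_ID}/locations/{REGION}/repositories/{REPOSITORY_ID}/"
            f"workspaces/{WORKSPACE_ID}"
        ),
    },
)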
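The workspace value in the configuration above is a fully qualified resource name assembled from four identifiers. A minimal sketch of that assembly in plain Python is given below; the helper name workspace_path and the sample identifiers are hypothetical and not part of the provider package.

```python
def workspace_path(project_id: str, region: str, repository_id: str, workspace_id: str) -> str:
    """Build a workspace resource name in the same shape as the example above."""
    return (
        f"projects/{project_id}/locations/{region}/"
        f"repositories/{repository_id}/workspaces/{workspace_id}"
    )


# Placeholder identifiers, used only for illustration.
example_path = workspace_path("my-project", "us-central1", "my-repo", "my-workspace")
print(example_path)
```

Keeping the path in one helper avoids repeating the f-string in every task that needs the workspace name.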
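Get Compilation Result¶
To get a compilation result, you can use:
DataformGetCompilationResultOperator
from airflow.providers.google.cloud.operators.dataform import DataformGetCompilationResultOperator

get_compilation_result = DataformGetCompilationResultOperator(
    task_id="get-compilation-result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # The ID is the last path segment of the resource name returned by the create task.
    compilation_result_id=(
        "{{ task_instance.xcom_pull('create-compilation-result')['name'].split('/')[-1] }}"
    ),
)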
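The templated expression above takes the name field from the XCom value of the create task and keeps only its last path segment. The same string handling can be sketched outside of Jinja as follows; the function name resource_id and the sample resource name are illustrative assumptions, not output captured from a real run.

```python
def resource_id(resource_name: str) -> str:
    """Return the last path segment of a slash-separated resource name."""
    return resource_name.split("/")[-1]


# Stand-in for the dictionary that the create task is assumed to push to XCom.
xcom_value = {
    "name": (
        "projects/my-project/locations/us-central1/"
        "repositories/my-repo/compilationResults/abc123"
    )
}

compilation_result_id = resource_id(xcom_value["name"])
print(compilation_result_id)
```

The workflow invocation examples on this page use the same pattern to derive workflow_invocation_id.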
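Create Workflow Invocation¶
To create a workflow invocation, you can use:
DataformCreateWorkflowInvocationOperator
This operation can run in synchronous or asynchronous mode. For asynchronous runs, a sensor is also available: DataformWorkflowInvocationStateSensor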
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateWorkflowInvocationOperator,
)

# Synchronous mode (the default): the task waits for the invocation to finish.
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)

# Asynchronous mode: the task returns once the invocation has been created.
create_workflow_invocation_async = DataformCreateWorkflowInvocationOperator(
    task_id="create-workflow-invocation-async",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": "{{ task_instance.xcom_pull('create-compilation-result')['name'] }}"
    },
)
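from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
from google.cloud.dataform_v1beta1 import WorkflowInvocation

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is-workflow-invocation-done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # Watch the invocation started by the asynchronous task.
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-async')['name'].split('/')[-1] }}"
    ),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)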
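Conceptually, the asynchronous mode hands the waiting over to a separate polling step. The sketch below imitates such a polling loop in plain Python to show the idea. It is not the sensor's actual implementation: wait_for_state, max_pokes, and the string state names are hypothetical stand-ins for the real sensor logic and for WorkflowInvocation.State values.

```python
import time
from typing import Callable, Collection


def wait_for_state(
    get_state: Callable[[], str],
    expected: Collection[str],
    failure: Collection[str] = (),
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> str:
    """Poll get_state until it returns an expected state, a failure state, or pokes run out."""
    for _ in range(max_pokes):
        state = get_state()
        if state in failure:
            raise RuntimeError(f"workflow invocation ended in state {state}")
        if state in expected:
            return state
        time.sleep(poke_interval)
    raise TimeoutError(f"no expected state after {max_pokes} pokes")


# Simulated sequence of states reported by the service.
simulated_states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
final_state = wait_for_state(
    get_state=lambda: next(simulated_states),
    expected={"SUCCEEDED"},
    failure={"FAILED", "CANCELLED"},
)
print(final_state)
```

In a DAG, this split means the creating task finishes as soon as the invocation has been requested, while a sensor task carries the waiting.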
Get Workflow Invocation¶
To get a workflow invocation, you can use:
DataformGetWorkflowInvocationOperator
from airflow.providers.google.cloud.operators.dataform import DataformGetWorkflowInvocationOperator

get_workflow_invocation = DataformGetWorkflowInvocationOperator(
    task_id="get-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
Query Workflow Invocation Actions¶
To query the actions of a workflow invocation, you can use:
DataformQueryWorkflowInvocationActionsOperator
from airflow.providers.google.cloud.operators.dataform import (
    DataformQueryWorkflowInvocationActionsOperator,
)

query_workflow_invocation_actions = DataformQueryWorkflowInvocationActionsOperator(
    task_id="query-workflow-invocation-actions",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation')['name'].split('/')[-1] }}"
    ),
)
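Cancel Workflow Invocation¶
To cancel a workflow invocation, you can use:
DataformCancelWorkflowInvocationOperator
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
)

# The template refers to a task with task_id "create-workflow-invocation-for-cancel",
# which is not shown on this page and must exist in the same DAG.
cancel_workflow_invocation = DataformCancelWorkflowInvocationOperator(
    task_id="cancel-workflow-invocation",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=(
        "{{ task_instance.xcom_pull('create-workflow-invocation-for-cancel')['name'].split('/')[-1] }}"
    ),
)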
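Delete Workspace¶
Deletes a workspace. An example is shown below:
DataformDeleteWorkspaceOperator
from airflow.providers.google.cloud.operators.dataform import DataformDeleteWorkspaceOperator
from airflow.utils.trigger_rule import TriggerRule

delete_workspace = DataformDeleteWorkspaceOperator(
    task_id="delete-workspace",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    # Run the cleanup even if upstream tasks failed.
    trigger_rule=TriggerRule.ALL_DONE,
)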
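Delete Repository¶
Deletes a repository. An example is shown below:
DataformDeleteRepositoryOperator
from airflow.providers.google.cloud.operators.dataform import DataformDeleteRepositoryOperator
from airflow.utils.trigger_rule import TriggerRule

delete_repository = DataformDeleteRepositoryOperator(
    task_id="delete-repository",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    # Run the cleanup even if upstream tasks failed.
    trigger_rule=TriggerRule.ALL_DONE,
)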
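Remove File¶
Removes a file from the specified workspace. An example is shown below:
DataformRemoveFileOperator
from airflow.providers.google.cloud.operators.dataform import DataformRemoveFileOperator

remove_test_file = DataformRemoveFileOperator(
    task_id="remove-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
)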
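Remove Directory¶
Removes a directory from the specified workspace. An example is shown below:
DataformRemoveDirectoryOperator
from airflow.providers.google.cloud.operators.dataform import DataformRemoveDirectoryOperator

remove_test_directory = DataformRemoveDirectoryOperator(
    task_id="remove-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)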
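Initialize Workspace¶
Creates the default project structure for the provided repository. The repository and the workspace must be created before this step runs. An example is shown below:
make_initialization_workspace_flow
from airflow.providers.google.cloud.utils.dataform import make_initialization_workspace_flow

# Returns the first and last tasks of the initialization flow so that
# other tasks can be chained before and after it.
first_initialization_step, last_initialization_step = make_initialization_workspace_flow(
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    package_name=f"dataform_package_{ENV_ID}",
    without_installation=True,
    dataform_schema_name=DATAFORM_SCHEMA_NAME,
)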
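The ordering requirement stated above (repository, then workspace, then the initialization flow) can be checked with a small dependency sketch. It uses only the standard-library graphlib module rather than Airflow itself. The task names first-initialization-step and last-initialization-step are hypothetical labels for the two returned tasks, not the task IDs the helper actually assigns.

```python
from graphlib import TopologicalSorter

# Each key lists the tasks that must finish before it can start.
dependencies = {
    "make-workspace": {"make-repository"},
    "first-initialization-step": {"make-workspace"},
    "last-initialization-step": {"first-initialization-step"},
}

# A linear chain has exactly one valid execution order.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

In a DAG file the same ordering would typically be expressed by chaining the workspace task to the first returned step and chaining later tasks after the last returned step.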
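Write File to Workspace¶
Writes a file with the given contents to the specified workspace.
DataformWriteFileOperator
from airflow.providers.google.cloud.operators.dataform import DataformWriteFileOperator

# File contents are passed as bytes.
test_file_content = b"""
test test for test file
"""
write_test_file = DataformWriteFileOperator(
    task_id="make-test-file",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    filepath="test/test.txt",
    contents=test_file_content,
)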
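Because the contents argument in the example above is a bytes literal, text assembled at runtime has to be encoded before it is passed in. The sketch below shows one way to do that for a small SQLX view definition; the helper render_view_sqlx and the query are hypothetical and serve only as an illustration.

```python
def render_view_sqlx(query: str) -> bytes:
    """Wrap a SQL query in a minimal SQLX view definition and encode it as UTF-8 bytes."""
    text = 'config { type: "view" }\n\n' + query.strip() + "\n"
    return text.encode("utf-8")


# Illustrative query; a value like this could be supplied as the `contents` argument.
view_contents = render_view_sqlx("SELECT 1 AS id")
print(view_contents.decode("utf-8"))
```

Encoding explicitly as UTF-8 keeps the file contents predictable regardless of the platform default encoding.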
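Make Directory in Workspace¶
Creates a directory with the given path in the specified workspace.
DataformMakeDirectoryOperator
from airflow.providers.google.cloud.operators.dataform import DataformMakeDirectoryOperator

make_test_directory = DataformMakeDirectoryOperator(
    task_id="make-test-directory",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
    directory_path="test",
)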
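Install NPM Packages¶
Installs npm packages for the specified workspace.
DataformInstallNpmPackagesOperator
from airflow.providers.google.cloud.operators.dataform import DataformInstallNpmPackagesOperator

install_npm_packages = DataformInstallNpmPackagesOperator(
    task_id="install-npm-packages",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workspace_id=WORKSPACE_ID,
)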