Google DataFusion 算子

Cloud Data Fusion 是一项完全托管的云原生数据集成服务,可帮助用户高效地构建和管理 ETL/ELT 数据管道。借助图形界面和预先配置的连接器和转换的广泛开源库,Cloud Data Fusion 将组织的重点从代码和集成转移到洞察和行动。

先决任务

要使用这些算子,您必须执行以下操作

重新启动 DataFusion 实例

要重新启动 Data Fusion 实例,请使用:CloudDataFusionRestartInstanceOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

restart_instance = CloudDataFusionRestartInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)

您可以将 Jinja 模板instance_name, impersonation_chain 参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。

删除 DataFusion 实例

要删除 Data Fusion 实例,请使用:CloudDataFusionDeleteInstanceOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

delete_instance = CloudDataFusionDeleteInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    task_id="delete_instance",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以将 Jinja 模板instance_name, impersonation_chain 参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。

创建 DataFusion 实例

要创建 Data Fusion 实例,请使用:CloudDataFusionCreateInstanceOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

create_instance = CloudDataFusionCreateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    task_id="create_instance",
)

您可以将 Jinja 模板instance_name, instance, impersonation_chain 参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。

更新 DataFusion 实例

要更新 Data Fusion 实例,请使用:CloudDataFusionUpdateInstanceOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

update_instance = CloudDataFusionUpdateInstanceOperator(
    location=LOCATION,
    instance_name=INSTANCE_NAME,
    instance=INSTANCE,
    update_mask="",
    task_id="update_instance",
)

您可以将 Jinja 模板instance_name, instance, impersonation_chain 参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。

获取 DataFusion 实例

要检索 Data Fusion 实例,请使用:CloudDataFusionGetInstanceOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

get_instance = CloudDataFusionGetInstanceOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)

您可以将 Jinja 模板instance_name, impersonation_chain 参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。

创建 DataFusion 管道

要创建 Data Fusion 管道,请使用:CloudDataFusionCreatePipelineOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

create_pipeline = CloudDataFusionCreatePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    pipeline=PIPELINE,
    instance_name=INSTANCE_NAME,
    task_id="create_pipeline",
)

您可以将Jinja 模板instance_namepipeline_nameimpersonation_chain参数一起使用,这些参数允许您动态确定值。结果将保存到XCom中,以便其他运算符使用它。

启动 DataFusion 管道

要使用同步模式启动 Data Fusion 管道:CloudDataFusionStartPipelineOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline",
)

要使用异步模式启动 Data Fusion 管道:CloudDataFusionStartPipelineOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline_async = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    asynchronous=True,
    task_id="start_pipeline_async",
)

可以使用可延迟模式异步启动 Data Fusion 管道。虽然异步参数提供了使用同步 sleep() 方法等待 DataFusion 管道达到终止状态的可能性,但可延迟模式使用异步调用检查状态。无法同时使用异步和可延迟参数。请查看使用可延迟模式的示例:CloudDataFusionStartPipelineOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline_def = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline_def",
    deferrable=True,
)

你可以将Jinja 模板instance_namepipeline_nameruntime_argsimpersonation_chain参数一起使用,这些参数允许你动态确定值。结果保存到XCom中,其他运算符可以使用它。

停止 DataFusion 管道

要停止 Data Fusion 管道,请使用:CloudDataFusionStopPipelineOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

stop_pipeline = CloudDataFusionStopPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="stop_pipeline",
)

您可以将Jinja 模板instance_namepipeline_nameimpersonation_chain参数一起使用,这些参数允许您动态确定值。结果将保存到XCom中,以便其他运算符使用它。

删除 DataFusion 管道

要删除 Data Fusion 管道,请使用:CloudDataFusionDeletePipelineOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

delete_pipeline = CloudDataFusionDeletePipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="delete_pipeline",
    trigger_rule=TriggerRule.ALL_DONE,
)

您可以将 Jinja 模板instance_nameversion_idpipeline_nameimpersonation_chain 参数配合使用,以便动态确定值。结果保存到 XCom 中,以便其他运算符使用。

列出 DataFusion 管道

要列出 Data Fusion 管道,请使用:CloudDataFusionListPipelinesOperator

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

list_pipelines = CloudDataFusionListPipelinesOperator(
    location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)

您可以将 Jinja 模板instance_nameartifact_nameartifact_versionimpersonation_chain 参数配合使用,以便动态确定值。结果保存到 XCom 中,以便其他运算符使用。

传感器

当异步触发启动管道时,可以使用传感器运行检查并验证管道是否处于正确状态。

CloudDataFusionPipelineStateSensor.

tests/system/providers/google/cloud/datafusion/example_datafusion.py[源代码]

start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
    task_id="pipeline_state_sensor",
    pipeline_name=PIPELINE_NAME,
    pipeline_id=start_pipeline_async.output,
    expected_statuses=["COMPLETED"],
    failure_statuses=["FAILED"],
    instance_name=INSTANCE_NAME,
    location=LOCATION,
)

CloudDataFusionPipelineStateSensor.

此条目是否有用?