Google DataFusion 算子¶
Cloud Data Fusion 是一项完全托管的云原生数据集成服务,可帮助用户高效地构建和管理 ETL/ELT 数据管道。借助图形界面和预先配置的连接器和转换的广泛开源库,Cloud Data Fusion 将组织的重点从代码和集成转移到洞察和行动。
先决任务¶
要使用这些算子,您必须执行以下操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如Google Cloud 文档中所述。
启用 API,如Cloud 控制台文档中所述。
通过pip安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
重新启动 DataFusion 实例¶
要重新启动 Data Fusion 实例,请使用:CloudDataFusionRestartInstanceOperator
。
restart_instance = CloudDataFusionRestartInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)
您可以将 Jinja 模板 与 instance_name
, impersonation_chain
参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。
删除 DataFusion 实例¶
要删除 Data Fusion 实例,请使用:CloudDataFusionDeleteInstanceOperator
。
delete_instance = CloudDataFusionDeleteInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
task_id="delete_instance",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以将 Jinja 模板 与 instance_name
, impersonation_chain
参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。
创建 DataFusion 实例¶
要创建 Data Fusion 实例,请使用:CloudDataFusionCreateInstanceOperator
。
create_instance = CloudDataFusionCreateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
task_id="create_instance",
)
您可以将 Jinja 模板 与 instance_name
, instance
, impersonation_chain
参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。
更新 DataFusion 实例¶
要更新 Data Fusion 实例,请使用:CloudDataFusionUpdateInstanceOperator
。
update_instance = CloudDataFusionUpdateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
update_mask="",
task_id="update_instance",
)
您可以将 Jinja 模板 与 instance_name
, instance
, impersonation_chain
参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。
获取 DataFusion 实例¶
要检索 Data Fusion 实例,请使用:CloudDataFusionGetInstanceOperator
。
get_instance = CloudDataFusionGetInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)
您可以将 Jinja 模板 与 instance_name
, impersonation_chain
参数结合使用,以便动态确定值。结果会保存到 XCom,以便其他运算符使用它。
创建 DataFusion 管道¶
要创建 Data Fusion 管道,请使用:CloudDataFusionCreatePipelineOperator
。
create_pipeline = CloudDataFusionCreatePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
pipeline=PIPELINE,
instance_name=INSTANCE_NAME,
task_id="create_pipeline",
)
您可以将Jinja 模板与instance_name
、pipeline_name
、impersonation_chain
参数一起使用,这些参数允许您动态确定值。结果将保存到XCom中,以便其他运算符使用它。
启动 DataFusion 管道¶
要使用同步模式启动 Data Fusion 管道:CloudDataFusionStartPipelineOperator
。
start_pipeline = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="start_pipeline",
)
要使用异步模式启动 Data Fusion 管道:CloudDataFusionStartPipelineOperator
。
start_pipeline_async = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
asynchronous=True,
task_id="start_pipeline_async",
)
可以使用可延迟模式异步启动 Data Fusion 管道。虽然异步参数提供了使用同步 sleep() 方法等待 DataFusion 管道达到终止状态的可能性,但可延迟模式使用异步调用检查状态。无法同时使用异步和可延迟参数。请查看使用可延迟模式的示例:CloudDataFusionStartPipelineOperator
。
start_pipeline_def = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="start_pipeline_def",
deferrable=True,
)
你可以将Jinja 模板与instance_name
、pipeline_name
、runtime_args
、impersonation_chain
参数一起使用,这些参数允许你动态确定值。结果保存到XCom中,其他运算符可以使用它。
停止 DataFusion 管道¶
要停止 Data Fusion 管道,请使用:CloudDataFusionStopPipelineOperator
。
stop_pipeline = CloudDataFusionStopPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="stop_pipeline",
)
您可以将Jinja 模板与instance_name
、pipeline_name
、impersonation_chain
参数一起使用,这些参数允许您动态确定值。结果将保存到XCom中,以便其他运算符使用它。
删除 DataFusion 管道¶
要删除 Data Fusion 管道,请使用:CloudDataFusionDeletePipelineOperator
。
delete_pipeline = CloudDataFusionDeletePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以将 Jinja 模板 与 instance_name
、version_id
、pipeline_name
、impersonation_chain
参数配合使用,以便动态确定值。结果保存到 XCom 中,以便其他运算符使用。
列出 DataFusion 管道¶
要列出 Data Fusion 管道,请使用:CloudDataFusionListPipelinesOperator
。
list_pipelines = CloudDataFusionListPipelinesOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)
您可以将 Jinja 模板 与 instance_name
、artifact_name
、artifact_version
、impersonation_chain
参数配合使用,以便动态确定值。结果保存到 XCom 中,以便其他运算符使用。
传感器¶
当异步触发启动管道时,可以使用传感器运行检查并验证管道是否处于正确状态。
CloudDataFusionPipelineStateSensor
.
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
task_id="pipeline_state_sensor",
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_async.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION,
)