Google DataFusion 操作符¶
Cloud Data Fusion 是一项完全托管的、云原生的数据集成服务,可帮助用户高效构建和管理 ETL/ELT 数据管道。借助图形界面和广泛的预配置连接器和转换开源库,Cloud Data Fusion 将组织的重心从代码和集成转移到洞察和行动。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
按照 Google Cloud 文档 中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档 中的说明启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
重启 DataFusion 实例¶
要重启 Data Fusion 实例,请使用:CloudDataFusionRestartInstanceOperator
。
restart_instance = CloudDataFusionRestartInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="restart_instance"
)
您可以使用 Jinja 模板 以及 instance_name
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
删除 DataFusion 实例¶
要删除 Data Fusion 实例,请使用:CloudDataFusionDeleteInstanceOperator
。
delete_instance = CloudDataFusionDeleteInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
task_id="delete_instance",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以使用 Jinja 模板 以及 instance_name
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
创建 DataFusion 实例¶
要创建 Data Fusion 实例,请使用:CloudDataFusionCreateInstanceOperator
。
create_instance = CloudDataFusionCreateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
task_id="create_instance",
)
您可以使用 Jinja 模板 以及 instance_name
、instance
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
更新 DataFusion 实例¶
要更新 Data Fusion 实例,请使用:CloudDataFusionUpdateInstanceOperator
。
update_instance = CloudDataFusionUpdateInstanceOperator(
location=LOCATION,
instance_name=INSTANCE_NAME,
instance=INSTANCE,
update_mask="",
task_id="update_instance",
)
您可以使用 Jinja 模板 以及 instance_name
、instance
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
获取 DataFusion 实例¶
要检索 Data Fusion 实例,请使用:CloudDataFusionGetInstanceOperator
。
get_instance = CloudDataFusionGetInstanceOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="get_instance"
)
您可以使用 Jinja 模板 以及 instance_name
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
创建 DataFusion 管道¶
要创建 Data Fusion 管道,请使用:CloudDataFusionCreatePipelineOperator
。
create_pipeline = CloudDataFusionCreatePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
pipeline=PIPELINE,
instance_name=INSTANCE_NAME,
task_id="create_pipeline",
)
您可以使用 Jinja 模板 以及 instance_name
、pipeline_name
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
启动 DataFusion 管道¶
要使用同步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator
。
start_pipeline = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
pipeline_timeout=1000,
task_id="start_pipeline",
)
要使用异步模式启动 Data Fusion 管道,请使用:CloudDataFusionStartPipelineOperator
。
start_pipeline_async = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
asynchronous=True,
task_id="start_pipeline_async",
)
可以使用可延迟模式异步启动 Data Fusion 管道。虽然异步参数可以使用同步 sleep() 方法等待 DataFusion 管道达到终止状态,但可延迟模式使用异步调用检查状态。不能同时使用异步和可延迟参数。请查看使用可延迟模式的示例:CloudDataFusionStartPipelineOperator
。
start_pipeline_def = CloudDataFusionStartPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="start_pipeline_def",
deferrable=True,
)
您可以使用 Jinja 模板 以及 instance_name
、pipeline_name
、runtime_args
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
停止 DataFusion 管道¶
要停止 Data Fusion 管道,请使用:CloudDataFusionStopPipelineOperator
。
stop_pipeline = CloudDataFusionStopPipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="stop_pipeline",
)
您可以使用 Jinja 模板 以及 instance_name
、pipeline_name
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
删除 DataFusion 管道¶
要删除 Data Fusion 管道,请使用:CloudDataFusionDeletePipelineOperator
。
delete_pipeline = CloudDataFusionDeletePipelineOperator(
location=LOCATION,
pipeline_name=PIPELINE_NAME,
instance_name=INSTANCE_NAME,
task_id="delete_pipeline",
trigger_rule=TriggerRule.ALL_DONE,
)
您可以使用 Jinja 模板 以及 instance_name
、version_id
、pipeline_name
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
列出 DataFusion 管道¶
要列出 Data Fusion 管道,请使用:CloudDataFusionListPipelinesOperator
。
list_pipelines = CloudDataFusionListPipelinesOperator(
location=LOCATION, instance_name=INSTANCE_NAME, task_id="list_pipelines"
)
您可以使用 Jinja 模板 以及 instance_name
、artifact_name
、artifact_version
、impersonation_chain
参数,以动态确定值。结果将保存到 XCom 中,以便其他操作符使用。
传感器¶
当异步触发管道启动时,可以使用传感器运行检查并验证管道是否处于正确状态。
CloudDataFusionPipelineStateSensor
.
start_pipeline_sensor = CloudDataFusionPipelineStateSensor(
task_id="pipeline_state_sensor",
pipeline_name=PIPELINE_NAME,
pipeline_id=start_pipeline_async.output,
expected_statuses=["COMPLETED"],
failure_statuses=["FAILED"],
instance_name=INSTANCE_NAME,
location=LOCATION,
)