跨 DAG 依赖关系¶
当两个 DAG 具有依赖关系时,值得考虑将它们合并为一个 DAG,这通常更容易理解。Airflow 还为同一 DAG 上的任务提供了更好的依赖关系可视化表示。但是,有时将所有相关任务放在同一 DAG 上是不实际的。例如
两个 DAG 可能具有不同的计划。例如,每周 DAG 可能具有依赖于每日 DAG 上其他任务的任务。
不同的团队负责不同的 DAG,但这些 DAG 具有一些跨 DAG 依赖关系。
一个任务可能依赖于同一 DAG 上的另一个任务,但对于不同的
execution_date
(数据间隔的开始)。对于在不同时间运行的任务,请使用
execution_delta
,例如execution_delta=timedelta(hours=1)
以检查 1 小时前运行的任务。
ExternalTaskSensor
可用于建立不同 DAG 之间的此类依赖关系。当它与 ExternalTaskMarker
一起使用时,清除依赖任务也可以跨不同的 DAG 发生。
ExternalTaskSensor¶
使用 ExternalTaskSensor
使 DAG 上的任务等待另一个 DAG 上的任务的特定 execution_date
。
ExternalTaskSensor 还提供了通过 allowed_states
和 failed_states
参数设置远程 DAG 上的任务是否成功或失败的选项。
child_task1 = ExternalTaskSensor(
task_id="child_task1",
external_dag_id=parent_dag.dag_id,
external_task_id=parent_task.task_id,
timeout=600,
allowed_states=["success"],
failed_states=["failed", "skipped"],
mode="reschedule",
)
对于此操作,您还可以使用可延迟模式下的传感器
external_task_sensor = ExternalTaskSensor(
task_id="parent_task_sensor",
external_task_id="child_task",
external_dag_id="child_dag",
deferrable=True,
)
具有 task_group 依赖关系的 ExternalTaskSensor¶
此外,我们还可以使用 ExternalTaskSensor
使 DAG 上的任务等待不同 DAG 上特定 execution_date
的另一个 task_group
。
child_task2 = ExternalTaskSensor(
task_id="child_task2",
external_dag_id=parent_dag.dag_id,
external_task_group_id="parent_dag_task_group_id",
timeout=600,
allowed_states=["success"],
failed_states=["failed", "skipped"],
mode="reschedule",
)
ExternalTaskMarker¶
如果希望在清除 parent_dag
上的 parent_task
时,还应清除 child_dag
上特定 execution_date
的 child_task1
,则应使用 ExternalTaskMarker
。请注意,仅当用户清除 parent_task
时选择了“递归”时,才会清除 child_task1
。
parent_task = ExternalTaskMarker(
task_id="parent_task",
external_dag_id="example_external_task_marker_child",
external_task_id="child_task1",
)