跨DAG依赖

当两个 DAGs 之间存在依赖关系时,可以考虑将它们合并到一个单独的 DAG 中,这通常更容易理解。Airflow 也为同一 DAG 中的任务提供了更好的依赖可视化表示。然而,有时将所有相关任务放在同一个 DAG 中并不实用。例如:

  • 两个 DAGs 可能有不同的调度。例如,一个每周运行的 DAG 中的任务可能依赖于一个每天运行的 DAG 中的其他任务。

  • 不同的团队负责不同的 DAGs,但这些 DAGs 之间存在一些跨 DAG 依赖。

  • 某个任务可能依赖于同一 DAG 中但不同 execution_date(数据区间的开始)下的另一个任务。

  • 对于在不同时间运行的任务,可以使用 execution_delta,例如使用 execution_delta=timedelta(hours=1) 来检查比当前任务早运行 1 小时的任务。

可以使用 ExternalTaskSensor 来建立跨不同 DAG 的此类依赖关系。当它与 ExternalTaskMarker 一起使用时,依赖任务的清除也可以跨不同 DAG 发生。

ExternalTaskSensor

使用 ExternalTaskSensor 使一个 DAG 中的任务等待另一个不同 DAG 中特定 execution_date 下的任务完成。

ExternalTaskSensor 还提供了选项,用于通过 allowed_statesfailed_states 参数设置远程 DAG 上的任务是成功还是失败。

airflow/example_dags/example_external_task_marker_dag.py

child_task1 = ExternalTaskSensor(
    task_id="child_task1",
    external_dag_id=parent_dag.dag_id,
    external_task_id=parent_task.task_id,
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

此外,对于此操作,您可以在可延迟模式下使用 Sensor

airflow-core/tests/system/core/example_external_task_parent_deferrable.py

external_task_sensor = ExternalTaskSensor(
    task_id="parent_task_sensor",
    external_task_id="child_task",
    external_dag_id="child_dag",
    deferrable=True,
)

具有 task_group 依赖的 ExternalTaskSensor

此外,我们还可以使用 ExternalTaskSensor 使一个 DAG 中的任务等待另一个不同 DAG 中特定 execution_date 下的 task_group 完成。

airflow/example_dags/example_external_task_marker_dag.py

child_task2 = ExternalTaskSensor(
    task_id="child_task2",
    external_dag_id=parent_dag.dag_id,
    external_task_group_id="parent_dag_task_group_id",
    timeout=600,
    allowed_states=["success"],
    failed_states=["failed", "skipped"],
    mode="reschedule",
)

ExternalTaskMarker

如果希望在 parent_dag 上的 parent_task 被清除时,特定 execution_datechild_dag 上的 child_task1 也被清除,则应使用 ExternalTaskMarker。请注意,只有当用户清除 parent_task 时选择“递归”选项,child_task1 才会被清除。

airflow/example_dags/example_external_task_marker_dag.py

parent_task = ExternalTaskMarker(
    task_id="parent_task",
    external_dag_id="example_external_task_marker_child",
    external_task_id="child_task1",
)

这篇内容有帮助吗?