airflow.triggers.external_task

模块内容

WorkflowTrigger

一个用于监视 Apache Airflow 中任务、任务组和 DAG 执行的触发器。

TaskStateTrigger

异步等待不同 DAG 中的任务在特定逻辑日期完成。

DagStateTrigger

异步等待 DAG 在特定逻辑日期完成。

class airflow.triggers.external_task.WorkflowTrigger(external_dag_id, execution_dates, external_task_ids=None, external_task_group_id=None, failed_states=None, skipped_states=None, allowed_states=None, poke_interval=2.0, soft_fail=False, **kwargs)[来源]

基类:airflow.triggers.base.BaseTrigger

一个用于监视 Apache Airflow 中任务、任务组和 DAG 执行的触发器。

参数
  • external_dag_id (str) – 外部 DAG 的 ID。

  • execution_dates (list) – 外部 DAG 的执行日期列表。

  • external_task_ids (Collection[str] | None) – 要等待的外部任务 ID 的集合。

  • external_task_group_id (str | None) – 要等待的外部任务组的 ID。

  • failed_states (Iterable[str] | None) – 被视为外部任务失败的状态。

  • skipped_states (Iterable[str] | None) – 被视为外部任务跳过的状态。

  • allowed_states (Iterable[str] | None) – 被视为外部任务成功的状态。

  • poke_interval (float) – 轮询外部任务的间隔(以秒为单位)。

  • soft_fail (bool) – 如果为 True,则触发器不会在外部任务失败时导致整个 DAG 失败。

serialize()[来源]

序列化触发器参数和模块路径。

async run()[来源]

定期检查任务、任务组或 DAG 状态。

class airflow.triggers.external_task.TaskStateTrigger(dag_id, execution_dates, trigger_start_time, states=None, task_id=None, poll_interval=2.0)[来源]

基类:airflow.triggers.base.BaseTrigger

异步等待不同 DAG 中的任务在特定逻辑日期完成。

参数
  • dag_id (str) – 包含您要等待的任务的 dag_id

  • task_id (str | None) – 包含您要等待的任务的 task_id。

  • states (list[str] | None) – 允许的状态,默认为 ['success']

  • execution_dates (list[datetime.datetime]) – 任务执行时间间隔

  • poll_interval (float) – 检查状态的时间间隔(以秒为单位)。默认值为 5 秒。

  • trigger_start_time (datetime.datetime) – 触发器启动时的时间,格式为 Datetime。用于控制触发器的执行,以防止在数据库中不存在指定的 DAG 名称时出现无限循环。它将等待一段时间,等于 _timeout_sec 参数,从触发器启动的时间开始,如果执行时间比预期长,则触发器将以“timeout”状态终止。

serialize()[来源]

序列化 TaskStateTrigger 参数和类路径。

async run()[来源]

定期在数据库中检查以查看 DAG 是否存在并且处于运行状态。

如果找到,请等待,直到指定的任务达到预期的状态之一。如果在触发器的执行过程开始后的 _timeout_sec 秒后,具有指定名称的 DAG 未处于运行状态,则以“timeout”状态终止。

count_running_dags(session)[来源]

计算数据库中处于运行状态的 DAG 实例数。

count_tasks(*, session=NEW_SESSION)[来源]

计算数据库中有多少任务实例符合我们的条件。

class airflow.triggers.external_task.DagStateTrigger(dag_id, states, execution_dates, poll_interval=5.0)[来源]

基类:airflow.triggers.base.BaseTrigger

异步等待 DAG 在特定逻辑日期完成。

参数
  • dag_id (str) – 包含您要等待的任务的 dag_id

  • states (list[airflow.utils.state.DagRunState]) – 允许的状态,默认为 ['success']

  • execution_dates (list[datetime.datetime]) – DAG 运行的逻辑日期。

  • poll_interval (float) – 检查状态的时间间隔(秒)。默认值为 5.0 秒。

serialize()[源代码]

序列化 DagStateTrigger 参数和类路径。

async run()[源代码]

定期检查 dag 运行是否存在,并且是否已达到其中一种状态。

count_dags(*, session=NEW_SESSION)[源代码]

计算数据库中有多少 DAG 运行符合我们的条件。

此条目是否有帮助?