时间表

对于具有基于时间调度(相对于事件驱动)的 DAG,DAG 的内部“时间表”驱动调度。时间表还确定为 DAG 创建的每个运行的数据间隔和逻辑日期。

使用 cron 表达式或 timedelta 对象调度的 DAG 在内部转换为始终使用时间表。

如果 cron 表达式或 timedelta 足以满足你的用例,则无需担心编写自定义时间表,因为 Airflow 具有处理这些情况的默认时间表。但对于更复杂的时间安排要求,你可以创建自己的时间表类,并将其传递给 DAG 的 schedule 参数。

自定义时间表实现有用的示例

  • 每天不同时间运行的任务。例如,天文学家可能会发现,在黎明时分运行一项任务来处理前一晚收集的数据很有用。

  • 不遵循公历的时间表。例如,为中国传统历法中的每个月创建一个运行。从概念上来说,这与日出情况类似,但时间尺度不同。

  • 滚动窗口或重叠数据间隔。例如,您可能希望每天运行一次,但每次运行都涵盖前七天的周期。可以使用 cron 表达式破解此问题,但自定义数据间隔提供了更自然的表示。

  • 具有间隔之间“空隙”的数据间隔,而不是连续间隔,因为 cron 表达式和timedelta时间表都表示连续间隔。请参阅数据间隔

Airflow 允许您在插件中编写自定义时间表,并由 DAG 使用。您可以在使用时间表自定义 DAG 调度操作指南中找到演示自定义时间表的示例。

注意

作为一般规则,请始终在代码中尽可能晚地访问变量、连接或任何其他需要访问数据库的内容。有关要遵循的更多最佳实践,请参阅时间表

内置时间表

Airflow 内置了几个常见的时间表,以涵盖最常见的用例。插件中可能还有其他时间表。

CronTriggerTimetable

接受 cron 表达式并根据该表达式触发 DAG 运行的时间表。

from airflow.timetables.trigger import CronTriggerTimetable


@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)  # At 01:00 on Wednesday
def example_dag():
    pass

您还可以向时间表提供一个静态数据间隔。可选的 interval 参数必须是 datetime.timedeltadateutil.relativedelta.relativedelta。使用这些参数时,触发的 DAG 运行的数据间隔跨越指定持续时间,并触发时间结束

from datetime import timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week (9:00 Monday to 18:00 Friday).
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

DeltaDataIntervalTimetable

一个使用时间增量安排数据间隔的时间表。您可以通过向 DAG 的 schedule 参数提供 datetime.timedeltadateutil.relativedelta.relativedelta 来选择它。

此时间表重点关注数据间隔值,并不一定将执行日期与任意界限(例如,一天或一小时的开始)对齐。

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass

CronDataIntervalTimetable

一个接受 cron 表达式的时间表,根据每个 cron 触发点之间的间隔创建数据间隔,并在每个数据间隔结束时触发 DAG 运行。

通过向 DAG 的 schedule 参数提供一个有效的 cron 表达式(字符串形式)来选择此时间表,如 DAG 文档中所述。

@dag(schedule="0 1 * * 3")  # At 01:00 on Wednesday.
def example_dag():
    pass

EventsTimetable

传递一个 datetime 列表,以便在之后运行 DAG。这对于基于体育赛事、计划的传播活动以及其他任意且不规则但可预测的日程安排的时间安排非常有用。

事件列表必须是有限的且大小合理,因为它必须在每次解析 DAG 时加载。或者,使用 restrict_to_events 标志强制手动运行 DAG,该 DAG 使用最近或最早事件的时间作为数据间隔。否则,手动运行将以 data_interval_startdata_interval_end(等于手动运行开始时间)开始。您还可以使用 description 参数为事件集命名,该参数将显示在 Airflow UI 中。

from airflow.timetables.events import EventsTimetable


@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2022, 4, 5, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 17, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 22, 20, 50, tz="America/Chicago"),
        ],
        description="My Team's Baseball Games",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

基于数据集事件的调度与基于时间的调度

将条件数据集表达式与基于时间的调度相结合可增强调度灵活性。

DatasetOrTimeSchedule 是一个专门的时间表,它允许根据基于时间的调度和数据集事件对 DAG 进行调度。它还促进了按传统时间表进行的计划运行和独立运行的数据集触发运行的创建。

此功能在 DAG 需要在数据集更新时以及定期运行的情况下特别有用。它确保工作流对数据更改保持响应,并持续运行定期检查或更新。

以下是使用 DatasetOrTimeSchedule 的 DAG 示例

from airflow.timetables import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

时间表比较

两个 cron 时间表之间的差异

Airflow 有两个时间表 CronTriggerTimetableCronDataIntervalTimetable,它们接受 cron 表达式。

然而,两者之间存在差异:- CronTriggerTimetable 不涉及数据间隔,而 CronDataIntervalTimetable 则涉及。- run_id 中的时间戳、CronTriggerTimetableCronDataIntervalTimetablelogical_date 根据它们处理数据间隔的方式以不同的方式定义,如 DAG 运行触发的时间 中所述。

是否处理数据间隔

CronTriggerTimetable 包含数据间隔。这意味着 data_interval_startdata_interval_end(以及旧版 execution_date)的值相同;DAG 运行触发的时间。

然而,CronDataIntervalTimetable 确实包含数据间隔。这意味着 data_interval_startdata_interval_end(以及旧版 execution_date)的值不同。 data_interval_start 是 DAG 运行触发的时间,而 data_interval_end 是该间隔的结束时间。

追赶行为

无论使用 CronTriggerTimetable 还是 CronDataIntervalTimetable,当 catchupTrue 时,没有区别。

在某些情况下,你可能希望对 catchup 使用 False,以防止运行不必要的 DAG:- 如果你创建了一个开始日期在过去的新 DAG,并且不想运行过去的 DAG。如果 catchupTrue,Airflow 将运行该时间间隔内应运行的所有 DAG。- 如果你暂停了一个现有的 DAG,然后在稍后的日期重新启动它,并且不想如果 catchupTrue

在这些情况下,run_id 中的 logical_date 基于 CronTriggerTimetableCronDataIntervalTimetable 如何处理数据间隔。

有关使用 catchup 时如何触发 DAG 运行的更多信息,请参阅 Catchup

DAG 运行触发的时间

CronTriggerTimetableCronDataIntervalTimetable 在同一时间触发 DAG 运行。但是,每个 run_id 的时间戳是不同的。

例如,假设有一个 cron 表达式 @daily0 0 * * *,计划每天凌晨 12 点运行。如果您在 1 月 31 日下午 3 点使用两个时间表启用 DAG,则: - CronTriggerTimetable 会在 2 月 1 日凌晨 12 点触发新的 DAG 运行。run_id 时间戳是 2 月 1 日午夜。 - CronDataIntervalTimetable 会立即触发新的 DAG 运行,因为 1 月 31 日凌晨 12 点开始的每日时间间隔的 DAG 运行尚未发生。run_id 时间戳是 1 月 31 日午夜,因为那是数据间隔的开始。

这是另一个示例,展示了在跳过 DAG 运行的情况下存在的差异。

假设有两个正在运行的 DAG,其 cron 表达式为 @daily0 0 * * *,它们使用两个不同的时间表。如果您在 1 月 31 日下午 3 点暂停 DAG,并在 2 月 2 日下午 3 点重新启用它们,则: - CronTriggerTimetable 会跳过原定于 2 月 1 日和 2 日触发的 DAG 运行。下一个 DAG 运行将在 2 月 3 日凌晨 12 点触发。 - CronDataIntervalTimetable 仅跳过原定于 2 月 1 日触发的 DAG 运行。重新启用 DAG 后,会立即触发 2 月 2 日的 DAG 运行。

在这些示例中,您会看到 CronTriggerTimetable 如何触发 DAG 运行更直观,并且比 CronDataIntervalTimetable 更符合人们对 cron 预期的行为。

cron 和 delta 数据间隔时间表之间的差异:

DeltaDataIntervalTimetableCronDataIntervalTimetable 之间进行选择取决于你的用例。如果你在 2 月 1 日 01:05 启用一个 DAG,下表总结了创建的 DAG 运行以及它们涵盖的数据间隔,具体取决于 3 个参数:schedulestart_datecatchup

schedule

start_date

catchup

涵盖的间隔

备注

*/30 * * * *

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

与使用 timedelta 对象相同。

*/30 * * * *

year-02-01

False

  • 00:30 - 01:00

*/30 * * * *

year-02-01 00:10

True

  • 00:30 - 01:00

00:00 - 00:30 的间隔不在开始日期之后,因此被跳过。

*/30 * * * *

year-02-01 00:10

False

  • 00:30 - 01:00

无论开始日期是什么,数据间隔都与小时/天/等边界对齐。

datetime.timedelta(minutes=30)

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

与使用 cron 表达式相同。

datetime.timedelta(minutes=30)

year-02-01

False

  • 00:35 - 01:05

间隔与开始日期不一致,但与当前时间一致。

datetime.timedelta(minutes=30)

year-02-01 00:10

True

  • 00:10 - 00:40

间隔与开始日期一致。下一个将在 5 分钟内触发,涵盖 00:40 - 01:10。

datetime.timedelta(minutes=30)

year-02-01 00:10

False

  • 00:35 - 01:05

间隔与当前时间一致。下一个运行将在 30 分钟内触发。

此条目有帮助吗?