Airflow 2025 峰会将于 10 月 07 日至 09 日举行。立即注册即可获得早鸟票!

时间表

对于基于时间调度的 DAG(与事件驱动相对),DAG 的内部“时间表”驱动调度。时间表还决定了为 DAG 创建的每次运行的数据间隔和逻辑日期。

使用 cron 表达式或 timedelta 对象调度的 DAG 在内部被转换为始终使用时间表。

如果 cron 表达式或 timedelta 足以满足您的用例,则无需担心编写自定义时间表,因为 Airflow 具有处理这些情况的默认时间表。但对于更复杂的调度要求,您可以创建自己的时间表类并将其传递给 DAG 的 schedule 参数。

自定义时间表实现的有用示例

  • 每天在不同时间发生的任务运行。例如,天文学家可能会发现,在黎明时运行任务来处理前一夜收集的数据很有用。

  • 不遵循公历的调度。例如,为农历中的每个月创建一次运行。这在概念上类似于日出情况,但时间尺度不同。

  • 滚动窗口或重叠的数据间隔。例如,您可能希望每天都有一次运行,但让每次运行涵盖前七天的时间段。可以使用 cron 表达式来巧妙地实现这一点,但自定义数据间隔提供了更自然的表示方式。

  • 数据间隔之间存在“间隔”,而不是连续间隔,因为 cron 表达式和 timedelta 调度都表示连续间隔。参见数据间隔

Airflow 允许您在插件中编写自定义时间表并由 DAG 使用。您可以在使用时间表自定义 DAG 调度操作指南中找到演示自定义时间表的示例。

注意

作为一般规则,始终在代码中尽可能晚地访问 Variables、Connections 或其他任何需要访问数据库的内容。有关应遵循的更多最佳实践,请参见时间表

内置时间表

Airflow 内置了几个常用的时间表,以涵盖最常见的用例。插件中可能提供额外的时间表。

DeltaTriggerTimetable

接受 datetime.timedeltadateutil.relativedelta.relativedelta,并在经过一个时间差后运行 DAG 的时间表。

from datetime import timedelta

from airflow.timetables.trigger import DeltaTriggerTimetable


@dag(schedule=DeltaTriggerTimetable(timedelta(days=7)), ...)  # Once every week.
def example_dag():
    pass

您还可以向时间表提供静态数据间隔。可选的 interval 参数也应该是一个 datetime.timedeltadateutil.relativedelta.relativedelta。使用这些参数时,触发的 DAG 运行的数据间隔跨越指定的持续时间,并以触发时间为结束

from datetime import UTC, datetime, timedelta

from dateutil.relativedelta import relativedelta, FR

from airflow.timetables.trigger import DeltaTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week.
    schedule=DeltaTriggerTimetable(
        relativedelta(weekday=FR(), hour=18),
        interval=timedelta(days=4, hours=9),
    ),
    start_date=datetime(2025, 1, 3, 18, tzinfo=UTC),
    ...,
)
def example_dag():
    pass

CronTriggerTimetable

接受 cron 表达式并根据其触发 DAG 运行的时间表。

from airflow.timetables.trigger import CronTriggerTimetable


@dag(schedule=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), ...)  # At 01:00 on Wednesday
def example_dag():
    pass

您还可以向时间表提供静态数据间隔。可选的 interval 参数必须是一个 datetime.timedeltadateutil.relativedelta.relativedelta。使用这些参数时,触发的 DAG 运行的数据间隔跨越指定的持续时间,并以触发时间为结束

from datetime import timedelta

from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    # Runs every Friday at 18:00 to cover the work week (9:00 Monday to 18:00 Friday).
    schedule=CronTriggerTimetable(
        "0 18 * * 5",
        timezone="UTC",
        interval=timedelta(days=4, hours=9),
    ),
    ...,
)
def example_dag():
    pass

MultipleCronTriggerTimetable

这类似于CronTriggerTimetable,但它接受多个 cron 表达式。只要任何表达式与时间匹配,就会调度一次 DAG 运行。当所需的调度无法用一个单一的 cron 表达式表示时,这尤其有用。

from airflow.timetables.trigger import MultipleCronTriggerTimetable


# At 1:10 and 2:40 each day.
@dag(schedule=MultipleCronTriggerTimetable("10 1 * * *", "40 2 * * *", timezone="UTC"), ...)
def example_dag():
    pass

CronTriggerTimetable 相同的可选 interval 参数也可用。

from datetime import timedelta

from airflow.timetables.trigger import MultipleCronTriggerTimetable


@dag(
    schedule=MultipleCronTriggerTimetable(
        "10 1 * * *",
        "40 2 * * *",
        timezone="UTC",
        interval=timedelta(hours=1),
    ),
    ...,
)
def example_dag():
    pass

DeltaDataIntervalTimetable

以时间差调度数据间隔的时间表。您可以通过将 datetime.timedeltadateutil.relativedelta.relativedelta 提供给 DAG 的 schedule 参数来选择它。

此时间表侧重于数据间隔值,不一定将执行日期与任意边界(例如,一天或一小时的开始)对齐。

@dag(schedule=datetime.timedelta(minutes=30))
def example_dag():
    pass

CronDataIntervalTimetable

接受 cron 表达式,根据每个 cron 触发点之间的时间间隔创建数据间隔,并在每个数据间隔结束时触发 DAG 运行的时间表。

DAGs 文档中所述,通过将有效的 cron 表达式作为字符串提供给 DAG 的 schedule 参数来选择此时间表。

@dag(schedule="0 1 * * 3")  # At 01:00 on Wednesday.
def example_dag():
    pass

EventsTimetable

传递一个 datetime 列表,让 DAG 在这些时间点之后运行。这对于基于体育赛事、计划的沟通活动以及其他任意、不规则但可预测的调度很有用。

事件列表必须是有限且合理大小的,因为它必须在每次解析 DAG 时加载。可选地,使用 restrict_to_events 标志来强制手动运行 DAG,手动运行使用最近或最早事件的时间作为数据间隔。否则,手动运行的 data_interval_startdata_interval_end 等于手动运行开始的时间。您还可以使用 description 参数命名这组事件,该名称将显示在 Airflow UI 中。

from airflow.timetables.events import EventsTimetable


@dag(
    schedule=EventsTimetable(
        event_dates=[
            pendulum.datetime(2022, 4, 5, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 17, 8, 27, tz="America/Chicago"),
            pendulum.datetime(2022, 4, 22, 20, 50, tz="America/Chicago"),
        ],
        description="My Team's Baseball Games",
        restrict_to_events=False,
    ),
    ...,
)
def example_dag():
    pass

基于资产事件和时间结合的调度

将条件资产表达式与基于时间的调度结合可以增强调度的灵活性。

AssetOrTimeSchedule 是一种专门的时间表,它允许基于时间调度和资产事件来调度 DAG。它还促进了按照传统时间表创建计划运行,以及独立运行的资产触发运行。

此功能在 DAG 需要在资产更新时运行并且还需要定期运行检查或更新的场景中特别有用。它确保了工作流对数据更改保持响应,并始终执行定期检查或更新。

以下是使用 AssetOrTimeSchedule 的 DAG 示例

from airflow.timetables.assets import AssetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=AssetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), assets=(dag1_asset & dag2_asset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

时间表对比

“触发器”和“数据间隔”时间表之间的区别

Airflow 为 cron 和 delta 调度提供了两组时间表

是否关心 数据间隔

触发器时间表包含数据间隔。这意味着 data_interval_startdata_interval_end 的值(以及旧版 execution_date)是相同的;即 DAG 运行被触发的时间。

对于数据间隔时间表,data_interval_startdata_interval_end 的值(以及旧版 execution_date)是不同的。data_interval_start 是 DAG 运行被触发的时间,而 data_interval_end 是间隔的结束时间。

补齐 行为

默认情况下,catchup 设置为 False。这可以防止在以下场景中运行不必要的 DAG: - 如果您创建了一个开始日期在过去的新的 DAG,并且不想运行过去的 DAG。如果 catchupTrue,Airflow 会运行在该时间间隔内本应运行的所有 DAG。 - 如果您暂停了现有的 DAG,然后在稍后的日期重新启用它,catchupFalse 意味着 Airflow 不会运行在暂停期间本应运行的 DAG。

在这些场景中,run_id 中的 logical_date 基于时间表如何处理数据间隔。

您可以使用 Airflow 配置 [scheduler] catchup_by_default 更改默认的 catchup 行为。

有关使用 catchup 时如何触发 DAG 运行的更多信息,请参见补齐

DAG 运行被触发的时间

触发器和数据间隔时间表在同一时间触发 DAG 运行。但是,run_id 的时间戳对于每种时间表都不同。这是因为 run_id 基于 logical_date

例如,假设有一个 cron 表达式 @daily0 0 * * *,计划每天 12AM 运行。如果您在 1 月 31 日 3PM 启用使用这两种时间表的 DAG: - CronTriggerTimetable 在 2 月 1 日 12AM 创建一个新的 DAG 运行。run_id 时间戳是 2 月 1 日午夜。 - CronDataIntervalTimetable 立即创建一个新的 DAG 运行,因为从 1 月 31 日 12AM 开始的每日时间间隔的 DAG 运行尚未发生。run_id 时间戳是 1 月 31 日午夜,因为那是数据间隔的开始。

以下是另一个示例,显示了跳过 DAG 运行情况下的区别

假设有两个运行中的 DAG 使用 cron 表达式 @daily0 0 * * *,并使用两种不同的时间表。如果您在 1 月 31 日 3PM 暂停 DAG,并在 2 月 2 日 3PM 重新启用它们: - CronTriggerTimetable 会跳过本应在 2 月 1 日和 2 日触发的 DAG 运行。下一次 DAG 运行将在 2 月 3 日 12AM 触发。 - CronDataIntervalTimetable 只跳过本应在 2 月 1 日触发的 DAG 运行。在您重新启用 DAG 后,会立即触发 2 月 2 日的 DAG 运行。

在这些示例中,您可以看到触发器时间表如何更直观地创建 DAG 运行,并且与人们期望工作流行为的方式相似,而数据间隔时间表则侧重于它处理的数据间隔,不反映工作流自身的属性。

Cron 和 Delta 数据间隔时间表之间的区别

选择 DeltaDataIntervalTimetableCronDataIntervalTimetable 取决于您的用例。如果您在 2 月 1 日 01:05 启用一个 DAG,下表总结了根据 3 个参数:schedulestart_datecatchup,创建的 DAG 运行以及它们涵盖的数据间隔。

调度参数 (schedule)

开始日期 (start_date)

补齐 (catchup)

涵盖的间隔

备注

*/30 * * * *

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

行为与使用 timedelta 对象相同。

*/30 * * * *

year-02-01

False

  • 00:30 - 01:00

*/30 * * * *

year-02-01 00:10

True

  • 00:30 - 01:00

间隔 00:00 - 00:30 不在开始日期之后,因此被跳过。

*/30 * * * *

year-02-01 00:10

False

  • 00:30 - 01:00

无论开始日期如何,数据间隔都与小时/天等边界对齐。

datetime.timedelta(minutes=30)

year-02-01

True

  • 00:00 - 00:30

  • 00:30 - 01:00

行为与使用 cron 表达式相同。

datetime.timedelta(minutes=30)

year-02-01

False

  • 00:35 - 01:05

间隔不与开始日期对齐,而是与当前时间对齐。

datetime.timedelta(minutes=30)

year-02-01 00:10

True

  • 00:10 - 00:40

间隔与开始日期对齐。下一个将在 5 分钟后触发,涵盖 00:40 - 01:10。

datetime.timedelta(minutes=30)

year-02-01 00:10

False

  • 00:35 - 01:05

间隔与当前时间对齐。下一次运行将在 30 分钟后触发。

本条目有帮助吗?