使用 Timetable 定制 DAG 调度¶
举例来说,假设一家公司希望在每个工作日结束后运行一个作业,以处理工作日期间收集的数据。第一个直观的答案是 schedule="0 0 * * 1-5"
(周一至周五午夜),但这意味着周五收集的数据不会在周五结束后立即处理,而是在下一个周一,并且该运行的数据区间将从周五午夜到周一午夜。此外,上述调度字符串无法跳过节假日处理。我们希望做到的是:
为每个周一、周二、周三、周四和周五调度一次运行。运行的数据区间覆盖从每天的午夜到第二天午夜(例如 2021-01-01 00:00:00 到 2021-01-02 00:00:00)。
每次运行将在数据区间结束后立即创建。覆盖周一的运行发生在周二午夜,以此类推。覆盖周五的运行发生在周六午夜。周日和周一的午夜不发生运行。
不在定义的节假日调度运行。
为简单起见,本例中我们仅处理 UTC 日期时间。
注意
自定义 Timetable 返回的所有日期时间值必须是“感知型”的,即包含时区信息。此外,它们必须使用 pendulum
的日期时间类型和时区类型。
Timetable 注册¶
Timetable 必须是 Timetable
的子类,并注册为 plugin 的一部分。以下是我们实现新 Timetable 的骨架代码:
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable
class AfterWorkdayTimetable(Timetable):
pass
class WorkdayTimetablePlugin(AirflowPlugin):
name = "workday_timetable_plugin"
timetables = [AfterWorkdayTimetable]
接下来,我们将开始编写 AfterWorkdayTimetable
的代码。实现完成后,我们就可以在 DAG 文件中使用这个 timetable 了。
import pendulum
from airflow.sdk import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
with DAG(
dag_id="example_after_workday_timetable_dag",
start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
schedule=AfterWorkdayTimetable(),
tags=["example", "timetable"],
):
...
定义调度逻辑¶
当 Airflow 的调度器遇到一个 DAG 时,它会调用以下两个方法之一来确定何时调度该 DAG 的下一次运行。
next_dagrun_info
: 调度器使用此方法了解 timetable 的常规调度,即示例中“每个工作日结束时运行一次”的部分。infer_manual_data_interval
: 当 DAG 运行被手动触发(例如,从 Web UI)时,调度器使用此方法了解如何反向推断非调度运行的数据区间。
我们将从 infer_manual_data_interval
开始,因为它相对简单。
src/airflow/example_dags/plugins/workday.py
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
# Skip backwards over weekends and holidays to find last run
start = self.get_next_workday(start, incr=-1)
return DataInterval(start=start, end=(start + timedelta(days=1)))
该方法接受一个参数 run_after
,它是一个 pendulum.DateTime
对象,指示 DAG 何时被外部触发。由于我们的 timetable 为每个完整工作日创建一个数据区间,这里推断出的数据区间通常应在 run_after
前一天的午夜开始,但如果 run_after
落在了周日或周一(即前一天是周六或周日),则应进一步推回到上一个周五。一旦确定了区间的开始时间,结束时间就是其后整整一天。然后我们创建一个 DataInterval
对象来描述这个区间。
接下来是 next_dagrun_info
的实现
src/airflow/example_dags/plugins/workday.py
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
# Otherwise this is the first ever run on the regular schedule...
elif (earliest := restriction.earliest) is None:
return None # No start_date. Don't schedule.
elif not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(earliest, DateTime.combine(Date.today(), Time.min, tzinfo=UTC))
elif earliest.time() != Time.min:
# If earliest does not fall on midnight, skip to the next day.
next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
else:
next_start = earliest
# Skip weekends and holidays
next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
此方法接受两个参数。last_automated_data_interval
是一个 DataInterval
实例,指示该 DAG 上一次非手动触发运行的数据区间;如果这是 DAG 首次被调度,则为 None
。restriction
封装了 DAG 及其任务如何指定调度,并包含三个属性:
earliest
:DAG 可以被调度的最早时间。这是一个pendulum.DateTime
对象,通过计算 DAG 及其任务的所有start_date
参数得出;如果完全没有找到start_date
参数,则为None
。latest
:类似于earliest
,这是 DAG 可以被调度的最晚时间,通过计算end_date
参数得出。catchup
:一个布尔值,反映 DAG 的catchup
参数。默认为False
。
注意
earliest
和 latest
都适用于 DAG 运行的逻辑日期(数据区间的开始),而不是运行将被调度的实际时间(通常在数据区间结束之后)。
如果之前有已调度的运行,我们现在应该通过循环后续日期来查找下一个非节假日工作日,该工作日不是周六、周日或美国节假日。然而,如果之前没有已调度的运行,我们则选取 restriction.earliest
之后的下一个非节假日工作日的午夜。restriction.catchup
也需要考虑——如果它为 False
,则即使 start_date
值在过去,我们也无法调度当前时间之前的运行。最后,如果我们计算出的数据区间晚于 restriction.latest
,我们必须遵守它,并返回 None
来指示不调度运行。
如果我们决定调度一次运行,我们需要使用 DagRunInfo
来描述它。该类型有两个参数和属性:
data_interval
:一个DataInterval
实例,描述下一次运行的数据区间。run_after
:一个pendulum.DateTime
实例,告诉调度器何时可以调度 DAG 运行。
一个 DagRunInfo
可以这样创建:
info = DagRunInfo(
data_interval=DataInterval(start=start, end=end),
run_after=run_after,
)
由于我们通常希望在数据区间结束后立即调度运行,上面的 end
和 run_after
通常是相同的。因此,DagRunInfo
提供了一个快捷方式:
info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after # Always True.
作为参考,以下是我们的 plugin 和 DAG 文件的完整代码:
src/airflow/example_dags/plugins/workday.py
from pendulum import UTC, Date, DateTime, Time
from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable
if TYPE_CHECKING:
from airflow.timetables.base import TimeRestriction
log = logging.getLogger(__name__)
try:
from pandas.tseries.holiday import USFederalHolidayCalendar
holiday_calendar = USFederalHolidayCalendar()
except ImportError:
log.warning("Could not import pandas. Holidays will not be considered.")
holiday_calendar = None # type: ignore[assignment]
class AfterWorkdayTimetable(Timetable):
def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
next_start = d
while True:
if next_start.weekday() not in (5, 6): # not on weekend
if holiday_calendar is None:
holidays = set()
else:
holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
if next_start not in holidays:
break
next_start = next_start.add(days=incr)
return next_start
def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
# Skip backwards over weekends and holidays to find last run
start = self.get_next_workday(start, incr=-1)
return DataInterval(start=start, end=(start + timedelta(days=1)))
def next_dagrun_info(
self,
*,
last_automated_data_interval: DataInterval | None,
restriction: TimeRestriction,
) -> DagRunInfo | None:
if last_automated_data_interval is not None: # There was a previous run on the regular schedule.
last_start = last_automated_data_interval.start
next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
# Otherwise this is the first ever run on the regular schedule...
elif (earliest := restriction.earliest) is None:
return None # No start_date. Don't schedule.
elif not restriction.catchup:
# If the DAG has catchup=False, today is the earliest to consider.
next_start = max(earliest, DateTime.combine(Date.today(), Time.min, tzinfo=UTC))
elif earliest.time() != Time.min:
# If earliest does not fall on midnight, skip to the next day.
next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
else:
next_start = earliest
# Skip weekends and holidays
next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))
if restriction.latest is not None and next_start > restriction.latest:
return None # Over the DAG's scheduled end; don't schedule.
return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))
class WorkdayTimetablePlugin(AirflowPlugin):
name = "workday_timetable_plugin"
timetables = [AfterWorkdayTimetable]
import pendulum
from airflow.sdk import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.providers.standard.operators.empty import EmptyOperator
with DAG(
dag_id="example_workday_timetable",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=AfterWorkdayTimetable(),
tags=["example", "timetable"],
):
EmptyOperator(task_id="run_this")
参数化 Timetable¶
有时我们需要向 timetable 传递一些运行时参数。继续以 AfterWorkdayTimetable
为例,也许我们有一些 DAG 运行在不同的时区,并且我们希望在第二天上午 8 点调度一些 DAG,而不是午夜。我们不希望为每个目的创建单独的 timetable,而是希望这样做:
class SometimeAfterWorkdayTimetable(Timetable):
def __init__(self, schedule_at: Time) -> None:
self._schedule_at = schedule_at
def next_dagrun_info(self, last_automated_dagrun, restriction):
...
end = start + timedelta(days=1)
return DagRunInfo(
data_interval=DataInterval(start=start, end=end),
run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
)
然而,由于 timetable 是 DAG 的一部分,我们需要告诉 Airflow 如何使用我们在 __init__
中提供的上下文来序列化它。这可以通过在我们的 timetable 类中实现另外两个方法来实现:
class SometimeAfterWorkdayTimetable(Timetable):
...
def serialize(self) -> dict[str, Any]:
return {"schedule_at": self._schedule_at.isoformat()}
@classmethod
def deserialize(cls, value: dict[str, Any]) -> Timetable:
return cls(Time.fromisoformat(value["schedule_at"]))
当 DAG 被序列化时,会调用 serialize
来获取一个可 JSON 序列化的值。当调度器访问序列化的 DAG 以重建 timetable 时,会将该值传递给 deserialize
。
Timetable 在 UI 中的显示¶
默认情况下,自定义 timetable 在 UI 中以其类名显示(例如,“dags”表中的Schedule列)。可以通过重写 summary
属性来定制此显示。这对于参数化 timetable 尤其有用,可以将 __init__
中提供的参数包含在内。例如,对于我们的 SometimeAfterWorkdayTimetable
类,我们可以这样做:
@property
def summary(self) -> str:
return f"after each workday, at {self._schedule_at}"
因此,对于如下声明的 DAG:
with DAG(
schedule=SometimeAfterWorkdayTimetable(Time(8)), # 8am.
...,
):
...
Schedule 列将显示 after each workday, at 08:00:00
。
另请参阅
- 模块
airflow.timetables.base
公共接口有详细的文档,解释了子类应该实现什么。
Timetable 描述在 UI 中的显示¶
您还可以通过重写 description
属性为您的 Timetable 实现提供描述。这对于在 UI 中为您的实现提供全面的描述特别有用。例如,对于我们的 SometimeAfterWorkdayTimetable
类,我们可以这样做:
description = "Schedule: after each workday"
如果您想根据参数生成描述,也可以将其放在 __init__
中。
def __init__(self) -> None:
self.description = "Schedule: after each workday, at f{self._schedule_at}"
当您想提供与 summary
属性不同的全面描述时,这特别有用。
因此,对于如下声明的 DAG:
with DAG(
schedule=SometimeAfterWorkdayTimetable(Time(8)), # 8am.
...,
):
...
i 图标会显示 Schedule: after each workday, at 08:00:00
。
另请参阅
- 模块
airflow.timetables.interval
查看
CronDataIntervalTimetable
的 description 实现,它在 UI 中提供了全面的 cron 描述。
更改生成的 run_id
¶
添加于版本 2.4。
自 Airflow 2.4 起,Timetable 也负责生成 DagRun 的 run_id
。
例如,要让 Run ID 显示一个“人类友好”的运行开始日期(即数据区间的结束时间,而不是当前使用的开始时间),您可以向自定义 timetable 添加如下方法:
def generate_run_id(
self,
*,
run_type: DagRunType,
logical_date: DateTime,
data_interval: DataInterval | None,
**extra,
) -> str:
if run_type == DagRunType.SCHEDULED and data_interval:
return data_interval.end.format("YYYY-MM-DD dddd")
return super().generate_run_id(
run_type=run_type, logical_date=logical_date, data_interval=data_interval, **extra
)
请记住,RunID 的长度限制为 250 个字符,并且在同一个 DAG 内必须唯一。