使用时间表自定义 DAG 调度

对于我们的示例,假设一家公司希望在每个工作日结束后运行一个作业,以处理工作日收集的数据。对此最直观的回答是 schedule="0 0 * * 1-5"(星期一至星期五的午夜),但这意味着星期五收集的数据不会在星期五结束后立即处理,而是在下星期一处理,并且该运行的时间间隔将从星期五午夜到星期一午夜。此外,上述调度字符串无法跳过在节假日时的处理。我们想要的是

  • 为每个星期一、星期二、星期三、星期四和星期五安排一次运行。运行的数据间隔将涵盖从每一天的午夜到次日的午夜(例如,2021-01-01 00:00:00 到 2021-01-02 00:00:00)。

  • 每个运行将在数据间隔结束后立即创建。涵盖星期一的运行在星期二午夜发生,依此类推。涵盖星期五的运行在星期六午夜发生。在星期日和星期一午夜不会发生任何运行。

  • 不要在定义的节假日安排运行。

为简单起见,在本示例中,我们只处理 UTC 日期时间。

注意

自定义时间表返回的所有日期时间值必须为“感知”,即包含时区信息。此外,它们必须使用 pendulum 的日期时间和时区类型。

时间表注册

时间表必须是 Timetable 的子类,并作为 插件 的一部分进行注册。以下是我们实现新时间表的框架

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import Timetable


class AfterWorkdayTimetable(Timetable):
    pass


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]

接下来,我们将开始将代码放入 AfterWorkdayTimetable。在实现完成后,我们应该能够在 DAG 文件中使用时间表

import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable


with DAG(
    dag_id="example_after_workday_timetable_dag",
    start_date=pendulum.datetime(2021, 3, 10, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    ...

定义调度逻辑

当 Airflow 的调度程序遇到 DAG 时,它将调用这两种方法之一来了解何时调度 DAG 的下一次运行。

  • next_dagrun_info:调度程序使用此方法了解时间表的常规计划,即我们示例中的“每个工作日,在工作日结束时运行”部分。

  • infer_manual_data_interval:当 DAG 运行被手动触发(例如,从 Web UI),调度程序使用此方法了解如何反向推断非计划运行的数据间隔。

我们将从 infer_manual_data_interval 开始,因为它比较简单

airflow/example_dags/plugins/workday.py[源代码]

def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
    start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
    # Skip backwards over weekends and holidays to find last run
    start = self.get_next_workday(start, incr=-1)
    return DataInterval(start=start, end=(start + timedelta(days=1)))

该方法接受一个参数 run_after,一个 pendulum.DateTime 对象,表示 DAG 何时被外部触发。由于我们的时间表为每个完整的工作日创建一个数据间隔,因此此处推断的数据间隔通常应从 run_after 前一天的午夜开始,但如果 run_after 恰逢星期日或星期一(即前一天是星期六或星期日),则应将其进一步推回到上个星期五。一旦我们知道了间隔的开始,结束时间就是开始时间之后的一个完整的一天。然后,我们创建一个 DataInterval 对象来描述此间隔。

接下来是 next_dagrun_info 的实现

airflow/example_dags/plugins/workday.py[源代码]

def next_dagrun_info(
    self,
    *,
    last_automated_data_interval: DataInterval | None,
    restriction: TimeRestriction,
) -> DagRunInfo | None:
    if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
        last_start = last_automated_data_interval.start
        next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
    # Otherwise this is the first ever run on the regular schedule...
    elif (earliest := restriction.earliest) is None:
        return None  # No start_date. Don't schedule.
    elif not restriction.catchup:
        # If the DAG has catchup=False, today is the earliest to consider.
        next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
    elif earliest.time() != Time.min:
        # If earliest does not fall on midnight, skip to the next day.
        next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
    else:
        next_start = earliest
    # Skip weekends and holidays
    next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

    if restriction.latest is not None and next_start > restriction.latest:
        return None  # Over the DAG's scheduled end; don't schedule.
    return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))

此方法接受两个参数。 last_automated_data_interval 是一个 DataInterval 实例,表示此 DAG 的上一个非手动触发的运行的数据间隔,或者如果这是 DAG 第一次被调度,则为 Nonerestriction 封装了 DAG 及其任务如何指定计划,并包含三个属性

  • 最早:DAG 可能被安排的 earliest 时间。这是一个 pendulum.DateTime,它从 DAG 及其任务中的所有 start_date 参数计算而来,或者如果根本没有找到 start_date 参数,则为 None

  • 最晚:类似于 earliest,这是 DAG 可能被安排的 latest 时间,它从 end_date 参数计算而来。

  • 追赶:一个布尔值,反映 DAG 的 catchup 参数。

注意

earliestlatest 都适用于 DAG 运行的逻辑日期(数据间隔的开始),而不是运行的安排时间(通常在数据间隔结束之后)。

如果之前安排了一次运行,我们现在应该通过循环遍历后续日期来安排下一次非节假日工作日,找到一个不是星期六、星期日或美国节假日的日期。但是,如果没有之前安排的运行,我们会在 restriction.earliest 之后选择下一个非节假日工作日的午夜。还需要考虑 restriction.catchup——如果它是 False,即使 start_date 值在过去,我们也不能安排在当前时间之前。最后,如果我们计算出的数据间隔晚于 restriction.latest,我们必须尊重它,并且不通过返回 None 来安排运行。

如果我们决定安排一次运行,我们需要使用 DagRunInfo 来描述它。此类型有两个参数和属性

  • data_interval:一个 DataInterval 实例,用于描述下一次运行的数据间隔。

  • run_after:一个 pendulum.DateTime 实例,它告诉调度器何时可以安排 DAG 运行。

可以像这样创建一个 DagRunInfo

info = DagRunInfo(
    data_interval=DataInterval(start=start, end=end),
    run_after=run_after,
)

由于我们通常希望在数据间隔结束时立即安排运行,因此上面的 endrun_after 通常相同。因此,DagRunInfo 为此提供了一个快捷方式

info = DagRunInfo.interval(start=start, end=end)
assert info.data_interval.end == info.run_after  # Always True.

作为参考,以下是我们的插件和 DAG 文件的完整内容

airflow/example_dags/plugins/workday.py[源代码]

from pendulum import UTC, Date, DateTime, Time

from airflow.plugins_manager import AirflowPlugin
from airflow.timetables.base import DagRunInfo, DataInterval, Timetable

if TYPE_CHECKING:
    from airflow.timetables.base import TimeRestriction

log = logging.getLogger(__name__)
try:
    from pandas.tseries.holiday import USFederalHolidayCalendar

    holiday_calendar = USFederalHolidayCalendar()
except ImportError:
    log.warning("Could not import pandas. Holidays will not be considered.")
    holiday_calendar = None  # type: ignore[assignment]


class AfterWorkdayTimetable(Timetable):
    def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
        next_start = d
        while True:
            if next_start.weekday() not in (5, 6):  # not on weekend
                if holiday_calendar is None:
                    holidays = set()
                else:
                    holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
                if next_start not in holidays:
                    break
            next_start = next_start.add(days=incr)
        return next_start
    def infer_manual_data_interval(self, run_after: DateTime) -> DataInterval:
        start = DateTime.combine((run_after - timedelta(days=1)).date(), Time.min).replace(tzinfo=UTC)
        # Skip backwards over weekends and holidays to find last run
        start = self.get_next_workday(start, incr=-1)
        return DataInterval(start=start, end=(start + timedelta(days=1)))
    def next_dagrun_info(
        self,
        *,
        last_automated_data_interval: DataInterval | None,
        restriction: TimeRestriction,
    ) -> DagRunInfo | None:
        if last_automated_data_interval is not None:  # There was a previous run on the regular schedule.
            last_start = last_automated_data_interval.start
            next_start = DateTime.combine((last_start + timedelta(days=1)).date(), Time.min)
        # Otherwise this is the first ever run on the regular schedule...
        elif (earliest := restriction.earliest) is None:
            return None  # No start_date. Don't schedule.
        elif not restriction.catchup:
            # If the DAG has catchup=False, today is the earliest to consider.
            next_start = max(earliest, DateTime.combine(Date.today(), Time.min))
        elif earliest.time() != Time.min:
            # If earliest does not fall on midnight, skip to the next day.
            next_start = DateTime.combine(earliest.date() + timedelta(days=1), Time.min)
        else:
            next_start = earliest
        # Skip weekends and holidays
        next_start = self.get_next_workday(next_start.replace(tzinfo=UTC))

        if restriction.latest is not None and next_start > restriction.latest:
            return None  # Over the DAG's scheduled end; don't schedule.
        return DagRunInfo.interval(start=next_start, end=(next_start + timedelta(days=1)))


class WorkdayTimetablePlugin(AirflowPlugin):
    name = "workday_timetable_plugin"
    timetables = [AfterWorkdayTimetable]


import pendulum

from airflow import DAG
from airflow.example_dags.plugins.workday import AfterWorkdayTimetable
from airflow.operators.empty import EmptyOperator


with DAG(
    dag_id="example_workday_timetable",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=AfterWorkdayTimetable(),
    tags=["example", "timetable"],
):
    EmptyOperator(task_id="run_this")

参数化时间表

有时我们需要将一些运行时参数传递给时间表。继续我们的 AfterWorkdayTimetable 示例,也许我们有在不同时区运行的 DAG,并且我们希望在第二天上午 8 点安排一些 DAG,而不是在午夜。我们希望执行类似以下操作,而不是为每个目的创建单独的时间表

class SometimeAfterWorkdayTimetable(Timetable):
    def __init__(self, schedule_at: Time) -> None:
        self._schedule_at = schedule_at

    def next_dagrun_info(self, last_automated_dagrun, restriction):
        ...
        end = start + timedelta(days=1)
        return DagRunInfo(
            data_interval=DataInterval(start=start, end=end),
            run_after=DateTime.combine(end.date(), self._schedule_at).replace(tzinfo=UTC),
        )

但是,由于时间表是 DAG 的一部分,因此我们需要告诉 Airflow 如何使用我们在 __init__ 中提供的上下文对其进行序列化。这是通过在我们的时间表类上实现另外两种方法来完成的

class SometimeAfterWorkdayTimetable(Timetable):
    ...

    def serialize(self) -> dict[str, Any]:
        return {"schedule_at": self._schedule_at.isoformat()}

    @classmethod
    def deserialize(cls, value: dict[str, Any]) -> Timetable:
        return cls(Time.fromisoformat(value["schedule_at"]))

当 DAG 被序列化时,调用 serialize 以获取 JSON 可序列化值。当调度程序访问序列化的 DAG 以重建时间表时,该值将传递给 deserialize

UI 中的时间表显示

默认情况下,自定义时间表在 UI 中以其类名显示(例如,“DAG”表中的时间表列)。可以通过覆盖 summary 属性来自定义此属性。这对于参数化时间表尤其有用,以便包含在 __init__ 中提供的参数。例如,对于我们的 SometimeAfterWorkdayTimetable 类,我们可以拥有

@property
def summary(self) -> str:
    return f"after each workday, at {self._schedule_at}"

因此,对于这样声明的 DAG

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

时间表列将显示 after each workday, at 08:00:00

另请参见

模块 airflow.timetables.base

公共接口经过大量记录,以解释子类应该实现什么。

UI 中的时间表说明显示

您还可以通过覆盖 description 属性为您的时间表实现提供说明。这对于在 UI 中为您的实现提供全面说明特别有用。例如,对于我们的 SometimeAfterWorkdayTimetable 类,我们可以有

description = "Schedule: after each workday"

如果您想派生说明,您还可以在 __init__ 中包装此说明。

def __init__(self) -> None:
    self.description = "Schedule: after each workday, at f{self._schedule_at}"

当您想提供与 summary 属性不同的全面说明时,这特别有用。

因此,对于这样声明的 DAG

with DAG(
    schedule=SometimeAfterWorkdayTimetable(Time(8)),  # 8am.
    ...,
):
    ...

i 图标将显示,Schedule: after each workday, at 08:00:00

另请参见

模块 airflow.timetables.interval

检查 CronDataIntervalTimetable 说明实现,它在 UI 中提供全面的 cron 说明。

更改生成的 run_id

2.4 版中的新增功能。

自 Airflow 2.4 起,时间表还负责为 DagRun 生成 run_id

例如,为了让 Run ID 显示运行开始的“对人友好的”日期(即数据间隔的结束,而不是当前使用的日期的开始),您可以向自定义时间表添加如下方法

def generate_run_id(
    self,
    *,
    run_type: DagRunType,
    logical_date: DateTime,
    data_interval: DataInterval | None,
    **extra,
) -> str:
    if run_type == DagRunType.SCHEDULED and data_interval:
        return data_interval.end.format("YYYY-MM-DD dddd")
    return super().generate_run_id(
        run_type=run_type, logical_date=logical_date, data_interval=data_interval, **extra
    )

请记住,RunID 限制为 250 个字符,并且在 DAG 中必须是唯一的。

此条目是否有帮助?