常见问题

调度/DAG 文件解析

为什么任务未得到调度?

导致任务未得到调度的可能原因有很多。以下是部分常见原因

  • 您的脚本是否“编译”,Airflow 引擎是否可以解析它并找到您的 DAG 对象?要测试这一点,您可以运行 airflow dags list 并确认您的 DAG 显示在列表中。您还可以运行 airflow tasks list foo_dag_id --tree 并确认您的任务按预期显示在列表中。如果您使用 CeleryExecutor,您可能希望确认这在调度程序运行的位置和工作程序运行的位置均有效。

  • 包含您的 DAG 的文件是否在内容的某个位置包含字符串“airflow”和“DAG”?在搜索 DAG 目录时,Airflow 会忽略不包含“airflow”和“DAG”的文件,以防止 DagBag 解析导入与用户 DAG 并置的所有 Python 文件。

  • 您的 start_date 是否设置正确?对于基于时间的 DAG,任务将不会触发,直到开始日期之后的第一个调度间隔过去。

  • 您的 schedule 参数是否设置正确?默认值是一天 (datetime.timedelta(1))。您必须直接向您实例化的 DAG 对象指定不同的 schedule,而不是作为 default_param,因为任务实例不会覆盖其父 DAG 的 schedule

  • 您的 start_date 是否超出了您在 UI 中可以看到的范围?如果您将 start_date 设置为比如 3 个月前的时间,您将无法在 UI 的主视图中看到它,但您应该能够在 菜单 -> 浏览 ->任务 实例 中看到它。

  • 任务的依赖项是否满足?任务的上游任务实例需要处于 成功 状态。此外,如果您已设置 depends_on_past=True,则上一个任务实例需要已成功或已跳过(除非这是该任务的首次运行)。此外,如果 wait_for_downstream=True,请确保您理解它的含义 - 紧接 上一个 任务实例下游的所有任务都必须已成功或已跳过。您可以从任务的 任务 实例 详情 页面查看如何设置这些属性。

  • 您需要的 DagRun 是否已创建并处于活动状态?DagRun 表示整个 DAG 的特定执行,并具有状态(正在运行、成功、失败等)。调度程序在向前移动时会创建新的 DagRun,但绝不会回溯时间来创建新的 DagRun。调度程序仅评估 正在运行 的 DagRun,以查看它可以触发哪些任务实例。请注意,清除任务实例(从 UI 或 CLI)会将 DagRun 的状态重新设置为正在运行。您可以通过单击 DAG 的计划标记来批量查看 DagRun 列表并更改状态。

  • 您的 DAG 的 并发性 参数是否已达到?并发性 定义了 DAG 允许有多少个 正在运行 的任务实例,超出此点后,任务将被排队。

  • 您的 DAG 的 max_active_runs 参数是否已达到?max_active_runs 定义了允许有多少个 正在运行 的 DAG 并发实例。

您可能还想阅读有关 调度程序 的信息,并确保您完全了解调度程序周期。

如何提高 DAG 性能?

有一些 Airflow 配置可以允许更大的调度容量和频率

DAG 具有提高效率的配置

操作员或任务也具有提高效率和调度优先级的配置

  • max_active_tis_per_dag:此参数控制每个任务的 dag_runs 中并发运行的任务实例数。

  • pool:参见

  • priority_weight:参见 优先级权重

  • queue:仅适用于 CeleryExecutor 部署,参见 队列

如何减少 DAG 调度延迟/任务延迟?

Airflow 2.0 开箱即用的 DAG 调度延迟较低(尤其是与 Airflow 1.10.x 相比),但是,如果您需要更高的吞吐量,您可以 启动多个调度程序

如何根据另一个任务的失败触发任务?

您可以使用触发规则实现此目的。

如何控制不同 DAG 文件的 DAG 文件解析超时?

(仅对 Airflow >= 2.3.0 有效)

您可以在airflow_local_settings.py中添加一个get_dagbag_import_timeout函数,该函数在解析 DAG 文件之前立即调用。您可以根据 DAG 文件返回不同的超时值。当返回值小于或等于 0 时,表示在 DAG 解析期间没有超时。

airflow_local_settings.py
 def get_dagbag_import_timeout(dag_file_path: str) -> Union[int, float]:
     """
     This setting allows to dynamically control the DAG file parsing timeout.

     It is useful when there are a few DAG files requiring longer parsing times, while others do not.
     You can control them separately instead of having one value for all DAG files.

     If the return value is less than or equal to 0, it means no timeout during the DAG parsing.
     """
     if "slow" in dag_file_path:
         return 90
     if "no-timeout" in dag_file_path:
         return 0
     return conf.getfloat("core", "DAGBAG_IMPORT_TIMEOUT")

有关如何配置本地设置的详细信息,请参阅配置本地设置

当有大量(>1000)DAG 文件时,如何加快新文件的解析速度?

(仅对 Airflow >= 2.1.1 有效)

file_parsing_sort_mode更改为modified_time,将min_file_process_interval提高到600(10 分钟)、6000(100 分钟)或更高的值。

如果文件最近修改过,DAG 解析器将跳过min_file_process_interval检查。

如果从单独的文件导入/创建 DAG,此方法可能不起作用。示例:dag_file.py导入dag_loader.py,其中 DAG 文件的实际逻辑如下所示。在这种情况下,如果dag_loader.py已更新,但dag_file.py未更新,则更改将不会反映,直到达到min_file_process_interval,因为 DAG 解析器将查找dag_file.py文件的修改时间。

dag_file.py
 from dag_loader import create_dag

 globals()[dag.dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
dag_loader.py
 from airflow import DAG
 from airflow.decorators import task

 import pendulum


 def create_dag(dag_id, schedule, dag_number, default_args):
     dag = DAG(
         dag_id,
         schedule=schedule,
         default_args=default_args,
         pendulum.datetime(2021, 9, 13, tz="UTC"),
     )

     with dag:

         @task()
         def hello_world():
             print("Hello World")
             print(f"This is DAG: {dag_number}")

         hello_world()

     return dag

DAG 构造

start_date 是怎么回事?

start_date 部分继承自 DagRun 时代之前,但它在很多方面仍然相关。在创建新 DAG 时,您可能希望为任务设置全局 start_date。这可以通过在 DAG() 对象中直接声明 start_date 来完成。DAG 的第一个 DagRun 将基于 start_date 之后的第一个完整的 data_interval 创建。例如,对于 start_date=datetime(2024, 1, 1)schedule="0 0 3 * *" 的 DAG,第一个 DAG 运行将在 2024-02-03 午夜触发,其中 data_interval_start=datetime(2024, 1, 3)data_interval_end=datetime(2024, 2, 3)。从那时起,调度程序将基于您的 schedule 创建新的 DagRun,并且相应的任务实例将在您的依赖项得到满足时运行。在向 DAG 引入新任务时,您需要特别注意 start_date,并且可能希望重新激活非活动 DagRun 以正确地加入新任务。

我们建议不要使用动态值作为 start_date,尤其是 datetime.now(),因为它可能会非常混乱。当周期关闭后任务将被触发,并且理论上 @hourly DAG 永远不会达到当前时间后的一小时,因为 now() 会不断变化。

以前,我们还建议使用与 DAG 的 schedule 相关的已舍入 start_date。这意味着 @hourly 将在 00:00 分钟:秒,@daily 作业在午夜,@monthly 作业在当月的第一天。这不再是必需的。Airflow 现在将自动对齐 start_dateschedule,通过使用 start_date 作为开始查找的时刻。

您可以使用任何传感器或 TimeDeltaSensor 来延迟调度间隔内任务的执行。虽然 schedule 确实允许指定 datetime.timedelta 对象,但我们建议使用宏或 cron 表达式,因为它强制执行此舍入调度理念。

在使用 depends_on_past=True 时,重要的是特别注意 start_date,因为过去依赖关系不仅在为任务指定的 start_date 的特定调度中强制执行。在引入新的 depends_on_past=True 时,及时观察 DagRun 活动状态也很重要,除非您计划对新任务运行回填。

同样重要的是要注意,任务的 start_date 在回填 CLI 命令的上下文中,会被回填的 start_date 命令覆盖。这允许对具有 depends_on_past=True 的任务进行回填以实际启动。如果不是这种情况,回填将不会启动。

使用时区

创建时区感知日期时间(例如 DAG 的 start_date)非常简单。只需确保使用 pendulum 提供时区感知日期即可。不要尝试使用标准库 时区,因为它们已知存在限制,我们明确禁止在 DAG 中使用它们。

execution_date 的含义是什么?

执行日期execution_date 是一个历史名称,表示所谓的逻辑日期,通常也是 DAG 运行所表示的数据区间的开始。

Airflow 被开发为 ETL 需求的解决方案。在 ETL 世界中,您通常会汇总数据。因此,如果您想汇总 2016-02-19 的数据,您将在 UTC 时间 2016-02-20 午夜进行汇总,这恰好是 2016-02-19 的所有数据可用之后。午夜之间的这个区间 2016-02-192016-02-20 称为数据区间,因为它表示 2016-02-19 日期的数据,因此该日期也称为运行的逻辑日期,或此 DAG 运行执行的日期,因此称为执行日期

为了向后兼容,日期时间值 execution_date 仍然是 Jinja 模板化字段和 Airflow 的 Python API 中具有各种格式的 模板变量。它还包含在提供给 Operator 的执行函数的上下文词典中。

class MyOperator(BaseOperator):
    def execute(self, context):
        logging.info(context["execution_date"])

但是,如果可能,您应始终使用 data_interval_startdata_interval_end,因为这些名称在语义上更正确,不易引起误解。

请注意,dsdata_interval_start 的 YYYY-MM-DD 形式)指的是日期*字符串*,而不是日期*开始*,这可能会让一些人感到困惑。

提示

有关 logical date 的更多信息,请参阅 数据间隔运行 DAG

如何动态创建 DAG?

Airflow 在您的 DAGS_FOLDER 中查找包含 DAG 对象的模块及其全局名称空间,并将找到的对象添加到 DagBag 中。了解这一点后,我们只需要一种方法来动态分配全局名称空间中的变量。使用标准库的 globals() 函数可以轻松地在 Python 中完成此操作,该函数的行为类似于一个简单的字典。

def create_dag(dag_id):
    """
    A function returning a DAG object.
    """

    return DAG(dag_id)


for i in range(10):
    dag_id = f"foo_{i}"
    globals()[dag_id] = DAG(dag_id)

    # or better, call a function that returns a DAG object!
    other_dag_id = f"bar_{i}"
    globals()[other_dag_id] = create_dag(other_dag_id)

尽管 Airflow 支持每个 Python 文件有多个 DAG 定义(无论是否动态生成),但并不推荐这样做,因为从故障和部署的角度来看,Airflow 希望 DAG 之间有更好的隔离,而同一文件中有多个 DAG 违背了这一点。

是否允许顶级 Python 代码?

虽然不建议在定义 Airflow 构造之外编写任何代码,但 Airflow 支持任何任意 Python 代码,只要它不破坏 DAG 文件处理器或将文件处理时间延长到 dagbag_import_timeout 值之后即可。

一个常见的示例是构建动态 DAG 时违反时间限制,这通常需要从另一个服务(如数据库)查询数据。与此同时,请求的服务被大量 DAG 文件处理器请求淹没,要求处理文件的数据。这些意外的交互可能会导致服务恶化,并最终导致 DAG 文件处理失败。

有关更多信息,请参阅DAG 编写最佳实践

宏是否在另一个 Jinja 模板中解析?

无法在另一个 Jinja 模板中呈现或任何 Jinja 模板。这通常在user_defined_macros中尝试。

dag = DAG(
    # ...
    user_defined_macros={"my_custom_macro": "day={{ ds }}"}
)

bo = BashOperator(task_id="my_task", bash_command="echo {{ my_custom_macro }}", dag=dag)

对于一个data_interval_start为 2020-01-01 00:00:00 的 DAG 运行,这将显示“day={{ ds }}”,而不是“day=2020-01-01”。

bo = BashOperator(task_id="my_task", bash_command="echo day={{ ds }}", dag=dag)

通过在 template_field 中直接使用 ds 宏,呈现的值将变为“day=2020-01-01”。

为什么next_dsprev_ds可能不包含预期值?

  • 在调度 DAG 时,next_ds next_ds_nodash prev_ds prev_ds_nodash使用logical_date和 DAG 的调度(如果适用)计算。如果您将schedule设置为None@once,则next_dsnext_ds_nodashprev_dsprev_ds_nodash值将设置为None

  • 在手动触发 DAG 时,将忽略调度,并且prev_ds == next_ds == ds

任务执行交互

TemplateNotFound是什么意思?

TemplateNotFound 错误通常是由于在传递触发 Jinja 模板化的运算符路径时与用户期望不一致造成的。一个常见的情况是使用 BashOperators

另一个经常被忽略的事实是,这些文件相对于管道文件所在的位置进行解析。你可以将其他目录添加到 DAG 对象的 template_searchpath 中,以允许其他非相对位置。

如何根据另一个任务的失败触发任务?

对于通过依赖关系关联的任务,你可以将 trigger_rule 设置为 TriggerRule.ALL_FAILED,如果任务执行依赖于其所有上游任务的失败,或者设置为 TriggerRule.ONE_FAILED,如果仅依赖于其中一个上游任务的失败。

import pendulum

from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task()
def a_func():
    raise AirflowException


@task(
    trigger_rule=TriggerRule.ALL_FAILED,
)
def b_func():
    pass


@dag(schedule="@once", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"))
def my_dag():
    a = a_func()
    b = b_func()

    a >> b


dag = my_dag()

有关更多信息,请参阅 触发规则

如果任务未通过依赖关系关联,则需要 构建一个自定义运算符

Airflow UI

为什么我的任务在 UI 中失败,但没有日志?

日志 通常在任务达到终端状态时提供。有时,任务的正常生命周期会中断,并且任务的工作进程无法写入任务的日志。这通常出于以下两个原因之一

  1. 僵尸任务.

  2. 任务在排队后卡住后失败(Airflow 2.6.0+)。在 scheduler.task_queued_timeout 中排队时间超过的任务将被标记为失败,并且 Airflow UI 中将没有任务日志。

为每个任务设置重试次数可以极大地降低上述问题对工作流的影响。

如何阻止每个 Web 服务器多次发生同步权限?

airflow.cfgupdate_fab_perms 配置的值设置为 False

如何减少 Airflow UI 页面加载时间?

如果 DAG 加载时间较长,可以将 airflow.cfgdefault_dag_run_display_number 配置的值减小。此配置控制在 UI 中显示的 DAG 运行数,默认值为 25

为什么暂停 DAG 切换按钮变为红色?

如果由于任何原因暂停或取消暂停 DAG 失败,DAG 切换按钮将恢复到其前一个状态并变为红色。如果您观察到此行为,请尝试再次暂停 DAG,或者如果问题再次发生,请检查控制台或服务器日志。

MySQL 及 MySQL 变体数据库

“MySQL 服务器已消失”是什么意思?

您可能偶尔会遇到带有消息“MySQL 服务器已消失”的 OperationalError。这是由于连接池将连接保持打开状态的时间过长,并且您获得的旧连接已过期。为确保连接有效,您可以设置 sql_alchemy_pool_recycle(已弃用) 以确保在该秒数后使连接失效并创建新的连接。

Airflow 是否支持扩展 ASCII 或 Unicode 字符?

如果您打算在 Airflow 中使用扩展 ASCII 或 Unicode 字符,则必须向 MySQL 数据库提供正确的连接字符串,因为它们明确定义了字符集。

sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8

您将遇到 WTForms 模板和其他 Airflow 模块抛出的 UnicodeDecodeError,如下所示。

'ascii' codec can't decode byte 0xae in position 506: ordinal not in range(128)

如何修复异常:全局变量 explicit_defaults_for_timestamp 需要为 (1)?

这意味着 explicit_defaults_for_timestamp 在您的 MySQL 服务器中已禁用,您需要通过以下方式启用它:

  1. my.cnf 文件的 mysqld 部分下设置 explicit_defaults_for_timestamp = 1

  2. 重启 MySQL 服务器。

此条目是否有用?