任务日志

Airflow 以允许您在 Airflow UI 中单独查看每个任务的日志的方式写入任务的日志。核心 Airflow 提供了一个接口 FileTaskHandler,它将任务日志写入文件,并包含一个在任务运行时从 worker 提供日志的机制。Apache Airflow 社区还为许多服务发布了提供程序(提供程序包),其中一些提供程序提供了扩展 Apache Airflow 日志记录功能的处理程序。您可以在写入日志中看到所有这些提供程序。

当使用 S3、GCS、WASB 或 OSS 远程日志服务时,您可以在上传到远程位置后删除本地日志文件,方法是设置配置

[logging]
remote_logging = True
remote_base_log_folder = schema://path/to/remote/log
delete_local_logs = True

配置日志记录

对于默认处理程序 FileTaskHandler,您可以使用 base_log_folderairflow.cfg 中指定放置日志文件的目录。默认情况下,日志放置在 AIRFLOW_HOME 目录中。

注意

有关设置配置的更多信息,请参阅设置配置选项

在为任务命名日志文件时遵循默认模式

  • 对于普通任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/attempt={try_number}.log

  • 对于动态映射的任务:dag_id={dag_id}/run_id={run_id}/task_id={task_id}/map_index={map_index}/attempt={try_number}.log

这些模式可以通过log_filename_template进行调整。

此外,您可以提供一个远程位置来存储当前日志和备份。

从代码写入任务日志

Airflow 使用标准的 Python logging 框架写入日志,并且在任务执行期间,根记录器被配置为写入任务的日志。

大多数运算符将自动将日志写入任务日志。这是因为它们有一个 log 记录器,您可以使用它来写入任务日志。此记录器由所有运算符都派生自的 LoggingMixin 创建和配置。但由于根记录器处理,任何将日志记录传播到根的(使用默认设置的)标准记录器也将写入任务日志。

因此,如果您想从您的自定义代码记录到任务日志,您可以执行以下任何操作

  • 使用 BaseOperator 中的 self.log 记录器进行记录

  • 使用标准的 print 语句打印到 stdout (不推荐,但在某些情况下可能很有用)

  • 使用标准记录器方法,即使用 Python 模块名称创建一个记录器并使用它写入任务日志

这是通常在 Python 代码中直接使用记录器的方式

import logging

logger = logging.getLogger(__name__)
logger.info("This is a log message")

日志行的分组

2.9.0 版本新增功能。

像 CI 管道一样,Airflow 日志也可能非常大,并且难以阅读。因此,有时将日志区域的部分分组并提供文本区域的折叠以隐藏不相关的内容会很有用。因此,Airflow 实现了与 GithubAzure DevOps 类似的日志消息分组,这样可以折叠文本区域。实现的方案是兼容的,这样在 CI 中生成输出的工具可以直接在 Airflow 中利用相同的体验。

通过添加带有开始和结束位置的日志标记,例如下面的示例,可以将日志消息分组

print("Here is some standard text.")
print("::group::Non important details")
print("bla")
print("debug messages...")
print("::endgroup::")
print("Here is again some standard text.")

当在 Web UI 中显示日志时,日志的显示将被压缩

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯈ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

如果您单击日志文本标签,将显示详细的日志谎言。

[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is some standard text.
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯆ Non important details
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - bla
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - debug messages...
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} ⯅⯅⯅ Log group end
[2024-03-08, 23:30:18 CET] {logging_mixin.py:188} INFO - Here is again some standard text.

日志的交错

Airflow 的远程任务日志处理程序大致可以分为两类:流式处理程序(例如 ElasticSearch、AWS Cloudwatch 和 GCP 操作日志记录,以前称为 stackdriver)和 blob 存储处理程序(例如 S3、GCS、WASB)。

对于 blob 存储处理程序,根据任务的状态,日志可能位于许多不同的位置和多个不同的文件中。因此,我们需要检查所有位置并交错我们找到的内容。为此,我们需要能够解析每一行的时间戳。如果您正在使用自定义格式化程序,则可能需要通过在 Airflow 设置 [logging] interleave_timestamp_parser 中提供可调用名称来覆盖默认解析器。

对于流式处理程序,无论任务阶段或执行位置如何,所有日志消息都可以使用相同的标识符发送到日志服务,因此通常不需要检查多个源和交错。

故障排除

如果您想检查当前设置了哪个任务处理程序,可以使用 airflow info 命令,如下例所示。

$ airflow info

Apache Airflow
version                | 2.9.0.dev0
executor               | LocalExecutor
task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
sql_alchemy_conn       | postgresql+psycopg2://postgres:airflow@postgres/airflow
dags_folder            | /files/dags
plugins_folder         | /root/airflow/plugins
base_log_folder        | /root/airflow/logs
remote_base_log_folder |

[skipping the remaining outputs for brevity]

上面的 airflow info 输出被截断,仅显示与日志记录配置相关的部分。您还可以运行 airflow config list 来检查日志记录配置选项是否具有有效值。

高级配置

您可以配置高级功能 - 包括添加您自己的自定义任务日志处理程序(还有所有 airflow 组件的日志处理程序),以及为每个运算符、钩子和任务创建自定义日志处理程序。

从 worker 和触发器提供日志

大多数任务处理程序在任务完成后发送日志。为了实时查看日志,Airflow 启动一个 HTTP 服务器以在以下情况下提供日志

  • 如果使用 SequentialExecutorLocalExecutor,则在 airflow scheduler 运行时。

  • 如果使用 CeleryExecutor,则在 airflow worker 运行时。

在触发器中,除非使用 --skip-serve-logs 选项启动服务,否则会提供日志。

服务器在 [logging] 部分中的 worker_log_server_port 选项指定的端口上运行,触发器在 triggerer_log_server_port 选项指定的端口上运行。默认值分别为 8793 和 8794。Web 服务器和 worker 之间的通信使用 [webserver] 部分中 secret_key 选项指定的密钥进行签名。您必须确保密钥匹配,以便通信可以顺利进行。

我们正在使用 Gunicorn 作为 WSGI 服务器。其配置选项可以使用 GUNICORN_CMD_ARGS 环境变量覆盖。有关详细信息,请参阅 Gunicorn 设置

实现自定义文件任务处理程序

注意

这是一个高级主题,大多数用户应该能够直接使用写入日志中的现有处理程序。

在我们的提供程序中,我们有各种主要云提供商的多种选择。但是,如果您需要使用不同的服务实现日志记录,并且如果您决定实现自定义 FileTaskHandler,则需要注意一些设置,尤其是在触发器日志记录的上下文中。

触发器需要改变日志记录的设置方式。与任务相比,许多触发器在同一进程中运行,并且由于触发器在 asyncio 中运行,我们必须注意不要通过日志处理程序引入阻塞调用。并且由于处理程序行为的变化(有些写入文件,有些上传到 blob 存储,有些在到达时通过网络发送消息,有些在线程中这样做),我们需要某种方式让触发器知道如何使用它们。

为了实现这一点,我们有一些属性可以设置在处理器上,可以是实例级别也可以是类级别。这些参数不遵循继承,因为 FileTaskHandler 的子类在相关特性上可能与它不同。这些参数描述如下:

  • trigger_should_wrap:控制此处理器是否应由 TriggerHandlerWrapper 包装。当处理器的每个实例创建一个文件处理器,并将所有消息写入该文件时,这是必要的。

  • trigger_should_queue:控制触发器是否应在事件循环和处理器之间放置一个 QueueListener,以确保处理器中的阻塞 IO 不会中断事件循环。

  • trigger_send_end_marker:控制触发器完成时是否应向记录器发送 END 信号。它用于告知包装器关闭并删除特定于刚刚完成的触发器的单个文件处理器。

  • trigger_supported:如果 trigger_should_wraptrigger_should_queue 都不是 True,我们通常假设该处理器不支持触发器。但如果在这种情况下,处理器将 trigger_supported 设置为 True,那么我们仍然会在触发器启动时将处理器移动到根目录,以便它可以处理触发器消息。本质上,对于“原生”支持触发器的处理器,这应该为 true。StackdriverTaskHandler 就是一个这样的例子。

此条目是否有帮助?