将日志写入 Elasticsearch¶
可以将 Airflow 配置为从 Elasticsearch 读取任务日志,并且可以选择以标准或 JSON 格式将日志写入 stdout。 这些日志稍后可以使用 fluentd、logstash 或其他工具收集并转发到 Elasticsearch 集群。
您可以选择将 worker 的所有任务日志输出到最高的父级进程,而不是标准的 文件位置。 这在 Kubernetes 等容器环境中提供了一些额外的灵活性,在这些环境中,容器 stdout 已经记录到主机节点。 从那里可以使用日志传输工具将它们转发到 Elasticsearch。 要使用此功能,请在 airflow.cfg
中设置 write_stdout
选项。 您还可以选择使用 json_format
选项以 JSON 格式输出日志。 Airflow 使用标准的 Python 日志模块,JSON 字段直接从 LogRecord 对象中提取。 要使用此功能,请在 airflow.cfg
中设置 json_fields
选项。 将您要收集的日志的字段添加到逗号分隔的字符串中。 这些字段来自 logging
模块中的 LogRecord 对象。 有关不同属性的文档可以在这里找到。
首先,要使用处理程序,必须如下配置 airflow.cfg
[logging]
remote_logging = True
[elasticsearch]
host = <host>:<port>
要以 JSON 格式将任务日志输出到 stdout,可以使用以下配置
[logging]
remote_logging = True
[elasticsearch]
host = <host>:<port>
write_stdout = True
json_format = True
通过 TLS 将日志写入 Elasticsearch¶
要向 ElasticSearch 添加自定义配置(例如,启用 ssl_verify
,添加自定义自签名证书等),请在您的 airflow.cfg
中使用 elasticsearch_configs
设置
[logging]
remote_logging = True
[elasticsearch_configs]
verify_certs=True
ca_certs=/path/to/CA_certs
Elasticsearch 外部链接¶
用户可以配置 Airflow 以显示指向 Elasticsearch 日志查看系统(例如 Kibana)的链接。
要启用它,必须按照以下示例在 airflow.cfg
中进行配置。 请注意 URL 中所需的 {log_id}
,在构建外部链接时,Airflow 会将此参数替换为用于写入日志的相同 log_id_template
(请参阅 将日志写入 Elasticsearch)。
[elasticsearch]
# Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id
# Code will construct log_id using the log_id template from the argument above.
# NOTE: scheme will default to https if one is not provided
frontend = <host_port>/{log_id}
更改 [elasticsearch] log_id_template
¶
如果您需要更改 [elasticsearch] log_id_template
,Airflow 2.3.0+ 能够跟踪旧值,因此仍然可以获取您现有的任务运行日志。一旦您使用了 Airflow 2.3.0+,通常您可以随意更改 log_id_template
,Airflow 会跟踪更改。
但是,当您升级到 2.3.0+ 时,Airflow 可能无法正确保存您之前的 log_id_template
。如果在升级后发现您的任务日志不再可访问,请尝试在 log_template
表中添加一行,其中 id=0
包含您之前的 log_id_template
。例如,如果您在 2.2.5 中使用了默认值
INSERT INTO log_template (id, filename, elasticsearch_id, created_at) VALUES (0, '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log', '{dag_id}-{task_id}-{execution_date}-{try_number}', NOW());