将日志写入 Elasticsearch

可以将 Airflow 配置为从 Elasticsearch 读取任务日志,并可以选择以标准或 JSON 格式将日志写入标准输出。 这些日志稍后可以使用 fluentd、logstash 或其他工具收集并转发到 Elasticsearch 集群。

您可以选择将来自工作器的所有任务日志输出到最高父级进程,而不是标准文件位置。 这在 Kubernetes 等容器环境中提供了一些额外的灵活性,其中容器标准输出已经记录到主机节点。 从那里可以使用日志传送工具将它们转发到 Elasticsearch。 要使用此功能,请在 airflow.cfg 中设置 write_stdout 选项。 您还可以选择使用 json_format 选项以 JSON 格式输出日志。 Airflow 使用标准的 Python 日志记录模块,并且 JSON 字段直接从 LogRecord 对象中提取。 要使用此功能,请在 airflow.cfg 中设置 json_fields 选项。 将字段添加到要为日志收集的逗号分隔字符串中。 这些字段来自 logging 模块中的 LogRecord 对象。 有关不同属性的文档,请参见此处

首先,要使用处理程序,必须按如下方式配置 airflow.cfg

[logging]
remote_logging = True

[elasticsearch]
host = <host>:<port>

要以 JSON 格式将任务日志输出到标准输出,可以使用以下配置

[logging]
remote_logging = True

[elasticsearch]
host = <host>:<port>
write_stdout = True
json_format = True

通过 TLS 将日志写入 Elasticsearch

要将自定义配置添加到 ElasticSearch(例如,打开 ssl_verify,添加自定义自签名证书等),请在您的 airflow.cfg 中使用 elasticsearch_configs 设置

[logging]
remote_logging = True

[elasticsearch_configs]
verify_certs=True
ca_certs=/path/to/CA_certs

[elasticsearch] log_id_template 的更改

如果您需要对 [elasticsearch] log_id_template 进行更改,Airflow 2.3.0+ 能够跟踪旧值,以便仍然可以获取您现有的任务运行日志。 一旦您使用的是 Airflow 2.3.0+,通常您可以随意更改 log_id_template,Airflow 会跟踪这些更改。

但是,当您升级到 2.3.0+ 时,Airflow 可能无法正确保存您之前的 log_id_template。 如果升级后您发现无法再访问您的任务日志,请尝试在 log_template 表中添加一行,其中 id=0 包含您之前的 log_id_template。 例如,如果您在 2.2.5 中使用默认值

INSERT INTO log_template (id, filename, elasticsearch_id, created_at) VALUES (0, '{{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log', '{dag_id}-{task_id}-{execution_date}-{try_number}', NOW());

此条目有帮助吗?