将日志写入 Elasticsearch

可以将 Airflow 配置为从 Elasticsearch 读取任务日志,并可以选择以标准或 JSON 格式将日志写入标准输出。然后,可以使用 fluentd、logstash 或其他工具收集这些日志并将其转发到 Elasticsearch 集群。

您可以选择将来自工作器的所有任务日志输出到最高父级进程,而不是标准文件位置。这在 Kubernetes 等容器环境中提供了一些额外的灵活性,在这些环境中,容器标准输出已经记录到主机节点。然后,可以使用日志传送工具将它们转发到 Elasticsearch。要使用此功能,请在 airflow.cfg 中设置 write_stdout 选项。您还可以选择使用 json_format 选项以 JSON 格式输出日志。Airflow 使用标准的 Python 日志记录模块,并且 JSON 字段直接从 LogRecord 对象中提取。要使用此功能,请在 airflow.cfg 中设置 json_fields 选项。将要为日志收集的字段添加到逗号分隔的字符串中。这些字段来自 logging 模块中的 LogRecord 对象。有关不同属性的文档,请参见此处

首先,要使用处理程序,必须按如下方式配置 airflow.cfg

[logging]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True

[elasticsearch]
host = <host>:<port>
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
end_of_log_mark = end_of_log
write_stdout =
json_fields =

要以 JSON 格式将任务日志输出到标准输出,可以使用以下配置

[logging]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True

[elasticsearch]
host = <host>:<port>
log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}}
end_of_log_mark = end_of_log
write_stdout = True
json_format = True
json_fields = asctime, filename, lineno, levelname, message

通过 TLS 将日志写入 Elasticsearch

要将自定义配置添加到 ElasticSearch(例如,启用 ssl_verify、添加自定义自签名证书等),请在 airflow.cfg 中使用 elasticsearch_configs 设置

[logging]
# Airflow can store logs remotely in AWS S3, Google Cloud Storage or Elastic Search.
# Users must supply an Airflow connection id that provides access to the storage
# location. If remote_logging is set to true, see UPDATING.md for additional
# configuration requirements.
remote_logging = True

[elasticsearch_configs]
use_ssl=True
verify_certs=True
ca_certs=/path/to/CA_certs

此条目有帮助吗?