airflow.providers.elasticsearch.log.es_task_handler
¶
模块内容¶
-
class
airflow.providers.elasticsearch.log.es_task_handler.
ElasticsearchTaskHandler
(base_log_folder: str, filename_template: str, log_id_template: str, end_of_log_mark: str, write_stdout: bool, json_format: bool, json_fields: str, host: str = 'localhost:9200', frontend: str = 'localhost:5601', es_kwargs: Optional[dict] = conf.getsection('elasticsearch_configs'))[source]¶ 基类:
airflow.utils.log.file_task_handler.FileTaskHandler
,airflow.utils.log.logging_mixin.LoggingMixin
ElasticsearchTaskHandler 是一个 Python 日志处理器,用于从 Elasticsearch 读取日志。注意日志不会直接索引到 Elasticsearch 中。相反,它将日志刷新到本地文件中。将日志索引到 Elasticsearch 中需要额外的软件设置,例如使用 Filebeat 和 Logstash。为了高效地查询和排序 Elasticsearch 结果,我们假设每条日志消息都有一个由任务实例主键组成的 log_id 字段:log_id = {dag_id}-{task_id}-{execution_date}-{try_number}。具有特定 log_id 的日志消息基于 offset 进行排序,offset 是一个唯一的整数,指示日志消息的顺序。此处的时间戳不可靠,因为多条日志消息可能具有相同的时间戳。
-
static
_clean_execution_date
(execution_date: datetime)[source]¶ 清理执行日期,使其在 Elasticsearch 中安全可查询,方法是移除保留字符。# https://elastic.ac.cn/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters
- 参数
execution_date -- DAG 运行的执行日期。
-
_read
(self, ti: TaskInstance, try_number: int, metadata: Optional[dict] = None)[source]¶ 流式日志端点。
- 参数
ti -- 任务实例对象
try_number -- 任务实例的 try_number
metadata -- 日志元数据,可用于流式日志读取和自动跟踪。
- 返回
一个包含主机、日志文档和元数据的元组列表。
-
static