airflow.providers.elasticsearch.log.es_task_handler

模块内容

airflow.providers.elasticsearch.log.es_task_handler.EsLogMsgType[source]
class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder: str, filename_template: str, log_id_template: str, end_of_log_mark: str, write_stdout: bool, json_format: bool, json_fields: str, host: str = 'localhost:9200', frontend: str = 'localhost:5601', es_kwargs: Optional[dict] = conf.getsection('elasticsearch_configs'))[source]

基类: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.LoggingMixin

ElasticsearchTaskHandler 是一个 Python 日志处理器,用于从 Elasticsearch 读取日志。注意日志不会直接索引到 Elasticsearch 中。相反,它将日志刷新到本地文件中。将日志索引到 Elasticsearch 中需要额外的软件设置,例如使用 Filebeat 和 Logstash。为了高效地查询和排序 Elasticsearch 结果,我们假设每条日志消息都有一个由任务实例主键组成的 log_id 字段:log_id = {dag_id}-{task_id}-{execution_date}-{try_number}。具有特定 log_id 的日志消息基于 offset 进行排序,offset 是一个唯一的整数,指示日志消息的顺序。此处的时间戳不可靠,因为多条日志消息可能具有相同的时间戳。

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = Elasticsearch[source]
log_name[source]

日志名称

_render_log_id(self, ti: TaskInstance, try_number: int)[source]
static _clean_execution_date(execution_date: datetime)[source]

清理执行日期,使其在 Elasticsearch 中安全可查询,方法是移除保留字符。# https://elastic.ac.cn/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters

参数

execution_date -- DAG 运行的执行日期。

static _group_logs_by_host(logs)[source]
_read_grouped_logs(self)[source]
_read(self, ti: TaskInstance, try_number: int, metadata: Optional[dict] = None)[source]

流式日志端点。

参数
  • ti -- 任务实例对象

  • try_number -- 任务实例的 try_number

  • metadata -- 日志元数据,可用于流式日志读取和自动跟踪。

返回

一个包含主机、日志文档和元数据的元组列表。

_format_msg(self, log_line)[source]

当与 json_format 一起使用时,格式化 ES 记录以匹配 settings.LOG_FORMAT

es_read(self, log_id: str, offset: str, metadata: dict)[source]

返回 Elasticsearch 中匹配 log_id 的日志和下一个 offset。如果未找到日志或发生错误,则返回 ''。

参数
  • log_id (str) -- 要读取的日志的 log_id。

  • offset (str) -- 开始读取日志的 offset。

  • metadata (dict) -- 日志元数据,用于流式日志下载。

set_context(self, ti: TaskInstance)[source]

为 airflow 任务处理器提供 task_instance 上下文。

参数

ti -- 任务实例对象

close(self)[source]
get_external_log_url(self, task_instance: TaskInstance, try_number: int)[source]

创建一个外部日志收集服务的地址。

参数
  • task_instance -- 任务实例对象

  • try_number (Optional[int]) -- 要读取日志的任务实例 try_number。

类型

task_instance: TaskInstance

返回

外部日志收集服务的 URL

返回类型

str

class airflow.providers.elasticsearch.log.es_task_handler._ESJsonLogFmt(**kwargs)[source]

读取 ES 日志并重新格式化以匹配 settings.LOG_FORMAT 的辅助类

本条目有帮助吗?