airflow.providers.elasticsearch.log.es_task_handler

模块内容

ElasticsearchTaskHandler

ElasticsearchTaskHandler 是一个 Python 日志处理程序,用于从 Elasticsearch 读取日志。

属性

LOG_LINE_DEFAULTS

EsLogMsgType

USE_PER_RUN_LOG_ID

VALID_ES_CONFIG_KEYS

airflow.providers.elasticsearch.log.es_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.elasticsearch.log.es_task_handler.EsLogMsgType[source]
airflow.providers.elasticsearch.log.es_task_handler.USE_PER_RUN_LOG_ID[source]
airflow.providers.elasticsearch.log.es_task_handler.VALID_ES_CONFIG_KEYS[source]
airflow.providers.elasticsearch.log.es_task_handler.get_es_kwargs_from_config()[source]
class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host_field='host', offset_field='offset', host='https://127.0.0.1:9200', frontend='localhost:5601', index_patterns=conf.get('elasticsearch', 'index_patterns'), index_patterns_callable=conf.get('elasticsearch', 'index_patterns_callable', fallback=''), es_kwargs='default_es_kwargs', **kwargs)[source]

基类: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

ElasticsearchTaskHandler 是一个 Python 日志处理程序,用于从 Elasticsearch 读取日志。

请注意,Airflow 不处理将日志索引到 Elasticsearch 的过程。相反,Airflow 会将日志刷新到本地文件中。需要额外的软件设置才能将日志索引到 Elasticsearch 中,例如使用 Filebeat 和 Logstash。

为了有效地查询和排序 Elasticsearch 结果,此处理程序假设每个日志消息都有一个字段 log_id,它由 ti 主键组成:log_id = {dag_id}-{task_id}-{logical_date}-{try_number} 具有特定 log_id 的日志消息基于 offset 排序,offset 是一个唯一的整数,表示日志消息的顺序。这里的时间戳是不可靠的,因为多个日志消息可能具有相同的时间戳。

参数
  • base_log_folder (str) – 用于在本地存储日志的基本文件夹

  • log_id_template – 日志 ID 模板

  • host (str) – Elasticsearch 主机名

property log_name: str[source]

日志名称。

是否支持外部链接。

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Elasticsearch'[source]
trigger_should_wrap = True[source]
static format_url(host)[source]

格式化给定的主机字符串,以确保它以 “http” 开头,并检查它是否代表有效的 URL。

参数 host

要格式化和检查的主机字符串。

emit(record)[source]

执行任何操作来实际记录指定的日志记录。

此版本旨在由子类实现,因此会引发 NotImplementedError。

set_context(ti, *, identifier=None)[source]

向 Airflow 任务处理器提供 task_instance 上下文。

参数
close()[source]

整理处理程序使用的任何资源。

此版本从内部处理程序映射 _handlers 中删除处理程序,该映射用于按名称查找处理程序。 子类应确保从重写的 close() 方法中调用它。

get_external_log_url(task_instance, try_number)[source]

为外部日志收集服务创建地址。

参数
返回

外部日志收集服务的 URL

返回类型

str

此条目是否有帮助?