airflow.providers.opensearch.log.os_task_handler
¶
模块内容¶
类¶
OpensearchTaskHandler 是一个 Python 日志处理程序,用于读取和写入 OpenSearch 的日志。 |
属性¶
- class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), os_kwargs='default_os_kwargs')[source]¶
基类:
airflow.utils.log.file_task_handler.FileTaskHandler
、airflow.utils.log.logging_mixin.ExternalLoggingMixin
、airflow.utils.log.logging_mixin.LoggingMixin
OpensearchTaskHandler 是一个 Python 日志处理程序,用于读取和写入 OpenSearch 的日志。
与 ElasticsearchTaskHandler 类似,Airflow 本身不处理日志索引。相反,日志会被刷新到本地文件,可能需要额外的软件(例如,Filebeat、Logstash)将日志发送到 OpenSearch。然后,此处理程序可以从 OpenSearch 获取和显示日志。
为了有效地查询和排序 Elasticsearch 结果,此处理程序假定每个日志消息都有一个字段 log_id,其中包含 ti 主键:log_id = {dag_id}-{task_id}-{logical_date}-{try_number}。具有特定 log_id 的日志消息基于 offset 排序,这是一个唯一的整数,表示日志消息的顺序。此处的时间戳不可靠,因为多个日志消息可能具有相同的时间戳。
- 参数
base_log_folder (str) – 用于本地存储日志的基本文件夹。
end_of_log_mark (str) – 表示日志结束的标记字符串。
write_stdout (bool) – 是否也将日志写入标准输出。
json_format (bool) – 是否将日志格式化为 JSON。
json_fields (str) – 要包含在 JSON 日志输出中的字段的逗号分隔列表。
host (str) – OpenSearch 主机名。
port (int) – OpenSearch 端口。
username (str) – 用于 OpenSearch 身份验证的用户名。
password (str) – 用于 OpenSearch 身份验证的密码。
host_field (str) – 日志中主机的字段名称(默认为“host”)。
offset_field (str) – 日志偏移量的字段名称(默认为“offset”)。
index_patterns (str) – 用于存储日志的索引模式或模板。
index_patterns_callable (str) – 可调用对象,用于根据上下文动态生成索引模式。
os_kwargs (dict | None | Literal[default_os_kwargs]) – 其他 OpenSearch 客户端选项。可以将其设置为“default_os_kwargs”,以从 Airflow 设置加载默认配置。
- set_context(ti, *, identifier=None)[source]¶
向 airflow 任务处理程序提供 task_instance 上下文。
- 参数
ti (airflow.models.taskinstance.TaskInstance) – 任务实例对象
identifier (str | None) – 如果设置,则标识从与任务实例相关的异常场景中中继日志的 Airflow 组件