airflow.providers.amazon.aws.log.s3_task_handler

模块内容

S3TaskHandler

S3TaskHandler 是一个 Python 日志处理程序,用于处理和读取任务实例日志。

class airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler(base_log_folder, s3_log_folder, **kwargs)[源代码]

基类:airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.LoggingMixin

S3TaskHandler 是一个 Python 日志处理程序,用于处理和读取任务实例日志。

它扩展了 airflow FileTaskHandler 并上传到 S3 远程存储并从中读取。

trigger_should_wrap = True[源代码]
hook()[源代码]

返回 S3Hook。

set_context(ti, *, identifier=None)[源代码]

向 airflow 任务处理程序提供 task_instance 上下文。

一般来说返回 None。但如果属性 maintain_propagate 已设置为传播,则返回 sentinel MAINTAIN_PROPAGATE。这具有覆盖默认行为的效果,即在调用 set_context 时将 propagate 设置为 False。在撰写本文时,此功能仅在单元测试中使用。

参数
  • ti (airflow.models.taskinstance.TaskInstance) – 任务实例对象

  • identifier (str | None) – 如果设置,则将后缀添加到日志文件中。用于将异常消息从任务或触发器运行以外的上下文传递到任务日志时

close()[源代码]

关闭本地日志文件并将其上传到远程存储 S3。

s3_log_exists(remote_log_location)[源代码]

检查远程存储中是否存在 remote_log_location。

参数

remote_log_location (str) – 日志在远程存储中的位置

返回

如果位置存在则为 True,否则为 False

返回类型

bool

s3_read(remote_log_location, return_error=False)[源代码]

返回在 remote_log_location 找到的日志,如果没有找到日志或发生错误,则返回 ''。

参数
  • remote_log_location (str) – 日志在远程存储中的位置

  • return_error (bool) – 如果为 True,则在发生错误时返回字符串错误消息。否则,当发生错误时返回 ''。

返回

在 remote_log_location 找到的日志

返回类型

str

s3_write(log, remote_log_location, append=True, max_retry=1)[源代码]

将日志写入 remote_log_location;返回 True 或静默失败并返回 False

参数
  • log (str) – 要写入 remote_log_location 的日志

  • remote_log_location (str) – 日志在远程存储中的位置

  • append (bool) – 如果为 False,则覆盖任何现有日志文件。如果为 True,则新日志将附加到任何现有日志。

  • max_retry (int) – 上传失败时最大重试次数

返回

日志是否成功写入远程位置。

返回类型

bool

此条目是否有帮助?