airflow.providers.cncf.kubernetes.utils.pod_manager

启动 POD。

模块内容

PodPhase

可能的 Pod 阶段。

PodOperatorHookProtocol

协议,用于定义 KubernetesPodOperator 依赖的方法。

PodLoggingStatus

fetch_container_logs 退出时返回 Pod 的状态和上次日志时间。

PodManager

创建、监控以及以其他方式与 Kubernetes Pod 交互,以便与 KubernetesPodOperator 一起使用。

OnFinishAction

Pod 完成时采取的动作。

函数

should_retry_start_pod(exception)

检查异常是否指示瞬态错误并值得重试。

get_container_status(pod, container_name)

检索容器状态。

container_is_running(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否正在运行。

container_is_completed(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否已完成。

container_is_succeeded(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否已完成并成功。

container_is_wait(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否正在等待。

container_is_terminated(pod, container_name)

检查 V1Pod pod 以确定 container_name 是否已终止。

get_container_termination_message(pod, container_name)

check_exception_is_kubernetes_api_unauthorized(exc)

is_log_group_marker(line)

检查该行是否为日志组标记,例如 ::group::::endgroup::

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchFailedException[source]

基类:airflow.exceptions.AirflowException

当 pod 在 KubernetesPodOperator 中启动失败时。

airflow.providers.cncf.kubernetes.utils.pod_manager.should_retry_start_pod(exception)[source]

检查异常是否指示瞬态错误并值得重试。

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodPhase[source]

可能的 Pod 阶段。

请参阅 https://kubernetes.ac.cn/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase

PENDING = 'Pending'[source]
RUNNING = 'Running'[source]
FAILED = 'Failed'[source]
SUCCEEDED = 'Succeeded'[source]
terminal_states[source]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodOperatorHookProtocol[source]

基类:Protocol

协议,用于定义 KubernetesPodOperator 依赖的方法。

KubernetesPodOperator 的子类(例如 GKEStartPodOperator)可能会使用不扩展 KubernetesHook 的 hook。 我们使用此协议来记录 KPO 使用的方法,并确保这些方法存在于其他此类 hook 中。

property core_v1_client: kubernetes.client.CoreV1Api[source]

获取经过身份验证的客户端对象。

property is_in_cluster: bool[source]

公开 hook 是否配置了 load_incluster_config

get_pod(name, namespace)[source]

从 Kubernetes API 读取 pod 对象。

get_namespace()[source]

返回连接中定义的命名空间。

get_xcom_sidecar_container_image()[source]

返回连接中定义的 xcom sidecar 镜像。

get_xcom_sidecar_container_resources()[source]

返回连接中定义的 xcom sidecar 资源。

airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_status(pod, container_name)[source]

检索容器状态。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running(pod, container_name)[source]

检查 V1Pod pod 以确定 container_name 是否正在运行。

如果该容器存在且正在运行,则返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_completed(pod, container_name)[source]

检查 V1Pod pod 以确定 container_name 是否已完成。

如果该容器存在且已完成,则返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_succeeded(pod, container_name)[source]

检查 V1Pod pod 以确定 container_name 是否已完成并成功。

如果该容器存在、已完成且成功,则返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_wait(pod, container_name)[source]

检查 V1Pod pod 以确定 container_name 是否正在等待。

如果该容器存在且正在等待,则返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_terminated(pod, container_name)[source]

检查 V1Pod pod 以确定 container_name 是否已终止。

如果该容器存在且已终止,则返回 True。否则返回 False。

airflow.providers.cncf.kubernetes.utils.pod_manager.get_container_termination_message(pod, container_name)[source]
airflow.providers.cncf.kubernetes.utils.pod_manager.check_exception_is_kubernetes_api_unauthorized(exc)[source]
exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodLaunchTimeoutException[source]

基类:airflow.exceptions.AirflowException

当 Pod 在指定的超时时间内未离开 Pending 阶段时。

exception airflow.providers.cncf.kubernetes.utils.pod_manager.PodNotFoundException[source]

基类:airflow.exceptions.AirflowException

预期的 Pod 在 kube-api 中不存在。

class airflow.providers.cncf.kubernetes.utils.pod_manager.PodLoggingStatus[source]

fetch_container_logs 退出时返回 Pod 的状态和上次日志时间。

running: bool[source]
last_log_time: pendulum.DateTime | None[source]
class airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager(kube_client, callbacks=None)[source]

基类: airflow.utils.log.logging_mixin.LoggingMixin

创建、监控以及以其他方式与 Kubernetes Pod 交互,以便与 KubernetesPodOperator 一起使用。

run_pod_async(pod, **kwargs)[source]

异步运行 POD。

delete_pod(pod)[source]

删除 POD。

create_pod(pod)[source]

异步启动 Pod。

await_pod_start(pod, startup_timeout=120, startup_check_interval=1)[source]

等待 Pod 达到 Pending 以外的阶段。

参数
  • pod (kubernetes.client.models.v1_pod.V1Pod) –

  • startup_timeout (int) – Pod 启动的超时时间(以秒为单位)(如果 Pod 处于 Pending 状态时间过长,则任务失败)

  • startup_check_interval (int) – 检查之间的间隔(以秒为单位)

返回

返回类型

None

await_container_completion(pod, container_name)[source]

等待给定 Pod 中的给定容器完成。

参数
  • pod (kubernetes.client.models.v1_pod.V1Pod) – 将被监控的 Pod 规范

  • container_name (str) – Pod 中要监控的容器的名称

await_pod_completion(pod, istio_enabled=False, container_name='base')[source]

监控 Pod 并返回最终状态。

参数
  • istio_enabled (bool) – 命名空间中是否启用了 istio

  • pod (kubernetes.client.models.v1_pod.V1Pod) – 将被监控的 Pod 规范

  • container_name (str) – pod 内容器的名称

返回

tuple[State, str | None]

返回类型

kubernetes.client.models.v1_pod.V1Pod

parse_log_line(line)[source]

解析 K8s 日志行并返回最终状态。

参数

line (str) – k8s 日志行

返回

时间戳和日志消息

返回类型

tuple[pendulum.DateTime | None, str]

container_is_running(pod, container_name)[source]

读取 pod 并检查容器是否正在运行。

container_is_terminated(pod, container_name)[source]

读取 pod 并检查容器是否已终止。

read_pod_logs(pod, container_name, tail_lines=None, timestamps=False, since_seconds=None, follow=True, post_termination_timeout=120, **kwargs)[source]

从 POD 读取日志。

read_pod_events(pod)[source]

从 POD 读取事件。

read_pod(pod)[source]

读取 POD 信息。

await_xcom_sidecar_container_start(pod, timeout=900, log_interval=30)[source]

在执行 do_xcom_push 之前,检查 sidecar 容器是否已达到“正在运行”状态。

extract_xcom(pod)[source]

检索 XCom 值并终止 xcom sidecar 容器。

extract_xcom_json(pod)[source]

检索 XCom 值并检查 xcom json 是否有效。

extract_xcom_kill(pod)[source]

终止 xcom sidecar 容器。

class airflow.providers.cncf.kubernetes.utils.pod_manager.OnFinishAction[source]

基类: str, enum.Enum

Pod 完成时采取的动作。

KEEP_POD = 'keep_pod'[source]
DELETE_POD = 'delete_pod'[source]
DELETE_SUCCEEDED_POD = 'delete_succeeded_pod'[source]
airflow.providers.cncf.kubernetes.utils.pod_manager.is_log_group_marker(line)[source]

检查该行是否为日志组标记,例如 ::group::::endgroup::

此条目是否有帮助?