airflow.providers.cncf.kubernetes.operators.job

执行一个 Kubernetes Job。

属性

log

KubernetesJobOperator

执行一个 Kubernetes Job。

KubernetesDeleteJobOperator

删除一个 Kubernetes Job。

KubernetesPatchJobOperator

更新一个 Kubernetes Job。

模块内容

airflow.providers.cncf.kubernetes.operators.job.log[源码]
class airflow.providers.cncf.kubernetes.operators.job.KubernetesJobOperator(*, job_template_file=None, full_job_spec=None, backoff_limit=None, completion_mode=None, completions=None, manual_selector=None, parallelism=None, selector=None, suspend=None, ttl_seconds_after_finished=None, wait_until_job_complete=False, job_poll_interval=10, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源码]

基类: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator

执行一个 Kubernetes Job。

参阅

有关如何使用此运算符的更多信息,请参阅指南:KubernetesJobOperator

注意

如果您使用 Google Kubernetes Engine 并且 Airflow 未在同一集群中运行,请考虑使用 GKEStartJobOperator,它简化了授权过程。

参数:
  • job_template_file (str | None) – job 模板文件的路径 (模板化)

  • full_job_spec (kubernetes.client.models.V1Job | None) – 完整的 JodSpec

  • backoff_limit (int | None) – 指定标记此 job 失败前的重试次数。默认为 6

  • completion_mode (str | None) – CompletionMode 指定 Pod 完成情况的跟踪方式。可以是 NonIndexed(默认)或 Indexed

  • completions (int | None) – 指定此 job 应该运行的成功完成的 pod 的期望数量。

  • manual_selector (bool | None) – manualSelector 控制 pod 标签和 pod 选择器的生成。

  • parallelism (int | None) – 指定此 job 在任何给定时间应运行的最大期望 pod 数量。

  • selector (kubernetes.client.models.V1LabelSelector | None) – 此 V1JobSpec 的选择器。

  • suspend (bool | None) – Suspend 指定 Job 控制器是否应该创建 Pods。

  • ttl_seconds_after_finished (int | None) – ttlSecondsAfterFinished 限制已完成执行(完成或失败)的 Job 的生命周期。

  • wait_until_job_complete (bool) – 是否等待启动的 job 完成执行(完成或失败)。默认为 False。

  • job_poll_interval (float) – 轮询 job 状态的间隔秒数。默认为 10。在参数 wait_until_job_complete 设置为 True 时使用。

  • deferrable (bool) – 在可延迟模式下运行运算符。请注意,参数 wait_until_job_complete 必须设置为 True。

template_fields: collections.abc.Sequence[str][源码]
job_template_file = None[源码]
full_job_spec = None[源码]
job_request_obj: kubernetes.client.models.V1Job | None = None[源码]
job: kubernetes.client.models.V1Job | None = None[源码]
backoff_limit = None[源码]
completion_mode = None[源码]
completions = None[源码]
manual_selector = None[源码]
parallelism = None[源码]
selector = None[源码]
suspend = None[源码]
ttl_seconds_after_finished = None[源码]
wait_until_job_complete = False[源码]
job_poll_interval = 10[源码]
deferrable = True[源码]
property hook: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook[源码]
property job_client: kubernetes.client.BatchV1Api[源码]
create_job(job_request_obj)[源码]
execute(context)[源码]

根据 deferrable 参数异步或同步运行 pod。

execute_deferrable()[源码]
execute_complete(context, event, **kwargs)[源码]
static deserialize_job_template_file(path)[源码]

从文件生成 Job。

不幸的是,我们需要访问 kubernetes 客户端的私有方法 _ApiClient__deserialize_model。此问题正在此处跟踪:https://github.com/kubernetes-client/python/issues/977

参数:

path (str) – 文件的路径

返回:

一个 kubernetes.client.models.V1Job

返回类型:

kubernetes.client.models.V1Job

on_kill()[源码]

当任务实例被终止时,重写此方法以清理子进程。

在运算符中使用 threading、subprocess 或 multiprocessing 模块都需要进行清理,否则会留下僵尸进程。

build_job_request_obj(context=None)[源码]

根据 job 模板文件、完整的 job spec 和其他运算符参数返回 V1Job 对象。

V1Job 属性(按优先级顺序)派生自运算符参数、完整的 job spec、job 模板文件。

static reconcile_jobs(base_job, client_job)[源码]

合并 Kubernetes Job 对象。

参数:
  • base_job (kubernetes.client.models.V1Job) – 包含基础属性,如果这些属性存在于 client job 中,则会被覆盖;如果不存在,则保留。

  • client_job (kubernetes.client.models.V1Job | None) – 客户端希望创建的 job。

返回:

合并后的 jobs

返回类型:

kubernetes.client.models.V1Job

这不能递归完成,因为某些字段被覆盖,而另一些字段则被连接。

static reconcile_job_specs(base_spec, client_spec)[源码]

合并 Kubernetes JobSpec 对象。

参数:
  • base_spec (kubernetes.client.models.V1JobSpec | None) – 包含基础属性,如果这些属性存在于 client_spec 中,则会被覆盖;如果不存在,则保留。

  • client_spec (kubernetes.client.models.V1JobSpec | None) – 客户端希望创建的 spec。

返回:

合并后的 specs

返回类型:

kubernetes.client.models.V1JobSpec | None

class airflow.providers.cncf.kubernetes.operators.job.KubernetesDeleteJobOperator(*, name, namespace, kubernetes_conn_id=KubernetesHook.default_conn_name, config_file=None, in_cluster=None, cluster_context=None, delete_on_status=None, wait_for_completion=False, poll_interval=10.0, **kwargs)[源码]

基类: airflow.models.BaseOperator

删除一个 Kubernetes Job。

参阅

有关如何使用此运算符的更多信息,请参阅指南:KubernetesDeleteJobOperator

参数:
  • name (str) – Job 的名称。

  • namespace (str) – 在 kubernetes 中运行的命名空间。

  • kubernetes_conn_id (str | None) – Kubernetes 集群的kubernetes 连接 ID

  • config_file (str | None) – Kubernetes 配置文件路径。(模板化) 如果未指定,默认值为 ~/.kube/config

  • in_cluster (bool | None) – 使用 in_cluster 配置运行 kubernetes 客户端。

  • cluster_context (str | None) – 指向 kubernetes 集群的上下文。in_cluster 为 True 时忽略。如果为 None,则使用 current-context。(模板化)

  • delete_on_status (str | None) – 根据 job 状态执行删除操作的条件。可选值:None - 无论 job 状态如何都删除;“Complete” - 仅删除成功完成的 job;“Failed” - 仅删除失败的 job。(默认: None)

  • wait_for_completion (bool) – 是否等待 job 完成。(默认: False)

  • poll_interval (float) – 轮询 job 状态的间隔秒数。delete_on_status 参数设置时使用。(默认: 10.0)

template_fields: collections.abc.Sequence[str] = ('config_file', 'name', 'namespace', 'cluster_context')[源码]
name[源码]
namespace[源码]
kubernetes_conn_id = 'kubernetes_default'[源码]
config_file = None[源码]
in_cluster = None[源码]
cluster_context = None[源码]
delete_on_status = None[源码]
wait_for_completion = False[源码]
poll_interval = 10.0[源码]
property hook: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook[source]
property client: kubernetes.client.BatchV1Api[source]
execute(context)[source]

创建 operator 时进行派生。

context 是与渲染 jinja 模板时使用的相同的字典。

更多 context 请参阅 get_template_context。

class airflow.providers.cncf.kubernetes.operators.job.KubernetesPatchJobOperator(*, name, namespace, body, kubernetes_conn_id=KubernetesHook.default_conn_name, config_file=None, in_cluster=None, cluster_context=None, **kwargs)[source]

基类: airflow.models.BaseOperator

更新一个 Kubernetes Job。

参阅

有关如何使用此 operator 的更多信息,请参阅指南: KubernetesPatchJobOperator

参数:
  • name (str) – Job 的名称

  • namespace (str) – 在 kubernetes 中运行的 namespace

  • body (object) – 包含用于更新参数的 Job json 对象 https://kubernetes.ac.cn/docs/reference/generated/kubernetes-api/v1.25/#job-v1-batch 例如 {"spec": {"suspend": True}}

  • kubernetes_conn_id (str | None) – Kubernetes 集群的kubernetes 连接 ID

  • config_file (str | None) – Kubernetes 配置文件路径。(模板化) 如果未指定,默认值为 ~/.kube/config

  • in_cluster (bool | None) – 使用 in_cluster 配置运行 kubernetes 客户端。

  • cluster_context (str | None) – 指向 kubernetes 集群的上下文。in_cluster 为 True 时忽略。如果为 None,则使用 current-context。(模板化)

template_fields: collections.abc.Sequence[str] = ('config_file', 'name', 'namespace', 'body', 'cluster_context')[source]
name[source]
namespace[source]
body[source]
kubernetes_conn_id = 'kubernetes_default'[source]
config_file = None[source]
in_cluster = None[source]
cluster_context = None[source]
property hook: airflow.providers.cncf.kubernetes.hooks.kubernetes.KubernetesHook[source]
execute(context)[source]

创建 operator 时进行派生。

context 是与渲染 jinja 模板时使用的相同的字典。

更多 context 请参阅 get_template_context。

此条目有帮助吗?