airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor

LocalKubernetesExecutor

根据任务上定义的队列,在 LocalExecutor 和 KubernetesExecutor 之间进行选择。

模块内容

class airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor.LocalKubernetesExecutor(local_executor, kubernetes_executor)[source]

Bases: airflow.executors.base_executor.BaseExecutor

根据任务上定义的队列,在 LocalExecutor 和 KubernetesExecutor 之间进行选择。

当任务的队列是配置中 [local_kubernetes_executor] 部分的 kubernetes_queue 值时(默认值:kubernetes),选择 KubernetesExecutor 运行任务;否则,使用 LocalExecutor。

supports_ad_hoc_ti_run: bool = True[source]
supports_pickling: bool = False[source]
supports_sentry: bool = False[source]
is_local: bool = False[source]
is_single_threaded: bool = False[source]
is_production: bool = True[source]
serve_logs: bool = True[source]
callback_sink: airflow.callbacks.base_callback_sink.BaseCallbackSink | None = None[source]
KUBERNETES_QUEUE[source]
local_executor[source]
kubernetes_executor[source]
property queued_tasks: dict[airflow.models.taskinstance.TaskInstanceKey, airflow.executors.base_executor.QueuedTaskInstanceType][source]

返回本地和 kubernetes 执行器的排队任务。

property running: set[airflow.models.taskinstance.TaskInstanceKey][source]

返回本地和 kubernetes 执行器的运行任务。

property job_id: int | str |None[source]

继承自 BaseExecutor 的属性。

因为它并非真正的执行器,而是执行器的包装器,我们将其实现为属性,以便我们可以拥有自定义设置器。

start()[source]

启动本地和 kubernetes 执行器。

property slots_available: int[source]

此执行器实例可以接受的新任务数量。

property slots_occupied[source]

此执行器实例当前正在管理的任务数量。

queue_command(task_instance, command, priority=1, queue=None)[source]

通过本地或 kubernetes 执行器对命令进行排队。

queue_task_instance(task_instance, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, pool=None, cfg_path=None, **kwargs)[source]

通过本地或 kubernetes 执行器对任务实例进行排队。

get_task_log(ti, try_number)[source]

从 kubernetes 执行器获取任务日志。

has_task(task_instance)[source]

检查任务是在本地还是 kubernetes 执行器中处于排队或运行状态。

参数:

task_instance (airflow.models.taskinstance.TaskInstance) – 任务实例

返回:

如果此执行器知道该任务,则返回 True

返回类型:

bool

heartbeat()[source]

发送心跳以触发本地和 kubernetes 执行器中的新作业。

get_event_buffer(dag_ids=None)[source]

返回并刷新本地和 kubernetes 执行器的事件缓冲区。

参数:

dag_ids (list[str] | None) – 要返回事件的 dag_ids,如果为 None 则返回所有

返回:

一个事件字典

返回类型:

dict[airflow.models.taskinstance.TaskInstanceKey, airflow.executors.base_executor.EventBufferValueType]

try_adopt_task_instances(tis)[source]

尝试接管被 SchedulerJob 死亡而遗弃的运行中任务实例。

任何未被接管的任务都将由调度器清除(然后变得有资格重新调度)

返回:

任何未能被接管的任务实例

返回类型:

collections.abc.Sequence[airflow.models.taskinstance.TaskInstance]

cleanup_stuck_queued_tasks(tis)[source]
revoke_task(*, ti)[source]

尝试从执行器中移除任务。

它应尝试确保任务不再在 worker 上运行,并确保从内部数据结构中清除该任务。

它不应更改 Airflow 中任务的状态,或向事件缓冲区添加任何事件。

它不应引发任何错误。

参数:

ti (airflow.models.taskinstance.TaskInstance) – 要移除的任务实例

end()[source]

结束本地和 kubernetes 执行器。

terminate()[source]

终止本地和 kubernetes 执行器。

debug_dump()[source]

调试转储;响应调度器发出的 SIGUSR2 信号时调用。

send_callback(request)[source]

发送回调以执行。

参数:

request (airflow.callbacks.callback_requests.CallbackRequest) – 要执行的回调请求。

static get_cli_commands()[source]

提供要包含在 Airflow CLI 中的 CLI 命令。

覆盖此方法以通过 Airflow CLI 公开命令来管理此执行器。这可以是用于设置/拆卸执行器、检查状态等的命令。请确保为这些命令选择唯一的名称,以避免冲突。

此条目有帮助吗?