airflow.providers.celery.executors.celery_kubernetes_executor

CeleryKubernetesExecutor

CeleryKubernetesExecutor 由 CeleryExecutor 和 KubernetesExecutor 组成。

模块内容

class airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor(celery_executor, kubernetes_executor)[source]

基类: airflow.executors.base_executor.BaseExecutor

CeleryKubernetesExecutor 由 CeleryExecutor 和 KubernetesExecutor 组成。

它根据任务上定义的队列选择要使用的执行器。当队列是配置中 [celery_kubernetes_executor] 部分的 kubernetes_queue 值(默认值:kubernetes)时,选择 KubernetesExecutor 运行任务;否则,使用 CeleryExecutor。

supports_ad_hoc_ti_run: bool = True[source]
supports_pickling: bool = True[source]
supports_sentry: bool = False[source]
is_local: bool = False[source]
is_single_threaded: bool = False[source]
is_production: bool = True[source]
serve_logs: bool = False[source]
callback_sink: airflow.callbacks.base_callback_sink.BaseCallbackSink | None = None[source]
property kubernetes_queue: str[source]
celery_executor[source]
kubernetes_executor[source]
property queued_tasks: dict[airflow.models.taskinstancekey.TaskInstanceKey, airflow.executors.base_executor.QueuedTaskInstanceType][source]

返回 Celery 和 Kubernetes 执行器中的排队任务。

property running: set[airflow.models.taskinstancekey.TaskInstanceKey][source]

返回 Celery 和 Kubernetes 执行器中正在运行的任务。

property job_id: int | str | None[source]

从 BaseExecutor 继承的属性。

由于这并非真正的执行器,而是执行器的包装器,因此我们将其实现为属性,以便能够拥有自定义 setter。

start()[source]

启动 Celery 和 Kubernetes 执行器。

property slots_available: int[source]

此执行器实例可以接受的新任务数量。

property slots_occupied[source]

此执行器实例当前管理的任务数量。

queue_command(task_instance, command, priority=1, queue=None)[source]

通过 Celery 或 Kubernetes 执行器将命令加入队列。

queue_task_instance(task_instance, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, pool=None, cfg_path=None, **kwargs)[source]

通过 Celery 或 Kubernetes 执行器将任务实例加入队列。

get_task_log(ti, try_number)[source]

从 Kubernetes 执行器获取任务日志。

has_task(task_instance)[source]

检查任务是在 Celery 执行器中排队还是运行,或是在 Kubernetes 执行器中排队还是运行。

参数:

task_instance (airflow.models.taskinstance.TaskInstance) – 任务实例

返回:

如果此执行器知晓该任务,则返回 True

返回类型:

bool

heartbeat()[source]

发送心跳信号以触发 Celery 和 Kubernetes 执行器中的新任务。

get_event_buffer(dag_ids=None)[source]

返回并刷新 Celery 和 Kubernetes 执行器中的事件缓冲区。

参数:

dag_ids (list[str] | None) – 要返回事件的 dag_ids,如果为 None 则返回所有

返回:

一个事件字典

返回类型:

dict[airflow.models.taskinstancekey.TaskInstanceKey, airflow.executors.base_executor.EventBufferValueType]

try_adopt_task_instances(tis)[source]

尝试接管因调度器作业停止而遗弃的正在运行的任务实例。

任何未被接管的任务将由调度器清除(然后变得可重新调度)

返回:

未能被接管的任务实例

返回类型:

collections.abc.Sequence[airflow.models.taskinstance.TaskInstance]

cleanup_stuck_queued_tasks(tis)[source]
revoke_task(*, ti)[source]

尝试从执行器中移除任务。

应尝试确保任务不再在 worker 上运行,并确保其从内部数据结构中清除。

它*不应*更改 Airflow 中任务的状态,或向事件缓冲区添加任何事件。

不应引发任何错误。

参数:

ti (airflow.models.taskinstance.TaskInstance) – 要移除的任务实例

end()[source]

结束 Celery 和 Kubernetes 执行器。

terminate()[source]

终止 Celery 和 Kubernetes 执行器。

debug_dump()[source]

调试转储;由调度器在收到 SIGUSR2 信号时调用。

send_callback(request)[source]

发送回调以执行。

参数:

request (airflow.callbacks.callback_requests.CallbackRequest) – 要执行的回调请求。

static get_cli_commands()[source]

提供要包含在 Airflow CLI 中的 CLI 命令。

覆盖此方法以通过 Airflow CLI 公开管理此执行器的命令。这可以是用于设置/拆卸执行器、检查状态等的命令。确保为这些命令选择唯一的名称,以避免冲突。

此条目是否有用?