Celery 执行器

注意

从 Airflow 2.7.0 开始,您需要安装 celery 提供程序包才能使用此执行器。这可以通过安装 apache-airflow-providers-celery>=3.3.0 或使用 celery 额外组件安装 Airflow 来完成:pip install 'apache-airflow[celery]'

CeleryExecutor 是扩展工作器数量的方法之一。为此,您需要设置 Celery 后端(RabbitMQRedisRedis Sentinel …),安装所需的依赖项(例如 librabbitmqredis …),并将您的 airflow.cfg 中的 executor 参数更改为指向 CeleryExecutor 并提供相关的 Celery 设置。

有关设置 Celery 代理的更多信息,请参阅Celery 文档中关于该主题的详尽说明

Celery 执行器的配置参数可以在 Celery 提供程序的配置参考中找到。

以下是您的工作器的一些必要条件

  • 需要安装 airflow,并且 CLI 需要在路径中

  • Airflow 配置设置应在整个集群中保持一致

  • 在工作器上执行的操作器需要在其上下文中满足其依赖关系。例如,如果您使用 HiveOperator,则需要在该机器上安装 Hive CLI,或者如果您使用 MySqlOperator,则需要以某种方式在 PYTHONPATH 中提供所需的 Python 库

  • 工作器需要能够访问其 DAGS_FOLDER,并且您需要通过自己的方式同步文件系统。一种常见的设置是将您的 DAGS_FOLDER 存储在 Git 存储库中,并使用 Chef、Puppet、Ansible 或您在环境中用于配置机器的任何工具在机器之间同步。如果您的所有机器都有一个公共挂载点,那么将您的管道文件共享在那里也应该可以

要启动工作器,您需要设置 Airflow 并启动 worker 子命令

airflow celery worker

您的工作器应该会在任务被触发到其方向后立即开始接收任务。要停止在机器上运行的工作器,您可以使用

airflow celery stop

它将尝试通过向 Celery 主进程发送 SIGTERM 信号来正常停止工作器,如Celery 文档中所建议的那样。

请注意,您还可以运行Celery Flower(一个构建在 Celery 之上的 Web UI)来监控您的工作器。您可以使用快捷命令启动 Flower Web 服务器

airflow celery flower

请注意,您必须在系统上已经安装了 flower python 库。推荐的方法是安装 airflow celery 包。

pip install 'apache-airflow[celery]'

一些注意事项

  • 确保使用数据库支持的结果后端

  • 确保在 [celery_broker_transport_options] 中设置的可见性超时超过运行时间最长的任务的 ETA

  • 如果您使用 Redis Sentinel 作为代理并且 Redis 服务器受密码保护,请确保在 [celery_broker_transport_options] 部分中指定 Redis 服务器的密码

  • 确保在 [worker_umask] 中设置 umask,以便为工作器创建的新文件设置权限。

  • 任务可能会消耗资源。确保您的工作器有足够的资源来运行 worker_concurrency 个任务

  • 队列名称限制为 256 个字符,但每个代理后端可能有其自身的限制

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅模块管理

架构

digraph A{ rankdir="TB" node[shape="rectangle", style="rounded"] subgraph cluster { label="Cluster"; {rank = same; dag; database} {rank = same; workers; scheduler; web} workers[label="Workers"] scheduler[label="Scheduler"] web[label="Web server"] database[label="Database"] dag[label="DAG files"] subgraph cluster_queue { label="Celery"; {rank = same; queue_broker; queue_result_backend} queue_broker[label="Queue broker"] queue_result_backend[label="Result backend"] } web->workers[label="1"] web->dag[label="2"] web->database[label="3"] workers->dag[label="4"] workers->database[label="5"] workers->queue_result_backend[label="6"] workers->queue_broker[label="7"] scheduler->dag[label="8"] scheduler->database[label="9"] scheduler->queue_result_backend[label="10"] scheduler->queue_broker[label="11"] } }

Airflow 由几个组件组成

  • **工作器** - 执行分配的任务

  • **调度器** - 负责将必要的任务添加到队列中

  • **Web 服务器** - HTTP 服务器提供对 DAG/任务状态信息的访问

  • **数据库** - 包含有关任务、DAG、变量、连接等的狀態信息。

  • **Celery** - 队列机制

请注意,Celery 中的队列由两个组件组成

  • **代理** - 存储要执行的命令

  • **结果后端** - 存储已完成命令的状态

组件在许多地方相互通信

  • [1] **Web 服务器** –> **工作器** - 获取任务执行日志

  • [2] **Web 服务器** –> **DAG 文件** - 显示 DAG 结构

  • [3] **Web 服务器** –> **数据库** - 获取任务的状态

  • [4] **工作器** –> **DAG 文件** - 显示 DAG 结构并执行任务

  • [5] **工作器** –> **数据库** - 获取和存储有关连接配置、变量和 XCOM 的信息。

  • [6] **工作器** –> **Celery 的结果后端** - 保存任务的状态

  • [7] **工作器** –> **Celery 的代理** - 存储要执行的命令

  • [8] **调度器** –> **DAG 文件** - 显示 DAG 结构并执行任务

  • [9] **调度器** –> **数据库** - 存储 DAG 运行和相关任务

  • [10] **调度器** –> **Celery 的结果后端** - 获取有关已完成任务状态的信息

  • [11] **调度器** –> **Celery 的代理** - 放置要执行的命令

任务执行过程

_images/run_task_on_celery_executor.png

序列图 - 任务执行过程

最初,有两个进程正在运行

  • SchedulerProcess - 处理任务并使用 CeleryExecutor 运行

  • WorkerProcess - 观察队列,等待新任务出现

  • WorkerChildProcess - 等待新任务

还有两个数据库可用

  • QueueBroker

  • ResultBackend

在此过程中,创建了两个 2 进程

  • LocalTaskJobProcess - 它的逻辑由 LocalTaskJob 描述。它正在监控 RawTaskProcess。使用 TaskRunner 启动新进程。

  • RawTaskProcess - 它是具有用户代码的进程,例如 execute()

[1] **SchedulerProcess** 处理任务,当它找到需要完成的任务时,将其发送到 **QueueBroker**。
[2] **SchedulerProcess** 还开始定期查询 **ResultBackend** 以获取任务的状态。
[3] **QueueBroker** 在意识到任务时,会将有关任务的信息发送给一个 WorkerProcess。
[4] **WorkerProcess** 将单个任务分配给一个 **WorkerChildProcess**。
[5] **WorkerChildProcess** 执行适当的任务处理函数 - execute_command()。它会创建一个新进程 - **LocalTaskJobProcess**。
[6] LocalTaskJobProcess 逻辑由 LocalTaskJob 类描述。它使用 TaskRunner 启动新进程。
[7][8] 进程 **RawTaskProcess** 和 **LocalTaskJobProcess** 在完成工作后停止。
[10][12] **WorkerChildProcess** 通知主进程 - **WorkerProcess** 任务结束和后续任务的可用性。
[11] **WorkerProcess** 将状态信息保存在 **ResultBackend** 中。
[13] 当 **SchedulerProcess** 再次询问 **ResultBackend** 状态时,它将获得有关任务状态的信息。

队列

使用 CeleryExecutor 时,可以指定发送任务的 Celery 队列。queue 是 BaseOperator 的一个属性,因此任何任务都可以分配给任何队列。环境的默认队列在 airflow.cfgoperators -> default_queue 中定义。这定义了未指定时任务分配到的队列,以及 Airflow 工作器启动时侦听的队列。

工作器可以侦听一个或多个任务队列。启动工作器时(使用命令 airflow celery worker),可以指定一组逗号分隔的队列名称(不带空格)(例如 airflow celery worker -q spark,quark)。然后,该工作器将仅接收连接到指定队列的任务。

如果您需要专门的工作器,无论是从资源角度(例如,对于一个工作器可以毫无问题地处理数千个任务的非常轻量级的任务),还是从环境角度(您希望工作器从 Spark 集群内部运行,因为它需要非常特定的环境和安全权限),这都非常有用。

此条目有帮助吗?