Celery 执行器¶
注意
从 Airflow 2.7.0 开始,您需要安装 celery
提供程序包才能使用此执行器。可以通过安装 apache-airflow-providers-celery>=3.3.0
或通过安装带有 celery
额外功能的 Airflow 来完成:pip install 'apache-airflow[celery]'
。
CeleryExecutor
是扩展工作进程数量的方式之一。为了使其工作,您需要设置 Celery 后端(RabbitMQ,Redis,Redis Sentinel ...),安装所需的依赖项(例如 librabbitmq
,redis
...),并将您的 airflow.cfg
中的执行器参数指向 CeleryExecutor
并提供相关的 Celery 设置。
有关设置 Celery 代理的更多信息,请参阅关于此主题的详尽 Celery 文档。
Celery 执行器的配置参数可以在 Celery 提供程序的 配置参考 中找到。
以下是您的工作进程的一些必要要求
需要安装
airflow
,并且 CLI 需要在路径中Airflow 配置设置在整个集群中应该是同质的
在工作进程上执行的 Operator 需要在该上下文中满足其依赖项。例如,如果您使用
HiveOperator
,则需要在该框上安装 Hive CLI,或者如果您使用MySqlOperator
,则需要以某种方式在PYTHONPATH
中提供所需的 Python 库工作进程需要访问其
DAGS_FOLDER
,并且您需要自行同步文件系统。一种常见的设置是将您的DAGS_FOLDER
存储在 Git 存储库中,并使用 Chef、Puppet、Ansible 或您在环境中用来配置计算机的任何工具在机器之间同步它。如果您的所有框都有一个公共挂载点,那么在那里共享您的管道文件也应该可以工作
要启动工作进程,您需要设置 Airflow 并启动工作进程子命令
airflow celery worker
您的工作进程应该在任务被发送到其方向后立即开始接收任务。要停止在机器上运行的工作进程,您可以使用
airflow celery stop
它会尝试通过向主 Celery 进程发送 SIGTERM
信号来优雅地停止工作进程,正如 Celery 文档 建议的那样。
请注意,您还可以运行 Celery Flower,这是一个基于 Celery 构建的 Web UI,用于监视您的工作进程。您可以使用快捷命令启动 Flower Web 服务器
airflow celery flower
请注意,您必须在您的系统上安装 flower
Python 库。推荐的方法是安装 airflow celery 包。
pip install 'apache-airflow[celery]'
一些注意事项
请确保使用数据库支持的结果后端
请确保在
[celery_broker_transport_options]
中设置的可见性超时时间超过您运行时间最长的任务的 ETA如果您使用 Redis Sentinel 作为您的代理并且 Redis 服务器受密码保护,请确保在
[celery_broker_transport_options]
部分中指定 Redis 服务器的密码请确保在
[worker_umask]
中设置 umask,以便为工作进程创建的新文件设置权限。任务会消耗资源。请确保您的工作进程有足够的资源来运行
worker_concurrency
个任务队列名称限制为 256 个字符,但每个代理后端可能有其自身的限制
有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理。
架构¶
Airflow 由多个组件组成
工作进程 - 执行分配的任务
调度器 - 负责将必要的任务添加到队列中
Web 服务器 - HTTP 服务器提供对 DAG/任务状态信息的访问
数据库 - 包含有关任务、DAG、变量、连接等状态的信息
Celery - 队列机制
请注意,Celery 的队列由两个组件组成
代理 - 存储要执行的命令
结果后端 - 存储已完成命令的状态
组件在许多地方相互通信
[1] Web 服务器 –> 工作进程 - 获取任务执行日志
[2] Web 服务器 –> DAG 文件 - 显示 DAG 结构
[3] Web 服务器 –> 数据库 - 获取任务的状态
[4] 工作进程 –> DAG 文件 - 显示 DAG 结构并执行任务
[5] 工作进程 –> 数据库 - 获取和存储有关连接配置、变量和 XCOM 的信息。
[6] 工作进程 –> Celery 的结果后端 - 保存任务的状态
[7] 工作进程 –> Celery 的代理 - 存储要执行的命令
[8] 调度器 –> DAG 文件 - 显示 DAG 结构并执行任务
[9] 调度器 –> 数据库 - 存储 DAG 运行和相关任务
[10] 调度器 –> Celery 的结果后端 - 获取有关已完成任务状态的信息
[11] 调度器 –> Celery 的代理 - 放入要执行的命令
任务执行过程¶
最初,两个进程正在运行
SchedulerProcess - 处理任务并使用 CeleryExecutor 运行
WorkerProcess - 观察队列,等待新任务出现
WorkerChildProcess - 等待新任务
还有两个数据库可用
QueueBroker
ResultBackend
在此过程中,会创建两个进程
LocalTaskJobProcess - 它的逻辑由 LocalTaskJob 描述。它正在监视 RawTaskProcess。使用 TaskRunner 启动新进程。
RawTaskProcess - 这是一个带有用户代码的进程,例如
execute()
。
execute_command()
。它创建一个新进程 - LocalTaskJobProcess。LocalTaskJob
类描述。它使用 TaskRunner 启动新进程。队列¶
当使用 CeleryExecutor 时,可以指定任务发送到的 Celery 队列。queue
是 BaseOperator 的属性,因此任何任务都可以分配给任何队列。环境的默认队列在 airflow.cfg
的 operators -> default_queue
中定义。这定义了未指定时任务分配到的队列,以及 Airflow 工作进程启动时监听的队列。
工作进程可以监听一个或多个任务队列。当启动工作进程(使用命令 airflow celery worker
)时,可以给出用逗号分隔的一组队列名称(没有空格)(例如 airflow celery worker -q spark,quark
)。然后,此工作进程将仅接收连接到指定队列的任务。
如果您需要专业的工作进程,无论从资源的角度(例如,对于非常轻量级的任务,一个工作进程可以毫无问题地处理数千个任务),还是从环境的角度(您希望工作进程在 Spark 集群本身内运行,因为它需要非常特定的环境和安全权限),这都很有用。