配置参考¶
此页面包含 apache-airflow-providers-celery
提供程序的所有可用 Airflow 配置列表,这些配置可以在 airflow.cfg
文件中设置,也可以使用环境变量设置。
注意
从 Airflow 2.7.0 开始,提供程序包中嵌入的配置开始使用。以前,配置在 Airflow 核心包中描述和配置 - 因此,如果您使用的是低于 2.7.0 的 Airflow,请查看 Airflow 文档,了解 Airflow 核心中可用的配置选项列表。
注意
有关更多信息,请参阅 设置配置选项。
[celery]¶
仅当您在上面的 [core]
节中使用 CeleryExecutor 时,此节才适用
broker_url¶
Celery 代理 URL。Celery 支持 RabbitMQ、Redis,以及实验性的 sqlalchemy 数据库。有关更多信息,请参阅 Celery 文档。
- 类型
字符串
- 默认值
redis://redis:6379/0
- 环境变量
AIRFLOW__CELERY__BROKER_URL
AIRFLOW__CELERY__BROKER_URL_CMD
AIRFLOW__CELERY__BROKER_URL_SECRET
celery_app_name¶
Celery 将使用的应用程序名称
- 类型
字符串
- 默认值
airflow.providers.celery.executors.celery_executor
- 环境变量
AIRFLOW__CELERY__CELERY_APP_NAME
celery_config_options¶
Celery 配置选项的导入路径
- 类型
字符串
- 默认值
airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
- 环境变量
AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS
flower_basic_auth¶
使用基本身份验证保护 Flower,接受用逗号分隔的 user:password 对
- 类型
字符串
- 默认值
''
- 环境变量
AIRFLOW__CELERY__FLOWER_BASIC_AUTH
AIRFLOW__CELERY__FLOWER_BASIC_AUTH_CMD
AIRFLOW__CELERY__FLOWER_BASIC_AUTH_SECRET
- 示例
user1:password1,user2:password2
flower_host¶
Celery Flower 是 Celery 的一个简洁 UI。Airflow 有一个启动它的快捷方式 airflow celery flower
。这定义了 Celery Flower 运行的 IP
- 类型
字符串
- 默认值
0.0.0.0
- 环境变量
AIRFLOW__CELERY__FLOWER_HOST
operation_timeout¶
在 send_task_to_executor
或 fetch_celery_task_state
操作超时之前等待的秒数。
- 类型
浮点数
- 默认值
1.0
- 环境变量
AIRFLOW__CELERY__OPERATION_TIMEOUT
pool¶
Celery 池实现。选项包括:prefork
(默认)、eventlet
、gevent
或 solo
。请参阅:https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency https://docs.celeryq.dev/en/latest/userguide/concurrency/eventlet.html
- 类型
字符串
- 默认值
prefork
- 环境变量
AIRFLOW__CELERY__POOL
result_backend¶
Celery result_backend。当作业完成时,它需要更新作业的元数据。因此,它将在消息总线上发布消息,或将其插入数据库(取决于后端)。此状态由调度程序用于更新任务的状态。强烈建议使用数据库。如果未指定,将使用带有 db+ 方案前缀的 sql_alchemy_conn https://docs.celeryq.dev/en/latest/userguide/configuration.html#task-result-backend-settings
- 类型
字符串
- 默认值
无
- 环境变量
AIRFLOW__CELERY__RESULT_BACKEND
AIRFLOW__CELERY__RESULT_BACKEND_CMD
AIRFLOW__CELERY__RESULT_BACKEND_SECRET
- 示例
db+postgresql://postgres:airflow@postgres/airflow
result_backend_sqlalchemy_engine_options¶
可选的配置字典,传递给 Celery 结果后端 SQLAlchemy 引擎。
- 类型
字符串
- 默认值
''
- 环境变量
AIRFLOW__CELERY__RESULT_BACKEND_SQLALCHEMY_ENGINE_OPTIONS
- 示例
{"pool_recycle": 1800}
ssl_active¶
- 类型
字符串
- 默认值
假
- 环境变量
AIRFLOW__CELERY__SSL_ACTIVE
sync_parallelism¶
CeleryExecutor 用于同步任务状态的进程数。0 表示使用 max(1, 核心数 - 1) 个进程。
- 类型
字符串
- 默认值
0
- 环境变量
AIRFLOW__CELERY__SYNC_PARALLELISM
task_acks_late¶
3.6.0 版本中的新功能。
如果 Airflow 任务的执行时间超过 visibility_timeout,Celery 会将该任务重新分配给 Celery 工作人员,即使原始任务仍在成功运行。然后,新的任务实例与原始任务并发运行,并且 Airflow UI 和日志仅显示错误消息:“任务实例未运行”失败:任务处于运行状态”。将 task_acks_late 设置为 True 将强制 Celery 等待任务完成,然后才能分配新的任务实例。这实际上覆盖了可见性超时。另请参阅:https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late
- 类型
布尔值
- 默认值
真
- 环境变量
AIRFLOW__CELERY__TASK_ACKS_LATE
- 示例
真
task_publish_max_retries¶
由于 AirflowTaskTimeout
错误而导致任务消息发布到代理失败时的最大重试次数,然后放弃并将任务标记为失败。
- 类型
整数
- 默认值
3
- 环境变量
AIRFLOW__CELERY__TASK_PUBLISH_MAX_RETRIES
task_track_started¶
当任务由工作进程执行时,Celery 任务将报告其状态为“已启动”。这在 Airflow 中用于跟踪正在运行的任务,如果重新启动调度程序或在 HA 模式下运行调度程序,它可以采用先前 SchedulerJob 启动的孤立任务。
- 类型
布尔值
- 默认值
真
- 环境变量
AIRFLOW__CELERY__TASK_TRACK_STARTED
worker_autoscale¶
用于基于负载动态调整池大小的池进程的最大和最小数量。通过使用 airflow celery worker
命令提供 max_concurrency 和 min_concurrency 来启用自动缩放(始终保持最小进程数,但如果需要,则增加到最大值)。根据工作进程框上的资源和任务的性质来选择这些数字。如果 autoscale 选项可用,则会忽略 worker_concurrency。https://docs.celeryq.dev/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale
- 类型
字符串
- 默认值
无
- 环境变量
AIRFLOW__CELERY__WORKER_AUTOSCALE
- 示例
16,12
worker_concurrency¶
使用 airflow celery worker
命令启动工作进程时将使用的并发。这定义了工作进程将执行的任务实例数,因此请根据工作进程框上的资源和任务的性质调整工作进程的大小
- 类型
字符串
- 默认值
16
- 环境变量
AIRFLOW__CELERY__WORKER_CONCURRENCY
worker_enable_remote_control¶
指定是否启用对工作进程的远程控制。在某些情况下,当代理不支持远程控制时,Celery 会创建许多 .*reply-celery-pidbox
队列。您可以通过将其设置为 false 来阻止这种情况。但是,禁用此功能后,Flower 将无法工作。https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html#broker-overview
- 类型
布尔值
- 默认值
真
- 环境变量
AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL
worker_prefetch_multiplier¶
用于增加工作进程预取的任务数量,从而提高性能。进程数乘以 worker_prefetch_multiplier 就是一个工作进程预取的任务数量。如果存在多个工作进程,并且一个工作进程预取了位于长时间运行任务后面的任务,而另一个工作进程有未使用的进程却无法处理已经声明的被阻塞任务,那么大于 1 的值可能会导致任务不必要的阻塞。 https://docs.celeryq.dev/en/stable/userguide/optimizing.html#prefetch-limits
- 类型
整数
- 默认值
1
- 环境变量
AIRFLOW__CELERY__WORKER_PREFETCH_MULTIPLIER
[celery_broker_transport_options]¶
本节用于指定可以传递给底层 Celery 消息中间件传输的选项。请参阅:https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-broker_transport_options
sentinel_kwargs¶
sentinel_kwargs 参数允许将其他选项传递给 Sentinel 客户端。在 Redis Sentinel 用作消息中间件,并且 Redis 服务器受密码保护的典型场景中,需要通过此参数传递密码。尽管它的类型是字符串,但需要传递符合字典格式的字符串。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
- 类型
字符串
- 默认值
无
- 环境变量
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS_CMD
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS_SECRET
- 示例
{"password": "password_for_redis_server"}
visibility_timeout¶
可见性超时定义了在消息被重新传递给另一个工作进程之前,等待工作进程确认任务的秒数。请确保增加可见性超时,以匹配您计划使用的最长 ETA 时间。可见性超时仅支持 Redis 和 SQS Celery 消息中间件。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
- 类型
字符串
- 默认值
无
- 环境变量
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT
- 示例
21600
[celery_kubernetes_executor]¶
本节仅适用于您在上面的 [core]
部分中使用 CeleryKubernetesExecutor
的情况
kubernetes_queue¶
定义在使用 CeleryKubernetesExecutor
时何时将任务发送到 KubernetesExecutor
。当任务的队列是 kubernetes_queue
的值(默认为 kubernetes
)时,该任务将通过 KubernetesExecutor
执行,否则通过 CeleryExecutor
执行
- 类型
字符串
- 默认值
kubernetes
- 环境变量
AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE