指标配置¶
可以将 Airflow 设置为将指标发送到 StatsD 或 OpenTelemetry。
设置 - StatsD¶
要使用 StatsD,您必须首先安装所需的软件包
pip install 'apache-airflow[statsd]'
然后将以下行添加到您的配置文件中,例如 airflow.cfg
[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
如果您想使用自定义的 StatsD 客户端而不是 Airflow 提供的默认客户端,则必须将以下键添加到配置文件中,并包含自定义 StatsD 客户端的模块路径。此模块必须在您的 PYTHONPATH
中可用。
[metrics]
statsd_custom_client_path = x.y.customclient
有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理。
设置 - OpenTelemetry¶
要使用 OpenTelemetry,您必须首先安装所需的软件包
pip install 'apache-airflow[otel]'
将以下行添加到您的配置文件中,例如 airflow.cfg
[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000 # The interval between exports, defaults to 60000
otel_ssl_active = False
启用 Https¶
要建立与 OpenTelemetry 收集器的 HTTPS 连接,您需要在 OpenTelemetry 收集器的 config.yml
文件中配置 SSL 证书和密钥。
receivers:
otlp:
protocols:
http:
endpoint: 0.0.0.0:4318
tls:
cert_file: "/path/to/cert/cert.crt"
key_file: "/path/to/key/key.pem"
允许/阻止列表¶
如果您想避免发送所有可用的指标,您可以配置一个允许列表或阻止列表,仅发送或阻止以列表元素开头的指标。
[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery
重命名指标¶
如果您想将指标重定向到不同的名称,您可以在 [metrics]
部分配置 stat_name_handler
选项。它应该指向一个验证统计名称的函数,在必要时更改统计名称,并返回转换后的统计名称。该函数可能如下所示
def my_custom_stat_name_handler(stat_name: str) -> str:
return stat_name.lower()[:32]
指标描述¶
计数器¶
名称 |
描述 |
---|---|
|
启动的 |
|
结束的 |
|
|
|
当运行 DAG |
|
当运行 DAG |
|
操作符 |
|
操作符 |
|
操作符 |
|
操作符 |
|
总体任务实例失败次数。具有 dag_id 和 task_id 标记的指标。 |
|
总体任务实例成功次数。具有 dag_id 和 task_id 标记的指标。 |
|
先前成功的任务实例数。具有 dag_id 和 task_id 标记的指标。 |
|
被杀死的僵尸任务。具有 dag_id 和 task_id 标记的指标。 |
|
调度器心跳 |
|
当前正在运行的 DAG 解析进程的相对数量(即,当自上次发送指标以来进程已完成时,此增量为负)。具有 file_path 和 action 标记的指标。 |
|
由于耗时过长而被杀死的进程的数量。具有 file_path 标记的指标。 |
|
收到的 SLA 回调数 |
|
收到的非 SLA 回调数 |
|
我们扫描文件系统并将所有现有 DAG 排入队列的次数 |
|
(已弃用)与 |
|
停止的 |
|
加载任何 DAG 文件失败的次数 |
|
外部杀死的任务数量。具有 dag_id 和 task_id 标记的指标。 |
|
调度器清除的孤立任务数量 |
|
调度器采用的孤立任务数量 |
|
调度器进程尝试获取对关键部分(将任务发送到执行器所需)的锁,并发现它被另一个进程锁定的次数。 |
|
错过的 SLA 数量。具有 dag_id 和 task_id 标记的指标。 |
|
失败的 SLA 错过回调通知尝试的次数。具有 dag_id 和 func_name 标记的指标。 |
|
失败的 SLA 错过电子邮件通知尝试的次数。具有 dag_id 标记的指标。 |
|
给定 DAG 中启动的任务数量。类似于 <job_name>_start,但适用于任务 |
|
给定 DAG 中启动的任务数量。类似于 <job_name>_start,但适用于任务。具有 dag_id 和 task_id 标记的指标。 |
|
给定 DAG 中完成的任务数量。类似于 <job_name>_end,但适用于任务 |
|
给定 DAG 中完成的任务数量。类似于 <job_name>_end,但适用于任务。具有 dag_id 和 task_id 标记的指标。 |
|
从 DAG 回调引发的异常数量。当发生这种情况时,意味着 DAG 回调不起作用。具有 dag_id 标记的指标 |
|
将任务发布到 Celery 代理时引发的 |
|
来自 Celery 任务的非零退出代码的数量。 |
|
为给定 DAG 删除的任务数量(即,DAG 中不再存在该任务)。 |
|
为给定 DAG 删除的任务数量(即,DAG 中不再存在该任务)。具有 dag_id 和 run_type 标记的指标。 |
|
为给定 DAG 恢复的任务数量(即,先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中) |
|
为给定 DAG 恢复的任务数量(即,先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)。具有 dag_id 和 run_type 标记的指标。 |
|
为给定操作符创建的任务实例数量 |
|
为给定操作符创建的任务实例数量。带有 dag_id 和 run_type 标签的指标。 |
|
触发器心跳 |
|
阻塞主线程的触发器数量(可能是因为没有完全异步) |
|
在触发事件之前出错的触发器数量 |
|
至少触发了一个事件的触发器数量 |
|
更新的数据集数量 |
|
由于在 DAG 计划参数或任务出口中不再被引用而被标记为孤立的数据集数量 |
|
由数据集更新触发的 DAG 运行次数 |
仪表盘¶
名称 |
描述 |
---|---|
|
调度器根据其配置运行扫描时找到的 DAG 数量 |
|
尝试解析 DAG 文件时出现的错误数量 |
|
扫描和导入 |
|
下次扫描要考虑的 DAG 文件数量 |
|
自上次处理 |
|
在解析每个 |
|
由于池中没有空闲槽而无法调度的任务数量 |
|
根据池限制、DAG 并发、执行器状态和优先级,准备好执行(设置为已排队)的任务数量。 |
|
特定执行器上的空闲槽数量。仅在配置了多个执行器时发出。 |
|
执行器上的空闲槽数量 |
|
特定执行器上的已排队任务数量。仅在配置了多个执行器时发出。 |
|
执行器上的已排队任务数量 |
|
特定执行器上正在运行的任务数量。仅在配置了多个执行器时发出。 |
|
执行器上正在运行的任务数量 |
|
池中的空闲槽数量 |
|
池中的空闲槽数量。带有 pool_name 标签的指标。 |
|
池中的已排队槽数量 |
|
池中的已排队槽数量。带有 pool_name 标签的指标。 |
|
池中正在运行的槽数量 |
|
池中正在运行的槽数量。带有 pool_name 标签的指标。 |
|
池中的延迟槽数量 |
|
池中的延迟槽数量。带有 pool_name 标签的指标。 |
|
池中的已调度槽数量 |
|
池中的已调度槽数量。带有 pool_name 标签的指标。 |
|
池中饥饿的任务数量 |
|
池中饥饿的任务数量。带有 pool_name 标签的指标。 |
|
任务使用的 CPU 百分比 |
|
任务使用的内存百分比 |
|
触发器(由主机名描述)当前正在运行的触发器数量 |
|
触发器(由主机名描述)当前正在运行的触发器数量。带有主机名标签的指标。 |
计时器¶
名称 |
描述 |
---|---|
|
检查 DAG 依赖项所用的毫秒数 |
|
检查 DAG 依赖项所用的毫秒数。带有 dag_id 标签的指标。 |
|
运行任务所用的毫秒数 |
|
运行任务所用的毫秒数。带有 dag_id 和 task_id 标签的指标。 |
|
任务在“已调度”状态下花费的毫秒数,然后被置为“已排队”状态 |
|
任务在“已调度”状态下花费的毫秒数,然后被置为“已排队”状态。带有 dag_id 和 task_id 标签的指标。 |
|
任务在“已排队”状态下花费的毫秒数,然后被置为“正在运行”状态 |
|
任务在“已排队”状态下花费的毫秒数,然后被置为“正在运行”状态。带有 dag_id 和 task_id 标签的指标。 |
|
加载给定 DAG 文件所用的毫秒数 |
|
加载给定 DAG 文件所用的毫秒数。带有 file_name 标签的指标。 |
|
DagRun 达到成功状态所用的毫秒数 |
|
DagRun 达到成功状态所用的毫秒数。带有 dag_id 和 run_type 标签的指标。 |
|
DagRun 达到失败状态所用的毫秒数 |
|
DagRun 达到失败状态所用的毫秒数。带有 dag_id 和 run_type 标签的指标。 |
|
计划的 DagRun 开始日期和实际 DagRun 开始日期之间的延迟毫秒数 |
|
计划的 DagRun 开始日期和实际 DagRun 开始日期之间的延迟毫秒数。带有 dag_id 标签的指标。 |
|
在调度器循环的关键部分花费的毫秒数 - 一次只有一个调度器可以进入此循环 |
|
运行关键部分任务实例查询所花费的毫秒数 |
|
运行一个调度器循环所花费的毫秒数 |
|
第一个任务的 start_date 和 dagrun 预期开始之间经过的毫秒数 |
|
第一个任务的 start_date 和 dagrun 预期开始之间经过的毫秒数。带有 dag_id 和 run_type 标签的指标。 |
|
从数据库中获取所有序列化的 DAG 所用的毫秒数 |
|
在 Kubernetes 执行器中清除未启动的已排队任务所用的毫秒数 |
|
在 Kubernetes 执行器中采用任务实例所用的毫秒数 |