指标配置

可以将 Airflow 设置为将指标发送到 StatsDOpenTelemetry

设置 - StatsD

要使用 StatsD,你必须先安装必需的软件包

pip install 'apache-airflow[statsd]'

然后将以下行添加到你的配置文件中,例如 airflow.cfg

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

如果你想使用自定义 StatsD 客户端,而不是 Airflow 提供的默认客户端,则必须将以下键添加到配置文件中,以及自定义 StatsD 客户端的模块路径。此模块必须在你的 PYTHONPATH 中可用。

[metrics]
statsd_custom_client_path = x.y.customclient

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

设置 - OpenTelemetry

要使用 OpenTelemetry,你必须首先安装必需的包

pip install 'apache-airflow[otel]'

将以下行添加到你的配置文件中,例如 airflow.cfg

[metrics]
otel_on = False
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000  # The interval between exports, defaults to 60000
otel_ssl_active = False

启用 Https

要建立与 OpenTelemetry 收集器的 HTTPS 连接,你需要在 OpenTelemetry 收集器的 config.yml 文件中配置 SSL 证书和密钥。

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        tls:
          cert_file: "/path/to/cert/cert.crt"
          key_file: "/path/to/key/key.pem"

允许/阻止列表

如果你想避免发送所有可用的指标,你可以配置一个允许列表或阻止列表的前缀,以仅发送或阻止以列表元素开头的指标

[metrics]
metrics_allow_list = scheduler,executor,dagrun
[metrics]
metrics_block_list = scheduler,executor,dagrun

重命名指标

如果你想将指标重定向到不同的名称,可以在 [metrics] 部分中配置 stat_name_handler 选项。它应该指向一个函数,该函数验证统计名称,必要时对统计名称进行更改,并返回转换后的统计名称。该函数可能如下所示

def my_custom_stat_name_handler(stat_name: str) -> str:
    return stat_name.lower()[:32]

其他配置选项

注意

有关指标配置选项的详细列表,请参阅配置参考文档 - [metrics]

指标描述

计数器

名称

描述

<job_name>_start

已启动的 <job_name> 作业数,例如 SchedulerJobLocalTaskJob

<job_name>_end

已结束的 <job_name> 作业数,例如 SchedulerJobLocalTaskJob

<作业名称>_心跳失败

对于 <作业名称> 作业,例如 SchedulerJobLocalTaskJob 的失败心跳次数

local_task_job.task_exit.<作业 ID>.<DAG ID>.<任务 ID>.<返回代码>

运行 DAG <DAG ID> 的任务 <任务 ID> 时,具有 <返回代码>LocalTaskJob 终止次数。

local_task_job.task_exit

运行 DAG <DAG ID> 的任务 <任务 ID> 时,具有 <返回代码>LocalTaskJob 终止次数。具有作业 ID、DAG ID、任务 ID 和返回代码标记的指标。

operator_failures_<操作员名称>

操作员 <操作员名称> 失败

operator_failures

操作员 <操作员名称> 失败。具有操作员名称标记的指标。

operator_successes_<操作员名称>

操作员 <操作员名称> 成功

operator_successes

操作员 <操作员名称> 成功。具有操作员名称标记的指标。

ti_failures

总体任务实例失败。具有 DAG ID 和任务 ID 标记的指标。

ti_successes

总体任务实例成功。具有 DAG ID 和任务 ID 标记的指标。

previously_succeeded

以前成功的任务实例数。具有 DAG ID 和任务 ID 标记的指标。

zombies_killed

已终止的僵尸任务。具有 DAG ID 和任务 ID 标记的指标。

scheduler_heartbeat

调度程序心跳

dag_processing.processes

当前正在运行的 DAG 解析进程的相对数(即,自上次发送指标以来,进程已完成时,此增量为负)。具有文件路径和操作标记的指标。

dag_processing.processor_timeouts

因耗时过长而被杀死的文件处理程序数量。带有 file_path 标记的指标。

dag_processing.sla_callback_count

收到的 SLA 回调数量

dag_processing.other_callback_count

收到的非 SLA 回调数量

dag_processing.file_path_queue_update_count

扫描文件系统并对所有现有 dag 排队的次数

dag_file_processor_timeouts

(已弃用)与 dag_processing.processor_timeouts 相同的行为

dag_processing.manager_stalls

已停止的 DagFileProcessorManager 数量

dag_file_refresh_error

加载任何 DAG 文件的失败次数

scheduler.tasks.killed_externally

外部杀死的任务数量。带有 dag_id 和 task_id 标记的指标。

scheduler.orphaned_tasks.cleared

调度程序清除的孤立任务数量

scheduler.orphaned_tasks.adopted

调度程序采用的孤立任务数量

scheduler.critical_section_busy

调度程序进程尝试获取关键部分的锁(需要将任务发送到执行器)并发现它被另一个进程锁定的次数。

sla_missed

SLA 错过的数量。带有 dag_id 和 task_id 标记的指标。

sla_callback_notification_failure

SLA 错过回调通知尝试失败的次数。带有 dag_id 和 func_name 标记的指标。

sla_email_notification_failure

SLA 错过电子邮件通知尝试失败的次数。带有 dag_id 标记的指标。

ti.start.<dag_id>.<task_id>

给定 dag 中已启动的任务数量。类似于 <job_name>_start,但针对任务

ti.start

给定 dag 中已启动的任务数量。类似于 <job_name>_start,但针对任务。带有 dag_id 和 task_id 标记的指标。

ti.finish.<dag_id>.<task_id>.<state>

给定 dag 中已完成的任务数量。类似于 <job_name>_end,但针对任务

ti.finish

给定 dag 中已完成的任务数量。类似于 <job_name>_end,但针对任务。带有 dag_id 和 task_id 标记的指标。

dag.callback_exceptions

从 DAG 回调引发的异常数量。发生这种情况时,表示 DAG 回调不起作用。带有 dag_id 标记的指标

celery.task_timeout_error

将任务发布到 Celery Broker 时引发的 AirflowTaskTimeout 错误数量。

celery.execute_command.failure

Celery 任务的非零退出代码数量。

task_removed_from_dag.<dag_id>

为给定 dag 删除的任务数量(即任务不再存在于 DAG 中)。

task_removed_from_dag

已从给定 DAG 中移除的任务数(即任务不再存在于 DAG 中)。具有 dag_id 和 run_type 标记的指标。

task_restored_to_dag.<dag_id>

已为给定 DAG 恢复的任务数(即先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)

task_restored_to_dag.<dag_id>

已为给定 DAG 恢复的任务数(即先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)。具有 dag_id 和 run_type 标记的指标。

task_instance_created_<operator_name>

为给定算子创建的任务实例数

task_instance_created

为给定算子创建的任务实例数。具有 dag_id 和 run_type 标记的指标。

triggerer_heartbeat

触发器心跳

triggers.blocked_main_thread

阻塞主线程的触发器数(可能是由于没有完全异步)

triggers.failed

在触发事件之前出错的触发器数

triggers.succeeded

已触发至少一个事件的触发器数

dataset.updates

更新的数据集数

dataset.orphaned

标记为孤立的数据集数,因为它们不再在 DAG 调度参数或任务出口中被引用

dataset.triggered_dagruns

由数据集更新触发的 DAG 运行数

仪表

名称

描述

dagbag_size

当调度程序基于其配置运行扫描时找到的 DAG 数

dag_processing.import_errors

尝试解析 DAG 文件时出现的错误数

dag_processing.total_parse_time

扫描和导入 dag_processing.file_path_queue_size DAG 文件所用的秒数

dag_processing.file_path_queue_size

要考虑用于下次扫描的 DAG 文件数

dag_processing.last_run.seconds_ago.<dag_file>

自上次处理 <dag_file> 以来经过的秒数

scheduler.tasks.starving

由于池中没有开放槽而无法调度的任务数

scheduler.tasks.executable

准备执行的任务数量(设置为排队),针对池限制、DAG 并发性、执行程序状态和优先级。

executor.open_slots

执行程序上的空闲插槽数量

executor.queued_tasks

执行程序上的排队任务数量

executor.running_tasks

执行程序上的正在运行的任务数量

pool.open_slots.<pool_name>

池中的空闲插槽数量

pool.open_slots

池中的空闲插槽数量。带有 pool_name 标记的指标。

pool.queued_slots.<pool_name>

池中的排队插槽数量

pool.queued_slots

池中的排队插槽数量。带有 pool_name 标记的指标。

pool.running_slots.<pool_name>

池中的正在运行的插槽数量

pool.running_slots

池中的正在运行的插槽数量。带有 pool_name 标记的指标。

pool.deferred_slots.<pool_name>

池中的延迟插槽数量

pool.deferred_slots

池中的延迟插槽数量。带有 pool_name 标记的指标。

pool.starving_tasks.<pool_name>

池中的饥饿任务数量

pool.starving_tasks

池中的饥饿任务数量。带有 pool_name 标记的指标。

triggers.running.<hostname>

触发器(由主机名描述)当前运行的触发器数量

triggers.running

触发器(由主机名描述)当前运行的触发器数量。带有主机名标记的指标。

计时器

名称

描述

dagrun.dependency-check.<dag_id>

检查 DAG 依赖项所花费的毫秒数

dagrun.dependency-check

检查 DAG 依赖项所花费的毫秒数。带有 dag_id 标记的指标。

dag.<dag_id>.<task_id>.duration

运行任务所花费的秒数

task.duration

运行任务所花费的秒数。带有 dag_id 和 task_id 标记的指标。

dag.<dag_id>.<task_id>.scheduled_duration

任务在排队之前在已计划状态中花费的秒数

task.scheduled_duration

任务在排队之前在已计划状态中花费的秒数。带有 dag_id 和 task_id 标记的指标。

dag.<dag_id>.<task_id>.queued_duration

任务在运行之前在已排队状态中花费的秒数

task.queued_duration

任务在运行之前在已排队状态中花费的秒数。带有 dag_id 和 task_id 标记的指标。

dag_processing.last_duration.<dag_file>

加载给定 DAG 文件所花费的秒数

dag_processing.last_duration

加载给定 DAG 文件所花费的秒数。带有 file_name 标记的指标。

dagrun.duration.success.<dag_id>

DagRun 达到成功状态所花费的秒数

dagrun.duration.success

DagRun 达到成功状态所花费的秒数。带有 dag_id 和 run_type 标记的指标。

dagrun.duration.failed.<dag_id>

DagRun 达到失败状态所花费的时间(秒)

dagrun.duration.failed

DagRun 达到失败状态所花费的时间(秒)。具有 dag_id 和 run_type 标记的指标。

dagrun.schedule_delay.<dag_id>

计划的 DagRun 开始日期和实际 DagRun 开始日期之间的延迟(毫秒)

dagrun.schedule_delay

计划的 DagRun 开始日期和实际 DagRun 开始日期之间的延迟(毫秒)。具有 dag_id 标记的指标。

scheduler.critical_section_duration

在调度器循环的关键部分花费的时间(毫秒)——一次只能有一个调度器进入此循环

scheduler.critical_section_query_duration

运行关键部分任务实例查询所花费的时间(毫秒)

scheduler.scheduler_loop_duration

运行一个调度器循环所花费的时间(毫秒)

dagrun.<dag_id>.first_task_scheduling_delay

第一个任务的 start_date 和 dagrun 预期开始时间之间的间隔(秒)

dagrun.first_task_scheduling_delay

第一个任务的 start_date 和 dagrun 预期开始时间之间的间隔(秒)。具有 dag_id 和 run_type 标记的指标。

collect_db_dags

从数据库中获取所有序列化 Dag 所花费的时间(毫秒)

kubernetes_executor.clear_not_launched_queued_tasks.duration

在 Kubernetes Executor 中清除未启动的已排队任务所花费的时间(毫秒)

kubernetes_executor.adopt_task_instances.duration

在 Kubernetes Executor 中采用任务实例所花费的时间(毫秒)

此条目是否有帮助?