指标配置

可以将 Airflow 设置为将指标发送到 StatsDOpenTelemetry

设置 - StatsD

要使用 StatsD,您必须首先安装所需的软件包

pip install 'apache-airflow[statsd]'

然后将以下行添加到您的配置文件中,例如 airflow.cfg

[metrics]
statsd_on = True
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow

如果您想使用自定义的 StatsD 客户端而不是 Airflow 提供的默认客户端,则必须将以下键添加到配置文件中,并包含自定义 StatsD 客户端的模块路径。此模块必须在您的 PYTHONPATH 中可用。

[metrics]
statsd_custom_client_path = x.y.customclient

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

设置 - OpenTelemetry

要使用 OpenTelemetry,您必须首先安装所需的软件包

pip install 'apache-airflow[otel]'

将以下行添加到您的配置文件中,例如 airflow.cfg

[metrics]
otel_on = True
otel_host = localhost
otel_port = 8889
otel_prefix = airflow
otel_interval_milliseconds = 30000  # The interval between exports, defaults to 60000
otel_ssl_active = False

启用 Https

要建立与 OpenTelemetry 收集器的 HTTPS 连接,您需要在 OpenTelemetry 收集器的 config.yml 文件中配置 SSL 证书和密钥。

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318
        tls:
          cert_file: "/path/to/cert/cert.crt"
          key_file: "/path/to/key/key.pem"

允许/阻止列表

如果您想避免发送所有可用的指标,您可以配置一个允许列表或阻止列表,仅发送或阻止以列表元素开头的指标。

[metrics]
metrics_allow_list = scheduler,executor,dagrun,pool,triggerer,celery
[metrics]
metrics_block_list = scheduler,executor,dagrun,pool,triggerer,celery

重命名指标

如果您想将指标重定向到不同的名称,您可以在 [metrics] 部分配置 stat_name_handler 选项。它应该指向一个验证统计名称的函数,在必要时更改统计名称,并返回转换后的统计名称。该函数可能如下所示

def my_custom_stat_name_handler(stat_name: str) -> str:
    return stat_name.lower()[:32]

其他配置选项

注意

有关指标配置选项的详细列表,请参阅配置参考文档 - [metrics]

指标描述

计数器

名称

描述

<job_name>_start

启动的 <job_name> 作业的数量,例如 SchedulerJobLocalTaskJob

<job_name>_end

结束的 <job_name> 作业的数量,例如 SchedulerJobLocalTaskJob

<job_name>_heartbeat_failure

<job_name> 作业的心跳失败次数,例如 SchedulerJobLocalTaskJob

local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>

当运行 DAG <dag_id> 的任务 <task_id> 时,LocalTaskJob<return_code> 终止的次数。

local_task_job.task_exit

当运行 DAG <dag_id> 的任务 <task_id> 时,LocalTaskJob<return_code> 终止的次数。具有 job_id、dag_id、task_id 和 return_code 标记的指标。

operator_failures_<operator_name>

操作符 <operator_name> 失败次数

operator_failures

操作符 <operator_name> 失败次数。具有 operator_name 标记的指标。

operator_successes_<operator_name>

操作符 <operator_name> 成功次数

operator_successes

操作符 <operator_name> 成功次数。具有 operator_name 标记的指标。

ti_failures

总体任务实例失败次数。具有 dag_id 和 task_id 标记的指标。

ti_successes

总体任务实例成功次数。具有 dag_id 和 task_id 标记的指标。

previously_succeeded

先前成功的任务实例数。具有 dag_id 和 task_id 标记的指标。

zombies_killed

被杀死的僵尸任务。具有 dag_id 和 task_id 标记的指标。

scheduler_heartbeat

调度器心跳

dag_processing.processes

当前正在运行的 DAG 解析进程的相对数量(即,当自上次发送指标以来进程已完成时,此增量为负)。具有 file_path 和 action 标记的指标。

dag_processing.processor_timeouts

由于耗时过长而被杀死的进程的数量。具有 file_path 标记的指标。

dag_processing.sla_callback_count

收到的 SLA 回调数

dag_processing.other_callback_count

收到的非 SLA 回调数

dag_processing.file_path_queue_update_count

我们扫描文件系统并将所有现有 DAG 排入队列的次数

dag_file_processor_timeouts

(已弃用)与 dag_processing.processor_timeouts 相同的行为

dag_processing.manager_stalls

停止的 DagFileProcessorManager 的数量

dag_file_refresh_error

加载任何 DAG 文件失败的次数

scheduler.tasks.killed_externally

外部杀死的任务数量。具有 dag_id 和 task_id 标记的指标。

scheduler.orphaned_tasks.cleared

调度器清除的孤立任务数量

scheduler.orphaned_tasks.adopted

调度器采用的孤立任务数量

scheduler.critical_section_busy

调度器进程尝试获取对关键部分(将任务发送到执行器所需)的锁,并发现它被另一个进程锁定的次数。

sla_missed

错过的 SLA 数量。具有 dag_id 和 task_id 标记的指标。

sla_callback_notification_failure

失败的 SLA 错过回调通知尝试的次数。具有 dag_id 和 func_name 标记的指标。

sla_email_notification_failure

失败的 SLA 错过电子邮件通知尝试的次数。具有 dag_id 标记的指标。

ti.start.<dag_id>.<task_id>

给定 DAG 中启动的任务数量。类似于 <job_name>_start,但适用于任务

ti.start

给定 DAG 中启动的任务数量。类似于 <job_name>_start,但适用于任务。具有 dag_id 和 task_id 标记的指标。

ti.finish.<dag_id>.<task_id>.<state>

给定 DAG 中完成的任务数量。类似于 <job_name>_end,但适用于任务

ti.finish

给定 DAG 中完成的任务数量。类似于 <job_name>_end,但适用于任务。具有 dag_id 和 task_id 标记的指标。

dag.callback_exceptions

从 DAG 回调引发的异常数量。当发生这种情况时,意味着 DAG 回调不起作用。具有 dag_id 标记的指标

celery.task_timeout_error

将任务发布到 Celery 代理时引发的 AirflowTaskTimeout 错误数量。

celery.execute_command.failure

来自 Celery 任务的非零退出代码的数量。

task_removed_from_dag.<dag_id>

为给定 DAG 删除的任务数量(即,DAG 中不再存在该任务)。

task_removed_from_dag

为给定 DAG 删除的任务数量(即,DAG 中不再存在该任务)。具有 dag_id 和 run_type 标记的指标。

task_restored_to_dag.<dag_id>

为给定 DAG 恢复的任务数量(即,先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)

task_restored_to_dag.<dag_id>

为给定 DAG 恢复的任务数量(即,先前在数据库中处于 REMOVED 状态的任务实例已添加到 DAG 文件中)。具有 dag_id 和 run_type 标记的指标。

task_instance_created_<operator_name>

为给定操作符创建的任务实例数量

task_instance_created

为给定操作符创建的任务实例数量。带有 dag_id 和 run_type 标签的指标。

triggerer_heartbeat

触发器心跳

triggers.blocked_main_thread

阻塞主线程的触发器数量(可能是因为没有完全异步)

triggers.failed

在触发事件之前出错的触发器数量

triggers.succeeded

至少触发了一个事件的触发器数量

dataset.updates

更新的数据集数量

dataset.orphaned

由于在 DAG 计划参数或任务出口中不再被引用而被标记为孤立的数据集数量

dataset.triggered_dagruns

由数据集更新触发的 DAG 运行次数

仪表盘

名称

描述

dagbag_size

调度器根据其配置运行扫描时找到的 DAG 数量

dag_processing.import_errors

尝试解析 DAG 文件时出现的错误数量

dag_processing.total_parse_time

扫描和导入 dag_processing.file_path_queue_size 个 DAG 文件所用的秒数

dag_processing.file_path_queue_size

下次扫描要考虑的 DAG 文件数量

dag_processing.last_run.seconds_ago.<dag_file>

自上次处理 <dag_file> 以来经过的秒数

dag_processing.last_num_of_db_queries.<dag_file>

在解析每个 <dag_file> 文件期间,对 Airflow 数据库的查询次数

scheduler.tasks.starving

由于池中没有空闲槽而无法调度的任务数量

scheduler.tasks.executable

根据池限制、DAG 并发、执行器状态和优先级,准备好执行(设置为已排队)的任务数量。

executor.open_slots.<executor_class_name>

特定执行器上的空闲槽数量。仅在配置了多个执行器时发出。

executor.open_slots

执行器上的空闲槽数量

executor.queued_tasks.<executor_class_name>

特定执行器上的已排队任务数量。仅在配置了多个执行器时发出。

executor.queued_tasks

执行器上的已排队任务数量

executor.running_tasks.<executor_class_name>

特定执行器上正在运行的任务数量。仅在配置了多个执行器时发出。

executor.running_tasks

执行器上正在运行的任务数量

pool.open_slots.<pool_name>

池中的空闲槽数量

pool.open_slots

池中的空闲槽数量。带有 pool_name 标签的指标。

pool.queued_slots.<pool_name>

池中的已排队槽数量

pool.queued_slots

池中的已排队槽数量。带有 pool_name 标签的指标。

pool.running_slots.<pool_name>

池中正在运行的槽数量

pool.running_slots

池中正在运行的槽数量。带有 pool_name 标签的指标。

pool.deferred_slots.<pool_name>

池中的延迟槽数量

pool.deferred_slots

池中的延迟槽数量。带有 pool_name 标签的指标。

pool.scheduled_slots.<pool_name>

池中的已调度槽数量

pool.scheduled_slots

池中的已调度槽数量。带有 pool_name 标签的指标。

pool.starving_tasks.<pool_name>

池中饥饿的任务数量

pool.starving_tasks

池中饥饿的任务数量。带有 pool_name 标签的指标。

task.cpu_usage.<dag_id>.<task_id>

任务使用的 CPU 百分比

task.mem_usage.<dag_id>.<task_id>

任务使用的内存百分比

triggers.running.<hostname>

触发器(由主机名描述)当前正在运行的触发器数量

triggers.running

触发器(由主机名描述)当前正在运行的触发器数量。带有主机名标签的指标。

计时器

名称

描述

dagrun.dependency-check.<dag_id>

检查 DAG 依赖项所用的毫秒数

dagrun.dependency-check

检查 DAG 依赖项所用的毫秒数。带有 dag_id 标签的指标。

dag.<dag_id>.<task_id>.duration

运行任务所用的毫秒数

task.duration

运行任务所用的毫秒数。带有 dag_id 和 task_id 标签的指标。

dag.<dag_id>.<task_id>.scheduled_duration

任务在“已调度”状态下花费的毫秒数,然后被置为“已排队”状态

task.scheduled_duration

任务在“已调度”状态下花费的毫秒数,然后被置为“已排队”状态。带有 dag_id 和 task_id 标签的指标。

dag.<dag_id>.<task_id>.queued_duration

任务在“已排队”状态下花费的毫秒数,然后被置为“正在运行”状态

task.queued_duration

任务在“已排队”状态下花费的毫秒数,然后被置为“正在运行”状态。带有 dag_id 和 task_id 标签的指标。

dag_processing.last_duration.<dag_file>

加载给定 DAG 文件所用的毫秒数

dag_processing.last_duration

加载给定 DAG 文件所用的毫秒数。带有 file_name 标签的指标。

dagrun.duration.success.<dag_id>

DagRun 达到成功状态所用的毫秒数

dagrun.duration.success

DagRun 达到成功状态所用的毫秒数。带有 dag_id 和 run_type 标签的指标。

dagrun.duration.failed.<dag_id>

DagRun 达到失败状态所用的毫秒数

dagrun.duration.failed

DagRun 达到失败状态所用的毫秒数。带有 dag_id 和 run_type 标签的指标。

dagrun.schedule_delay.<dag_id>

计划的 DagRun 开始日期和实际 DagRun 开始日期之间的延迟毫秒数

dagrun.schedule_delay

计划的 DagRun 开始日期和实际 DagRun 开始日期之间的延迟毫秒数。带有 dag_id 标签的指标。

scheduler.critical_section_duration

在调度器循环的关键部分花费的毫秒数 - 一次只有一个调度器可以进入此循环

scheduler.critical_section_query_duration

运行关键部分任务实例查询所花费的毫秒数

scheduler.scheduler_loop_duration

运行一个调度器循环所花费的毫秒数

dagrun.<dag_id>.first_task_scheduling_delay

第一个任务的 start_date 和 dagrun 预期开始之间经过的毫秒数

dagrun.first_task_scheduling_delay

第一个任务的 start_date 和 dagrun 预期开始之间经过的毫秒数。带有 dag_id 和 run_type 标签的指标。

collect_db_dags

从数据库中获取所有序列化的 DAG 所用的毫秒数

kubernetes_executor.clear_not_launched_queued_tasks.duration

在 Kubernetes 执行器中清除未启动的已排队任务所用的毫秒数

kubernetes_executor.adopt_task_instances.duration

在 Kubernetes 执行器中采用任务实例所用的毫秒数

此条目是否有帮助?