使用命令行界面

本文档旨在概述使用 CLI 时所有常见任务。

注意

有关 CLI 命令的更多信息,请参阅 命令行界面和环境变量参考

设置 Bash/Zsh 自动补全

当使用 bash(或 zsh)作为 shell 时,airflow 可以使用 argcomplete 进行自动补全。

对于所有已启用 argcomplete 的 python 应用程序的 全局激活 运行

sudo activate-global-python-argcomplete

对于永久(但不是全局)airflow 激活,请使用

register-python-argcomplete airflow >> ~/.bashrc

对于仅针对 airflow 的一次性 argcomplete 激活,请使用

eval "$(register-python-argcomplete airflow)"
../_images/cli_completion.gif

如果您正在使用 zsh,请将以下内容添加到您的 .zshrc

autoload bashcompinit
bashcompinit
eval "$(register-python-argcomplete airflow)"

创建连接

有关使用 CLI 创建连接的信息,请参阅 从 CLI 创建连接

将 DAG 结构导出为图像

Airflow 允许您将 DAG 结构打印或保存为图像。这对于记录或共享您的 DAG 结构非常有用。您需要安装 Graphviz

例如,要将 example_complex DAG 打印到终端

airflow dags show example_complex

这将以 DOT 格式将渲染的 DAG 结构(类似于 图形)打印到屏幕上。

支持多种文件格式。要使用它们,请添加参数 --save [filename].[format]

example_complex DAG 保存为 PNG 文件

airflow dags show example_complex --save example_complex.png

这会将以下图像另存为文件

../_images/usage_cli_export.png

示例 DAG 表示

支持以下文件格式

  • bmp

  • canon, dot, gv, xdot, xdot1.2, xdot1.4

  • cgimage

  • cmap

  • eps

  • exr

  • fig

  • gd, gd2

  • gif

  • gtk

  • ico

  • imap, cmapx

  • imap_np, cmapx_np

  • ismap

  • jp2

  • jpg, jpeg, jpe

  • json, json0, dot_json, xdot_json

  • pct, pict

  • pdf

  • pic

  • plain, plain-ext

  • png

  • pov

  • ps

  • ps2

  • psd

  • sgi

  • svg, svgz

  • tga

  • tif, tiff

  • tk

  • vml, vmlz

  • vrml

  • wbmp

  • webp

  • xlib

  • x11

默认情况下,Airflow 在 airflow.cfg 文件的 [core] 部分中 dags_folder 选项指定的目录中查找 DAG。您可以使用 --subdir 参数选择一个新目录。

显示 DAG 结构

有时,您会处理包含复杂依赖项的 DAG。然后,预览 DAG 以查看它是否正确是有帮助的。

如果您有 macOS,您可以将 iTerm2imgcat 脚本一起使用,以在控制台中显示 DAG 结构。您还需要安装 Graphviz

其他终端不支持显示高质量图形。您可以将图像转换为文本形式,但其分辨率会阻止您阅读它。

为此,您应该在 airflow dags show 命令中使用 --imgcat 开关。例如,如果您想显示 example_bash_operator DAG,则可以使用以下命令

airflow dags show example_bash_operator --imgcat

您将看到与以下屏幕截图中类似的结果。

../_images/usage_cli_imgcat.png

iTerm2 中 DAG 的预览

格式化命令输出

一些 Airflow 命令(如 airflow dags listairflow tasks states-for-dag-run)支持 --output 标志,允许用户更改命令输出的格式。可能选项

  • table - 以纯文本表格的形式呈现信息

  • simple - 以标准 Linux 实用程序可以解析的简单表格的形式呈现信息

  • json - 以 json 字符串的形式呈现信息

  • yaml - 以有效的 yaml 形式呈现信息

jsonyaml 格式都可以使用命令行工具(如 jqyq)更轻松地操作数据。例如

airflow tasks states-for-dag-run example_complex 2020-11-13T00:00:00+00:00 --output json | jq ".[] | {sd: .start_date, ed: .end_date}"
{
  "sd": "2020-11-29T14:53:46.811030+00:00",
  "ed": "2020-11-29T14:53:46.974545+00:00"
}
{
  "sd": "2020-11-29T14:53:56.926441+00:00",
  "ed": "2020-11-29T14:53:57.118781+00:00"
}
{
  "sd": "2020-11-29T14:53:56.915802+00:00",
  "ed": "2020-11-29T14:53:57.125230+00:00"
}
{
  "sd": "2020-11-29T14:53:56.922131+00:00",
  "ed": "2020-11-29T14:53:57.129091+00:00"
}
{
  "sd": "2020-11-29T14:53:56.931243+00:00",
  "ed": "2020-11-29T14:53:57.126306+00:00"
}

从元数据数据库中清除历史记录

注意

强烈建议在运行 db clean 命令之前备份元数据数据库。

db clean 命令通过从每个表中删除早于所提供的 --clean-before-timestamp 的记录来工作。

你可以选择提供要执行删除操作的表列表。如果没有提供表列表,则将包含所有表。

你可以使用 --dry-run 选项来打印要清理的主要表中的行数。

默认情况下,db clean 会将已清除的行存档到 _airflow_deleted__<table>__<timestamp> 形式的表中。如果你不希望以这种方式保留数据,你可以提供参数 --skip-archive

从存档表中导出已清除的记录

db export-archived 命令将 db clean 命令创建的存档表的内容导出到指定格式(默认情况下导出到 CSV 文件)。导出的文件将包含在 db clean 过程中从主要表中清除的记录。

你可以使用 --export-format 选项指定导出格式。默认格式为 csv,目前也是唯一支持的格式。

你必须使用 --output-path 选项指定要将数据导出的路径位置。此位置必须存在。

其他选项包括:--tables 用于指定要导出的表,--drop-archives 用于在导出后删除归档表。

删除已归档的表

如果在 db clean 过程中,你没有使用 --skip-archive 选项(该选项会删除已归档的表),你仍然可以使用 db drop-archived 命令删除已归档的表。此操作不可逆,建议你在删除已归档的表之前使用 db export-archived 命令将表备份到磁盘。

你可以使用 --tables 选项指定要删除的表。如果没有指定表,则将删除所有已归档的表。

注意级联删除

请记住,某些表具有使用 ON DELETE CASCADE 定义的外键关系,因此在一个表中删除可能会触发在其他表中删除。例如,task_instance 表键入 dag_run 表,因此如果删除了 DagRun 记录,其所有关联的任务实例也将被删除。

对 DAG 运行的特殊处理

通常,Airflow 通过查找最新的 DagRun 来确定接下来运行哪个 DagRun。如果你删除了所有 DAG 运行,Airflow 可能会调度一个已经完成的旧 DAG 运行,例如,如果你设置了 catchup=True。因此,db clean 命令将保留最新的非手动触发的 DAG 运行,以保持调度的连续性。

可回填 DAG 的注意事项

并非所有 DAG 都设计为可与 Airflow 的回填命令配合使用。但对于那些可以配合使用的 DAG,需要特别注意。如果您删除 DAG 运行,并且在包含已删除 DAG 运行的日期范围内运行回填,则这些运行将被重新创建并再次运行。因此,如果您有属于此类别的 DAG,您可能希望避免删除 DAG 运行,而仅清理其他大型表,例如任务实例和日志等。

升级 Airflow

运行 airflow db migrate --help 以获取用法详细信息。

手动运行迁移

如果需要,您可以生成升级的 SQL 语句,并手动逐个应用每个升级迁移。为此,您可以使用 --range(用于 Airflow 版本)或 --revision-range(用于 Alembic 修订版)选项与 db migrate 结合使用。不要 跳过运行 Alembic 修订版 ID 更新命令;这是 Airflow 将知道您下次需要从何处升级的方式。有关修订版和版本之间的映射,请参阅 数据库迁移参考

降级 Airflow

注意

建议您在运行 db downgrade 或任何其他数据库操作之前备份您的数据库。

您可以使用 db downgrade 命令将 Airflow 降级到特定版本。或者,您可以提供要降级到的 Alembic 修订版 ID。

如果您想要预览命令但不执行它们,请使用选项 --show-sql-only

选项 --from-revision--from-version 只能与 --show-sql-only 选项结合使用,因为在实际运行迁移时,我们应该始终从当前修订版降级。

有关 Airflow 版本和 Alembic 修订版之间的映射,请参阅 数据库迁移参考

导出连接

您可以使用 CLI 从数据库导出连接。支持的文件格式为 jsonyamlenv

您可以将目标文件指定为参数

airflow connections export connections.json

或者,您可以指定 file-format 参数以覆盖文件格式

airflow connections export /tmp/connections --file-format yaml

您还可以指定 - 表示 STDOUT

airflow connections export -

JSON 格式包含一个对象,其中键包含连接 ID,值包含连接的定义。在此格式中,连接被定义为 JSON 对象。以下是一个 JSON 文件示例。

{
  "airflow_db": {
    "conn_type": "mysql",
    "host": "mysql",
    "login": "root",
    "password": "plainpassword",
    "schema": "airflow",
    "port": null,
    "extra": null
  },
  "druid_broker_default": {
    "conn_type": "druid",
    "host": "druid-broker",
    "login": null,
    "password": null,
    "schema": null,
    "port": 8082,
    "extra": "{\"endpoint\": \"druid/v2/sql\"}"
  }
}

YAML 文件结构类似于 JSON。连接 ID 的键值对和一个或多个连接的定义。在此格式中,连接被定义为 YAML 对象。以下是一个 YAML 文件示例。

airflow_db:
  conn_type: mysql
  extra: null
  host: mysql
  login: root
  password: plainpassword
  port: null
  schema: airflow
druid_broker_default:
  conn_type: druid
  extra: '{"endpoint": "druid/v2/sql"}'
  host: druid-broker
  login: null
  password: null
  port: 8082
  schema: null

您还可以以 .env 格式导出连接。键是连接 ID,值是连接的序列化表示形式,可以使用 Airflow 的连接 URI 格式或 JSON。要使用 JSON,请提供选项 --serialization-format=json,否则将使用 Airflow 连接 URI 格式。以下是使用这两种格式的 .env 文件示例。

URI 示例

airflow_db=mysql://root:plainpassword@mysql/airflow
druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql

JSON 示例输出

airflow_db={"conn_type": "mysql", "login": "root", "password": "plainpassword", "host": "mysql", "schema": "airflow"}
druid_broker_default={"conn_type": "druid", "host": "druid-broker", "port": 8082, "extra": "{\"endpoint\": \"druid/v2/sql\"}"}

测试 DAG 导入错误

CLI 可用于通过 list-import-errors 子命令检查任何发现的 DAG 是否有导入错误。可以通过检查命令输出(特别是与 --output 一起使用以生成标准文件格式时)创建一个自动化步骤,该步骤在任何 DAG 无法导入时失败。例如,当没有错误时的默认输出为 No data found,json 输出为 []。然后可以在 CI 或预提交中运行该检查,以加快审查过程和测试。

如果有任何错误,则失败的示例命令,使用 jq 解析输出

airflow dags list-import-errors --output=json | jq -e 'select(type=="array" and length == 0)'

可以按原样将该行添加到自动化中,或者如果您想要打印输出,可以使用 tee

airflow dags list-import-errors | tee import_errors.txt && jq -e 'select(type=="array" and length == 0)' import_errors.txt

Jenkins 管道中的示例

stage('All DAGs are loadable') {
    steps {
        sh 'airflow dags list-import-errors | tee import_errors.txt && jq -e \'select(type=="array" and length == 0)\' import_errors.txt'
    }
}

注意

为了准确地进行此操作,你必须确保 Airflow 不会将任何附加文本记录到 stdout。例如,你可能需要修复任何弃用警告,将 2>/dev/null 添加到你的命令中,或者如果加载插件时会生成日志,则在 Airflow 配置中设置 lazy_load_plugins = True

此条目是否有用?