使用命令行界面¶
本文档旨在概述使用 CLI 时的所有常见任务。
注意
有关 CLI 命令的更多信息,请参阅 命令行界面和环境变量参考
设置 Bash/Zsh 自动补全¶
当使用 bash(或 zsh
)作为 shell 时,airflow
可以使用 argcomplete 进行自动补全。
对于所有启用 argcomplete 的 Python 应用程序的全局激活,运行
sudo activate-global-python-argcomplete
对于永久(非全局)Airflow 激活,使用
register-python-argcomplete airflow >> ~/.bashrc
对于仅 Airflow 的 argcomplete 一次性激活,使用
eval "$(register-python-argcomplete airflow)"

如果你使用 zsh
,将以下内容添加到你的 .zshrc
中
autoload bashcompinit
bashcompinit
eval "$(register-python-argcomplete airflow)"
创建连接¶
有关使用 CLI 创建连接的信息,请参阅 从 CLI 创建连接
将 DAG 结构导出为图片¶
Airflow 允许你将 DAG 结构打印或保存为图片。这对于记录或分享你的 DAG 结构很有用。你需要安装 Graphviz。
例如,将 example_complex
DAG 打印到终端
airflow dags show example_complex
这将把渲染后的 DAG 结构以 DOT 格式打印到屏幕上。
支持多种文件格式。要使用它们,请添加参数 --save [filename].[format]
。
将 example_complex
DAG 保存为 PNG 文件
airflow dags show example_complex --save example_complex.png
这将把以下图片保存为一个文件

DAG 示例表示¶
支持以下文件格式
bmp
canon
,dot
,gv
,xdot
,xdot1.2
,xdot1.4
cgimage
cmap
eps
exr
fig
gd
,gd2
gif
gtk
ico
imap
,cmapx
imap_np
,cmapx_np
ismap
jp2
jpg
,jpeg
,jpe
json
,json0
,dot_json
,xdot_json
pct
,pict
pic
plain
,plain-ext
png
pov
ps
ps2
psd
sgi
svg
,svgz
tga
tif
,tiff
tk
vml
,vmlz
vrml
wbmp
webp
xlib
x11
默认情况下,Airflow 在 airflow.cfg
文件 [core]
部分的 dags_folder
选项指定的目录中查找 DAG。可以使用 --subdir
参数选择新目录。
显示 DAG 结构¶
有时你会处理包含复杂依赖关系的 DAG。此时预览 DAG 以检查其是否正确会很有帮助。
如果你使用 macOS,可以结合 imgcat 脚本使用 iTerm2 在控制台中显示 DAG 结构。你还需要安装 Graphviz。
其他终端不支持显示高质量图形。你可以将图片转换为文本形式,但其分辨率会让你无法阅读。
为此,你应该在 airflow dags show
命令中使用 --imgcat
开关。例如,如果你想显示 example_bash_operator
DAG,可以使用以下命令
airflow dags show example_bash_operator --imgcat
你将看到与下方截图类似的结果。

在 iTerm2 中预览 DAG¶
格式化命令输出¶
一些 Airflow 命令,例如 airflow dags list
或 airflow tasks states-for-dag-run
支持 --output
标志,允许用户更改命令输出的格式。可能的选项有
table
- 将信息渲染为纯文本表格
simple
- 将信息渲染为简单表格,可由标准 Linux 工具解析
json
- 将信息渲染为 JSON 字符串形式
yaml
- 将信息渲染为有效的 YAML 形式
json
和 yaml
格式都使得使用命令行工具(如 jq 或 yq)处理数据更加容易。例如
airflow tasks states-for-dag-run example_complex 2020-11-13T00:00:00+00:00 --output json | jq ".[] | {sd: .start_date, ed: .end_date}"
{
"sd": "2020-11-29T14:53:46.811030+00:00",
"ed": "2020-11-29T14:53:46.974545+00:00"
}
{
"sd": "2020-11-29T14:53:56.926441+00:00",
"ed": "2020-11-29T14:53:57.118781+00:00"
}
{
"sd": "2020-11-29T14:53:56.915802+00:00",
"ed": "2020-11-29T14:53:57.125230+00:00"
}
{
"sd": "2020-11-29T14:53:56.922131+00:00",
"ed": "2020-11-29T14:53:57.129091+00:00"
}
{
"sd": "2020-11-29T14:53:56.931243+00:00",
"ed": "2020-11-29T14:53:57.126306+00:00"
}
清除元数据数据库中的历史记录¶
注意
强烈建议你在运行 db clean
命令之前备份元数据数据库。
db clean
命令通过从每个表中删除早于提供的 --clean-before-timestamp
的记录来工作。
你可以选择提供一个要执行删除操作的表列表。如果未提供表列表,将包含所有表。
可以使用 --dry-run
选项打印要清理的主表中的行数。
默认情况下,db clean
会将清除的行存档到形如 _airflow_deleted__<table>__<timestamp>
的表中。如果你不想以这种方式保留数据,可以提供参数 --skip-archive
。
当你在未使用 --skip-archive
的情况下遇到错误时,_airflow_deleted__<table>__<timestamp>
仍会存在于数据库中。可以使用 db drop-archived
命令手动删除这些表。
从存档表中导出已清除的记录¶
db export-archived
命令将 db clean
命令创建的存档表内容导出为指定格式,默认为 CSV 文件。导出的文件将包含在 db clean
过程中从主表中清除的记录。
可以使用 --export-format
选项指定导出格式。默认格式为 csv,目前也是唯一支持的格式。
还必须使用 --output-path
选项指定要导出数据到的路径位置。该位置必须存在。
其他选项包括:使用 --tables
指定要导出的表,使用 --drop-archives
在导出后删除存档表。
删除存档表¶
如果在 db clean
过程中未使用删除存档表的 --skip-archive
选项,仍然可以使用 db drop-archived
命令删除存档表。此操作不可逆,建议在删除表之前使用 db export-archived
命令将表备份到磁盘。
可以使用 --tables
选项指定要删除的表。如果未指定表,将删除所有存档表。
警惕级联删除¶
请记住,有些表定义了带有 ON DELETE CASCADE
的外键关系,因此在一个表中的删除可能会触发其他表中的删除。例如,task_instance
表与 dag_run
表关联,因此如果一个 DagRun 记录被删除,所有关联的任务实例也将被删除。
DAG 运行的特殊处理¶
通常,Airflow 通过查找最新的 DagRun 来确定下一个要运行的 DagRun。如果你删除所有 DAG 运行,Airflow 可能会调度一个已完成的旧 DagRun,例如如果你设置了 catchup=True
。因此,db clean
命令将保留最新的非手动触发的 DAG 运行,以保持调度的连续性。
可回填 DAG 的注意事项¶
并非所有 DAG 都设计用于 Airflow 的回填命令。但对于那些适用的 DAG,需要特别注意。如果你删除 DAG 运行,并且在包含已删除 DAG 运行的日期范围内运行回填,这些运行将被重新创建并再次运行。因此,如果你的 DAG 属于此类,你可能希望避免删除 DAG 运行,而只清理其他大型表,例如任务实例和日志等。
升级 Airflow¶
运行 airflow db migrate --help
获取使用详情。
手动运行迁移¶
如果需要,可以为升级生成 SQL 语句,并手动逐个应用每个升级迁移。为此,可以在 db migrate
命令中使用 --range
(针对 Airflow 版本)或 --revision-range
(针对 Alembic revision)选项。切勿跳过运行 Alembic revision ID 更新命令;Airflow 通过此信息知道下次需要从何处升级。有关 revision 和版本之间的映射,请参阅 数据库迁移参考。
降级 Airflow¶
注意
建议在运行 db downgrade
或任何其他数据库操作之前备份数据库。
可以使用 db downgrade
命令降级到特定的 Airflow 版本。或者,可以提供一个 Alembic revision ID 进行降级。
如果只想预览命令而不执行它们,请使用 --show-sql-only
选项。
选项 --from-revision
和 --from-version
只能与 --show-sql-only
选项结合使用,因为在实际运行迁移时,我们应该总是从当前 revision 降级。
有关 Airflow 版本和 Alembic revision 之间的映射,请参阅 数据库迁移参考。
注意
强烈建议你在完成 Airflow 环境降级后(即在你降级了 Python 环境中安装的 Airflow 版本后,而不是立即在降级数据库后)使用 dags reserialize
命令重新序列化你的 DAG。这是为了确保序列化的 DAG 与降级后的 Airflow 版本兼容。
导出连接¶
可以使用 CLI 从数据库导出连接。支持的文件格式有 json
、yaml
和 env
。
可以将目标文件指定为参数
airflow connections export connections.json
或者可以指定 file-format
参数来覆盖文件格式
airflow connections export /tmp/connections --file-format yaml
也可以指定 -
表示输出到标准输出 (STDOUT)
airflow connections export -
JSON 格式包含一个对象,其中键是连接 ID,值是连接的定义。在这种格式中,连接被定义为一个 JSON 对象。以下是一个示例 JSON 文件。
{
"airflow_db": {
"conn_type": "mysql",
"host": "mysql",
"login": "root",
"password": "plainpassword",
"schema": "airflow",
"port": null,
"extra": null
},
"druid_broker_default": {
"conn_type": "druid",
"host": "druid-broker",
"login": null,
"password": null,
"schema": null,
"port": 8082,
"extra": "{\"endpoint\": \"druid/v2/sql\"}"
}
}
YAML 文件结构与 JSON 类似。它是一个由连接 ID 和一个或多个连接定义组成的键值对。在这种格式中,连接被定义为一个 YAML 对象。以下是一个示例 YAML 文件。
airflow_db:
conn_type: mysql
extra: null
host: mysql
login: root
password: plainpassword
port: null
schema: airflow
druid_broker_default:
conn_type: druid
extra: '{"endpoint": "druid/v2/sql"}'
host: druid-broker
login: null
password: null
port: 8082
schema: null
还可以以 .env
格式导出连接。键是连接 ID,值是连接的序列化表示,可以使用 Airflow 的 Connection URI 格式或 JSON 格式。要使用 JSON,请提供选项 --serialization-format=json
,否则将使用 Airflow Connection URI 格式。以下是使用这两种格式的示例 .env
文件。
URI 示例
airflow_db=mysql://root:plainpassword@mysql/airflow
druid_broker_default=druid://druid-broker:8082?endpoint=druid%2Fv2%2Fsql
JSON 示例输出
airflow_db={"conn_type": "mysql", "login": "root", "password": "plainpassword", "host": "mysql", "schema": "airflow"}
druid_broker_default={"conn_type": "druid", "host": "druid-broker", "port": 8082, "extra": "{\"endpoint\": \"druid/v2/sql\"}"}
测试 DAG 导入错误¶
CLI 可用于通过 list-import-errors
子命令检查发现的任何 DAG 是否存在导入错误。可以创建一个自动化步骤,通过检查命令输出(特别是在与 --output
结合使用生成标准文件格式时),如果任何 DAG 无法导入则使其失败。例如,没有错误时的默认输出是 No data found
,而 json 输出是 []
。然后可以在 CI 或 pre-commit 中运行此检查,以加快评审和测试过程。
使用 jq 解析输出,当存在任何错误时会失败的示例命令
airflow dags list-import-errors --output=json | jq -e 'select(type=="array" and length == 0)'
该行可以直接添加到自动化流程中,或者如果你想打印输出,可以使用 tee
命令
airflow dags list-import-errors | tee import_errors.txt && jq -e 'select(type=="array" and length == 0)' import_errors.txt
Jenkins pipeline 中的示例
stage('All dags are loadable') {
steps {
sh 'airflow dags list-import-errors | tee import_errors.txt && jq -e \'select(type=="array" and length == 0)\' import_errors.txt'
}
}
注意
为了使其准确工作,必须确保 Airflow 不会向标准输出 (stdout) 记录任何额外文本。例如,你可能需要修复任何弃用警告,向命令添加 2>/dev/null
,或者如果你有在加载时生成日志的插件,则在 Airflow 配置中设置 lazy_load_plugins = True
。