管理连接
另请参阅
有关钩子(Hooks)和连接的概述,请参阅 连接与钩子。
Airflow 的 Connection 对象用于存储连接外部服务所需的凭据及其他信息。
连接可以通过以下方式定义:
在 环境变量 中
在外部 Secrets Backend (密钥后端) 中
在 Airflow 元数据数据库 中(使用 CLI 或 Web UI)
在环境变量中存储连接
Airflow 连接可以在环境变量中定义。
命名约定为 AIRFLOW_CONN_{CONN_ID},全部大写(注意 CONN 前后各有一个下划线)。因此,如果您的连接 ID 是 my_prod_db,则变量名应为 AIRFLOW_CONN_MY_PROD_DB。
值可以是 JSON 或 Airflow 的 URI 格式。
JSON 格式示例
在 2.3.0 版本中添加。
如果使用 JSON 序列化
export AIRFLOW_CONN_MY_PROD_DATABASE='{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
生成 JSON 格式的连接表示
2.8.0 版本新增。
为了使连接 JSON 的生成更容易,Connection 类具有一个便捷属性 as_json()。它可以这样使用:
>>> from airflow.sdk import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'
此外,同样的方法也可用于将连接从 URI 格式转换为 JSON 格式。
>>> from airflow.sdk import Connection
>>> c = Connection(
... conn_id="awesome_conn",
... description="Example Connection",
... uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'
URI 格式示例
如果使用 Airflow URI 序列化
export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1¶m2=val2'
有关如何生成有效 URI 的更多详细信息,请参阅 连接 URI 格式。
在 UI 和 CLI 中的可见性
通过环境变量定义的连接不会显示在 Airflow UI 中,也不会通过 airflow connections list 列出。
这是因为这些连接是在运行时动态解析的,通常在执行您任务的工作进程(worker)上进行。它们不会存储在元数据数据库中,也不会加载到 Web 服务器或调度器的环境中。
这支持安全部署模式,即基于环境的密钥(例如通过 .env 文件、Docker 或 Kubernetes 密钥)仅注入到运行时组件(如工作进程)中,而不是注入到暴露给用户的组件(如 Web 服务器)中。
如果您需要为了可见性或编辑而在 UI 中显示连接,请改用元数据数据库进行定义。
在 Secrets Backend 中存储连接
您可以将 Airflow 连接存储在外部密钥后端(Secrets Backend)中,例如 HashiCorp Vault、AWS SSM Parameter Store 等服务。有关更多详细信息,请参阅 Secrets Backend (密钥后端)。
在数据库中存储连接
当将连接存储在数据库中时,您可以使用 Web UI 或 Airflow CLI 来管理它们。
使用 UI 创建连接
打开 UI 的 Admin->Connections 部分。点击 Add Connection 链接以创建新连接。
在
Connection Id字段中填写所需的连接 ID。建议使用小写字符,并用下划线分隔单词。在
Connection Type字段中选择连接类型。填写其余字段。有关不同连接类型所属字段的描述,请参阅 处理 extra 中的任意字典。
点击
Save按钮以创建连接。
使用 UI 编辑连接
打开 UI 的 Admin->Connections 部分。在连接列表中,点击您希望编辑的连接旁边的铅笔图标。
修改连接属性,然后点击 Save 按钮保存您的更改。
从 CLI 创建连接
您可以从 CLI 向数据库添加连接。
您可以使用 JSON 格式添加连接(自 2.3.0 版本起)。
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
或者,您可以使用 Airflow 的连接 URI 格式(请参阅 生成连接 URI)。
airflow connections add 'my_prod_db' \
--conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1¶m2=val2&...'
最后,您也可以分别指定每个参数。
airflow connections add 'my_prod_db' \
--conn-type 'my-conn-type' \
--conn-login 'login' \
--conn-password 'password' \
--conn-host 'host' \
--conn-port 'port' \
--conn-schema 'schema' \
...
将连接导出到文件
您可以将存储在数据库中的连接导出到文件(例如用于将连接从一个环境迁移到另一个环境)。有关用法,请参阅 导出连接。
数据库中连接的安全性
对于存储在 Airflow 元数据数据库中的连接,Airflow 使用 Fernet 对密码和其他潜在敏感数据进行加密。它保证如果没有加密密码,就无法在没有密钥的情况下篡改或读取连接密码。有关配置 Fernet 的信息,请查看 Fernet。
测试连接
出于安全考虑,测试连接功能在 Airflow UI、API 和 CLI 中默认是禁用的。
有关用户能力的更多信息,请参阅文档:https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。强烈建议在确保只有高度可信的 UI/API 用户拥有“编辑连接”权限之前,不要启用此功能。
该功能的可使用性可以通过 Airflow 配置文件 (airflow.cfg) 中 core 部分的 test_connection 标志进行控制。它也可以通过环境变量 AIRFLOW__CORE__TEST_CONNECTION 进行控制。
此配置参数接受以下值:
Disabled: 禁用测试连接功能,并禁用 UI 中的“Test Connection”按钮。这也是 Airflow 配置中的默认值。
Enabled: 启用测试连接功能,并激活 UI 中的“Test Connection”按钮。
Hidden: 禁用测试连接功能,并在 UI 中隐藏“Test Connection”按钮。
启用“Test Connection”后,它可以在 UI 的 创建 或 编辑 连接页面中使用,通过调用 连接 REST API 使用,或者通过运行 airflow connections test CLI 命令 使用。
警告
当使用 Airflow UI 或 REST API 时,此功能对于驻留在外部密钥后端中的连接将不可用。
为了测试连接,Airflow 会调用关联钩子类中的 test_connection 方法并报告结果。可能会出现连接类型没有任何关联钩子,或者钩子没有实现 test_connection 方法的情况;在这两种情况下,都会显示错误消息或禁用该功能(如果您正在 UI 中进行测试)。
注意
在 Airflow UI 中进行测试时,测试将从 Web 服务器执行,因此此功能受为您的 Web 服务器设置的网络出口规则的约束。
注意
如果 Web 服务器和工作机(如果通过 Airflow UI 测试)或机器/Pod(如果通过 Airflow CLI 测试)安装了不同的库或提供程序,则测试结果可能会有所不同。
自定义连接类型
Airflow 允许定义自定义连接类型,包括对连接的添加/编辑表单进行修改。自定义连接类型在社区维护的提供程序(providers)中定义,但您也可以添加一个添加自定义连接类型的自定义提供程序。有关如何添加自定义提供程序的描述,请参阅 提供程序 (Providers)。
通过在您的 provider.yaml 中的 connection-types 数组暴露连接类型,您可以通过以下方式自定义 Airflow:
添加自定义连接类型
从连接类型自动创建钩子
添加自定义表单字段,以显示和编辑连接 URL 中的自定义“extra”参数
隐藏您的连接不使用的标准字段
添加展示字段应如何格式化的示例占位符
您可以阅读有关如何添加自定义提供程序的更多详细信息,请参阅 提供程序 (Providers)。
自定义连接字段
注意
首选方法:在 provider.yaml 中定义连接 UI 元数据
自 Airflow 3.2 起,定义自定义连接字段和字段行为的首选方式是在 provider.yaml 中声明式定义。这种方法不需要在运行时导入 flask_appbuilder 或 wtforms。
Python 钩子方法 get_connection_form_widgets() 和 get_ui_field_behaviour() 将作为后备方案继续工作,并且仅在弃用通知和迁移窗口后才会被移除。使用旧方法编写的自定义提供程序将保持正常工作。但是,新的提供程序应使用下面描述的 YAML 方法。
在 provider.yaml 中定义连接 UI 元数据
连接表单元数据在您的提供程序的 provider.yaml 文件中的 connection-types 下以声明方式定义。有两个部分:
conn-fields — 存储在 Connection.extra 中的自定义字段
connection-types:
- hook-class-name: airflow.providers.myservice.hooks.myservice.MyServiceHook
connection-type: myservice
conn-fields:
workspace:
label: Workspace
schema:
type:
- string
- 'null'
project:
label: Project ID
schema:
type:
- string
- 'null'
ui-field-behaviour — 对标准连接字段的自定义(隐藏、重新标记、占位符)
connection-types:
- hook-class-name: airflow.providers.myservice.hooks.myservice.MyServiceHook
connection-type: myservice
ui-field-behaviour:
hidden-fields:
- port
- host
- login
- schema
relabeling:
password: API Token
placeholders:
password: your-api-token
workspace: My workspace gid
project: My project gid
字段架构类型遵循 JSON Schema 约定。
有关支持的 conn-fields 架构选项的完整参考,请参阅 使用 Params 提供触发器 UI 表单。
在 Python 中定义连接 UI 元数据(旧版)
注意
下面的 Python 方法仍然有效,在没有弃用通知的情况下不会被移除。但是,新的提供程序应使用上面描述的 YAML 方法。
可以通过在 Hook 类上实现 get_connection_form_widgets(),在连接添加/编辑视图中添加自定义表单字段。键应该是字段的字符串名称,就像它在 extra 字典中存储的那样。值应该是 wtforms.fields.core.Field 的继承者。
下面是一个示例
@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
return {
"workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
}
注意
自定义字段不再需要 extra__<conn type>__ 前缀
在 Airflow 2.3 之前,如果您想在 UI 中使用自定义字段,必须给它加上 extra__<conn type>__ 前缀,这就是它的值在 extra 字典中的存储方式。从 2.3 版本开始,您不再需要这样做。
get_ui_field_behaviour() 方法让您可以自定义标准字段的行为。例如,您可以隐藏或重新标记字段(例如,如果它未被使用或已重新利用),并且可以添加占位符文本。
一个示例:
@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["port", "host", "login", "schema"],
"relabeling": {},
"placeholders": {
"password": "Asana personal access token",
"workspace": "My workspace gid",
"project": "My project gid",
},
}
注意
如果您想为名称与标准连接属性(即 login, password, host, scheme, port, extra)冲突的 extra 字段添加表单占位符,则必须给它加上 extra__<conn type>__ 前缀。例如 extra__myservice__password。
看看提供程序的示例,了解您可以做什么,例如 JdbcHook。
注意
已弃用的 hook-class-names
在 Airflow 2.2.0 之前,提供程序中的连接通过提供程序元数据中的 hook-class-names 数组暴露。然而,这已被证明在工作进程中使用单个钩子时效率低下,并且 hook-class-names 数组现在已被 connection-types 数组取代。在提供程序支持 Airflow 2.2.0 以下版本之前,应同时存在 connection-types 和 hook-class-names。CI 构建过程中的自动检查将验证这两个数组的一致性。
URI 格式
注意
自 2.3.0 版本起,您可以使用 JSON 来序列化连接。请参阅 示例。
出于历史原因,Airflow 有一种特殊的 URI 格式,可用于将 Connection 对象序列化为字符串值。
通常,Airflow 的 URI 格式如下所示:
my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2
上述 URI 将产生一个等效于以下内容的 Connection 对象:
Connection(
conn_id="",
conn_type="my_conn_type",
description=None,
login="my-login",
password="my-password",
host="my-host",
port=5432,
schema="my-schema",
extra=json.dumps(dict(param1="val1", param2="val2")),
)
生成连接 URI
为了使连接 URI 的生成更容易,Connection 类具有一个便捷方法 get_uri()。它可以这样使用:
>>> import json
>>> from airflow.sdk import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
注意
get_uri() 方法以 Airflow 格式返回连接 URI,而不是 SQLAlchemy 兼容的 URI。如果您需要用于数据库连接的 SQLAlchemy 兼容 URI,请改用 sqlalchemy_url 属性。
此外,如果您已经创建了一个连接,则可以使用 airflow connections get 命令。
$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db
处理 extra 中的任意字典
某些 JSON 结构在不丢失数据的情况下无法进行 URL 编码。对于此类 JSON,get_uri 将把整个字符串存储在 URL 查询参数 __extra__ 下。
例如
>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
... conn_type="scheme",
... host="host/location",
... schema="schema",
... login="user",
... password="password",
... port=1234,
... extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
我们可以验证它返回的是相同的字典:
>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True
但对于只存储键值对的最常见情况,使用普通的 URL 编码。
您可以这样验证 URI 是否被正确解析:
>>> from airflow.sdk import Connection
>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password
处理连接参数中的特殊字符
注意
在生成连接时,请使用 生成连接 URI 部分所述的便捷方法 Connection.get_uri。本节仅供参考。
手动构建 URI 时,某些字符需要特殊处理。
例如,如果您的密码包含 /,这将失败:
>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1¶m2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'
要修复此问题,您可以使用 quote_plus() 进行编码:
>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.password)
my-pa/ssword