管理连接¶
另请参见
有关钩子和连接的概述,请参阅 连接和钩子。
Airflow 的 连接
对象用于存储连接到外部服务的凭据和其他必要信息。
连接可以通过以下方式定义
在 环境变量 中
在外部 秘密后端 中
在 Airflow 元数据数据库 中(使用 CLI 或 Web UI)
在环境变量中存储连接¶
Airflow 连接可以在环境变量中定义。
命名约定为 AIRFLOW_CONN_{CONN_ID}
,全部大写(注意 CONN
> 周围的单下划线)。因此,如果你的连接 ID 为 my_prod_db
,则变量名称应为 AIRFLOW_CONN_MY_PROD_DB
。
值可以是 JSON 或 Airflow 的 URI 格式。
JSON 格式示例¶
2.3.0 版新增功能。
如果使用 JSON 序列化
export AIRFLOW_CONN_MY_PROD_DATABASE='{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
生成 JSON 连接表示¶
2.8.0 版新增功能。
为了简化连接 JSON 生成,Connection
类具有一个便捷属性 as_json()
。可以使用它,如下所示
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'
此外,可以使用相同的方法将连接从 URI 格式转换为 JSON 格式
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="awesome_conn",
... description="Example Connection",
... uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'
在 Secrets Backend 中存储连接¶
可以在外部 secrets 后端(如 HashiCorp Vault、AWS SSM Parameter Store 和其他此类服务)中存储 Airflow 连接。有关更多详细信息,请参阅 Secrets Backend。
在数据库中存储连接¶
另请参见
连接也可以存储在 环境变量 或 外部 secrets 后端(如 HashiCorp Vault、AWS SSM Parameter Store 等)中。
在数据库中存储连接时,可以使用 Web UI 或 Airflow CLI 来管理它们。
使用 UI 创建连接¶
打开 UI 的 Admin->Connections
部分。单击 Create
链接以创建新连接。
使用所需的连接 ID 填写
Connection Id
字段。建议使用小写字符,并用下划线分隔单词。使用
Connection Type
字段选择连接类型。填写剩余字段。请参阅 处理 extra 中的任意 dict,了解属于不同连接类型的字段的说明。
单击
Save
按钮以创建连接。
从 CLI 创建连接¶
您可以从 CLI 向数据库添加连接。
您可以使用 JSON 格式添加连接(从版本 2.3.0 开始)
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
或者,您可以使用 Airflow 的连接 URI 格式(请参阅 生成连接 URI)。
airflow connections add 'my_prod_db' \
--conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1¶m2=val2&...'
最后,您还可以单独指定每个参数
airflow connections add 'my_prod_db' \
--conn-type 'my-conn-type' \
--conn-login 'login' \
--conn-password 'password' \
--conn-host 'host' \
--conn-port 'port' \
--conn-schema 'schema' \
...
数据库中连接的安全性¶
对于存储在 Airflow 元数据数据库中的连接,Airflow 使用 Fernet 来加密密码和其他可能敏感的数据。它保证在没有加密密码的情况下,无法在没有密钥的情况下处理或读取连接密码。有关配置 Fernet 的信息,请参阅 Fernet。
测试连接¶
出于安全原因,默认情况下在 Airflow UI、API 和 CLI 中禁用测试连接功能。
有关用户功能的更多信息,请参阅文档:https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。强烈建议在确保只有高度可信的 UI/API 用户拥有“编辑连接”权限之前不要启用此功能。
此功能的可用性可以通过 Airflow 配置(airflow.cfg)的核心部分中的 test_connection 标志进行控制。它还可以通过环境变量 AIRFLOW__CORE__TEST_CONNECTION
进行控制。
此配置参数接受以下值
已禁用:禁用测试连接功能,并在 UI 中禁用“测试连接”按钮。这也是在 Airflow 配置中设置的默认值。
已启用:启用测试连接功能,并在 UI 中激活“测试连接”按钮。
已隐藏:禁用测试连接功能,并在 UI 中隐藏“测试连接”按钮。
启用“测试连接”后,可以通过以下方式使用它:在 UI 中调用 创建 或 编辑 连接页面,通过调用 连接 REST API,或运行 airflow connections test
CLI 命令。
警告
使用 Airflow UI 或 REST API 时,此功能对于驻留在外部机密后端的连接不可用。
要测试连接,Airflow 会从关联的钩子类调用 test_connection
方法并报告结果。可能发生的情况是,连接类型没有任何关联的钩子,或者钩子没有 test_connection
方法实现,在任何一种情况下,都会显示错误消息或禁用功能(如果你在 UI 中进行测试)。
注意
在 Airflow UI 中进行测试时,测试会从 Web 服务器执行,因此此功能受为 Web 服务器设置的网络出口规则约束。
注意
如果 Web 服务器和工作机(如果通过 Airflow UI 进行测试)或机器/Pod(如果通过 Airflow CLI 进行测试)安装了不同的库或提供程序包,则测试结果可能会有所不同。
自定义连接类型¶
Airflow 允许定义自定义连接类型,包括修改连接的添加/编辑表单。自定义连接类型在社区维护的提供程序中定义,但你还可以添加一个添加自定义连接类型的自定义提供程序。有关如何添加自定义提供程序的说明,请参阅 提供程序包。
自定义连接类型通过提供程序提供的钩子定义。钩子可以实现协议类 DiscoverableHook
中定义的方法。请注意,您的自定义钩子不应派生自此类,此类是一个示例,用于记录有关您的钩子可能定义的类字段和方法的期望。另一个好的示例是 JdbcHook
。
通过在您的钩子中实现这些方法并通过 connection-types
数组(和已弃用的 hook-class-names
)在提供程序元数据中公开它们,您可以通过以下方式自定义 Airflow
添加自定义连接类型
添加从连接类型自动创建钩子
添加自定义表单小部件以显示和编辑连接 URL 中的自定义“额外”参数
隐藏未用于您的连接的字段
添加占位符,显示字段应如何格式化的示例
您可以在 提供程序包 中阅读有关如何添加自定义提供程序包的详细信息
自定义连接字段¶
可以在 Airflow Web 服务器的连接添加/编辑视图中添加自定义表单字段。自定义字段存储在 Connection.extra
字段中,格式为 JSON。要添加自定义字段,请实现方法 get_connection_form_widgets()
。此方法应返回一个字典。键应该是字段的字符串名称,因为它应存储在 extra
字典中。值应该是 wtforms.fields.core.Field
的继承者。
这是一个示例
@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
return {
"workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
}
注意
自定义字段不再需要 extra__<conn type>__
前缀
在 Airflow 2.3 之前,如果您想要在 UI 中添加一个自定义字段,则必须使用 extra__<conn type>__
为其添加前缀,并且这是其值在 extra
字典中的存储方式。从 2.3 开始,您不再需要这样做。
方法 get_ui_field_behaviour()
允许您自定义这两个方法的行为。例如,您可以隐藏或重新标记一个字段(例如,如果它未被使用或重新用于其他用途),并且您可以添加占位符文本。
一个示例
@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["port", "host", "login", "schema"],
"relabeling": {},
"placeholders": {
"password": "Asana personal access token",
"workspace": "My workspace gid",
"project": "My project gid",
},
}
注意
如果您想为名称与标准连接属性(即登录名、密码、主机、方案、端口、额外)冲突的 extra
字段添加表单占位符,则必须使用 extra__<conn type>__
为其添加前缀。例如 extra__myservice__password
。
查看提供程序以了解您可以执行的操作示例,例如 JdbcHook
和 AsanaHook
都使用了此功能。
注意
已弃用的 hook-class-names
在 Airflow 2.2.0 之前,提供程序中的连接已通过提供程序元数据中的 hook-class-names
数组公开。但是,当在工作进程中使用单独的挂钩时,这已被证明效率低下,并且 hook-class-names
数组现在已被 connection-types
数组替换。在提供程序支持低于 2.2.0 的 Airflow 之前,connection-types
和 hook-class-names
都应存在。CI 构建期间的自动化检查将验证这两个数组的一致性。
URI 格式¶
注意
从版本 2.3.0 开始,您可以使用 JSON 序列化连接。请参阅 示例。
出于历史原因,Airflow 具有特殊的 URI 格式,可用于将 Connection 对象序列化为字符串值。
通常,Airflow 的 URI 格式如下所示
my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2
上述 URI 将生成等效于以下内容的 Connection
对象
Connection(
conn_id="",
conn_type="my_conn_type",
description=None,
login="my-login",
password="my-password",
host="my-host",
port=5432,
schema="my-schema",
extra=json.dumps(dict(param1="val1", param2="val2")),
)
生成连接 URI¶
为了简化连接 URI 生成,Connection
类提供了一个便捷方法 get_uri()
。它可以这样使用
>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:[email protected]?this_param=some+val&that_param=other+val%2A'
此外,如果你已创建连接,可以使用 airflow connections get
命令。
$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db
处理 extra 中的任意 dict¶
某些 JSON 结构无法在不丢失的情况下进行 URL 编码。对于此类 JSON,get_uri
会将整个字符串存储在 URL 查询参数 __extra__
下。
例如
>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
... conn_type="scheme",
... host="host/location",
... schema="schema",
... login="user",
... password="password",
... port=1234,
... extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
我们可以验证它返回相同的字典
>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True
但对于仅存储键值对的最常见情况,将使用纯 URL 编码。
你可以这样验证 URI 是否已正确解析
>>> from airflow.models.connection import Connection
>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password
处理连接参数中的特殊字符¶
注意
在生成连接时,使用便捷方法 Connection.get_uri
,如 生成连接 URI 一节中所述。本节仅供参考。
手动构建 URI 时,某些字符需要特殊处理。
例如,如果你的密码包含 /
,则会失败
>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1¶m2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'
要解决此问题,可以使用 quote_plus()
进行编码
>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.password)
my-pa/ssword