管理连接

另请参阅

有关钩子(Hooks)和连接的概述,请参阅 连接与钩子

Airflow 的 Connection 对象用于存储连接外部服务所需的凭据及其他信息。

连接可以通过以下方式定义:

在环境变量中存储连接

Airflow 连接可以在环境变量中定义。

命名约定为 AIRFLOW_CONN_{CONN_ID},全部大写(注意 CONN 前后各有一个下划线)。因此,如果您的连接 ID 是 my_prod_db,则变量名应为 AIRFLOW_CONN_MY_PROD_DB

值可以是 JSON 或 Airflow 的 URI 格式。

JSON 格式示例

在 2.3.0 版本中添加。

如果使用 JSON 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {
        "param1": "val1",
        "param2": "val2"
    }
}'

生成 JSON 格式的连接表示

2.8.0 版本新增。

为了使连接 JSON 的生成更容易,Connection 类具有一个便捷属性 as_json()。它可以这样使用:

>>> from airflow.sdk import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'

此外,同样的方法也可用于将连接从 URI 格式转换为 JSON 格式。

>>> from airflow.sdk import Connection
>>> c = Connection(
...     conn_id="awesome_conn",
...     description="Example Connection",
...     uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'

URI 格式示例

如果使用 Airflow URI 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

有关如何生成有效 URI 的更多详细信息,请参阅 连接 URI 格式

在 UI 和 CLI 中的可见性

通过环境变量定义的连接不会显示在 Airflow UI 中,也不会通过 airflow connections list 列出。

这是因为这些连接是在运行时动态解析的,通常在执行您任务的工作进程(worker)上进行。它们不会存储在元数据数据库中,也不会加载到 Web 服务器或调度器的环境中。

这支持安全部署模式,即基于环境的密钥(例如通过 .env 文件、Docker 或 Kubernetes 密钥)仅注入到运行时组件(如工作进程)中,而不是注入到暴露给用户的组件(如 Web 服务器)中。

如果您需要为了可见性或编辑而在 UI 中显示连接,请改用元数据数据库进行定义。

在 Secrets Backend 中存储连接

您可以将 Airflow 连接存储在外部密钥后端(Secrets Backend)中,例如 HashiCorp Vault、AWS SSM Parameter Store 等服务。有关更多详细信息,请参阅 Secrets Backend (密钥后端)

在数据库中存储连接

另请参阅

连接也可以选择存储在 环境变量外部密钥后端(如 HashiCorp Vault、AWS SSM Parameter Store 等)中。

当将连接存储在数据库中时,您可以使用 Web UI 或 Airflow CLI 来管理它们。

使用 UI 创建连接

打开 UI 的 Admin->Connections 部分。点击 Add Connection 链接以创建新连接。

../_images/connection_create.png
  1. Connection Id 字段中填写所需的连接 ID。建议使用小写字符,并用下划线分隔单词。

  2. Connection Type 字段中选择连接类型。

  3. 填写其余字段。有关不同连接类型所属字段的描述,请参阅 处理 extra 中的任意字典

  4. 点击 Save 按钮以创建连接。

使用 UI 编辑连接

打开 UI 的 Admin->Connections 部分。在连接列表中,点击您希望编辑的连接旁边的铅笔图标。

../_images/connection_edit.png

修改连接属性,然后点击 Save 按钮保存您的更改。

从 CLI 创建连接

您可以从 CLI 向数据库添加连接。

您可以使用 JSON 格式添加连接(自 2.3.0 版本起)。

airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

或者,您可以使用 Airflow 的连接 URI 格式(请参阅 生成连接 URI)。

airflow connections add 'my_prod_db' \
    --conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1&param2=val2&...'

最后,您也可以分别指定每个参数。

airflow connections add 'my_prod_db' \
    --conn-type 'my-conn-type' \
    --conn-login 'login' \
    --conn-password 'password' \
    --conn-host 'host' \
    --conn-port 'port' \
    --conn-schema 'schema' \
    ...

将连接导出到文件

您可以将存储在数据库中的连接导出到文件(例如用于将连接从一个环境迁移到另一个环境)。有关用法,请参阅 导出连接

数据库中连接的安全性

对于存储在 Airflow 元数据数据库中的连接,Airflow 使用 Fernet 对密码和其他潜在敏感数据进行加密。它保证如果没有加密密码,就无法在没有密钥的情况下篡改或读取连接密码。有关配置 Fernet 的信息,请查看 Fernet

测试连接

出于安全考虑,测试连接功能在 Airflow UI、API 和 CLI 中默认是禁用的。

有关用户能力的更多信息,请参阅文档:https://airflow.apache.org/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。强烈建议在确保只有高度可信的 UI/API 用户拥有“编辑连接”权限之前,不要启用此功能。

该功能的可使用性可以通过 Airflow 配置文件 (airflow.cfg) 中 core 部分的 test_connection 标志进行控制。它也可以通过环境变量 AIRFLOW__CORE__TEST_CONNECTION 进行控制。

此配置参数接受以下值:

  • Disabled: 禁用测试连接功能,并禁用 UI 中的“Test Connection”按钮。这也是 Airflow 配置中的默认值。

  • Enabled: 启用测试连接功能,并激活 UI 中的“Test Connection”按钮。

  • Hidden: 禁用测试连接功能,并在 UI 中隐藏“Test Connection”按钮。

启用“Test Connection”后,它可以在 UI 的 创建编辑 连接页面中使用,通过调用 连接 REST API 使用,或者通过运行 airflow connections test CLI 命令 使用。

警告

当使用 Airflow UI 或 REST API 时,此功能对于驻留在外部密钥后端中的连接将不可用。

为了测试连接,Airflow 会调用关联钩子类中的 test_connection 方法并报告结果。可能会出现连接类型没有任何关联钩子,或者钩子没有实现 test_connection 方法的情况;在这两种情况下,都会显示错误消息或禁用该功能(如果您正在 UI 中进行测试)。

注意

在 Airflow UI 中进行测试时,测试将从 Web 服务器执行,因此此功能受为您的 Web 服务器设置的网络出口规则的约束。

注意

如果 Web 服务器和工作机(如果通过 Airflow UI 测试)或机器/Pod(如果通过 Airflow CLI 测试)安装了不同的库或提供程序,则测试结果可能会有所不同。

自定义连接类型

Airflow 允许定义自定义连接类型,包括对连接的添加/编辑表单进行修改。自定义连接类型在社区维护的提供程序(providers)中定义,但您也可以添加一个添加自定义连接类型的自定义提供程序。有关如何添加自定义提供程序的描述,请参阅 提供程序 (Providers)

通过在您的 provider.yaml 中的 connection-types 数组暴露连接类型,您可以通过以下方式自定义 Airflow:

  • 添加自定义连接类型

  • 从连接类型自动创建钩子

  • 添加自定义表单字段,以显示和编辑连接 URL 中的自定义“extra”参数

  • 隐藏您的连接不使用的标准字段

  • 添加展示字段应如何格式化的示例占位符

您可以阅读有关如何添加自定义提供程序的更多详细信息,请参阅 提供程序 (Providers)

自定义连接字段

注意

首选方法:在 provider.yaml 中定义连接 UI 元数据

自 Airflow 3.2 起,定义自定义连接字段和字段行为的首选方式是在 provider.yaml 中声明式定义。这种方法不需要在运行时导入 flask_appbuilderwtforms

Python 钩子方法 get_connection_form_widgets()get_ui_field_behaviour() 将作为后备方案继续工作,并且仅在弃用通知和迁移窗口后才会被移除。使用旧方法编写的自定义提供程序将保持正常工作。但是,新的提供程序应使用下面描述的 YAML 方法。

provider.yaml 中定义连接 UI 元数据

连接表单元数据在您的提供程序的 provider.yaml 文件中的 connection-types 下以声明方式定义。有两个部分:

conn-fields — 存储在 Connection.extra 中的自定义字段

connection-types:
  - hook-class-name: airflow.providers.myservice.hooks.myservice.MyServiceHook
    connection-type: myservice
    conn-fields:
      workspace:
        label: Workspace
        schema:
          type:
            - string
            - 'null'
      project:
        label: Project ID
        schema:
          type:
            - string
            - 'null'

ui-field-behaviour — 对标准连接字段的自定义(隐藏、重新标记、占位符)

connection-types:
  - hook-class-name: airflow.providers.myservice.hooks.myservice.MyServiceHook
    connection-type: myservice
    ui-field-behaviour:
      hidden-fields:
        - port
        - host
        - login
        - schema
      relabeling:
        password: API Token
      placeholders:
        password: your-api-token
        workspace: My workspace gid
        project: My project gid

字段架构类型遵循 JSON Schema 约定。

有关支持的 conn-fields 架构选项的完整参考,请参阅 使用 Params 提供触发器 UI 表单

在 Python 中定义连接 UI 元数据(旧版)

注意

下面的 Python 方法仍然有效,在没有弃用通知的情况下不会被移除。但是,新的提供程序应使用上面描述的 YAML 方法。

可以通过在 Hook 类上实现 get_connection_form_widgets(),在连接添加/编辑视图中添加自定义表单字段。键应该是字段的字符串名称,就像它在 extra 字典中存储的那样。值应该是 wtforms.fields.core.Field 的继承者。

下面是一个示例

@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
    """Returns connection widgets to add to connection form"""
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import StringField

    return {
        "workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
        "project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
    }

注意

自定义字段不再需要 extra__<conn type>__ 前缀

在 Airflow 2.3 之前,如果您想在 UI 中使用自定义字段,必须给它加上 extra__<conn type>__ 前缀,这就是它的值在 extra 字典中的存储方式。从 2.3 版本开始,您不再需要这样做。

get_ui_field_behaviour() 方法让您可以自定义标准字段的行为。例如,您可以隐藏或重新标记字段(例如,如果它未被使用或已重新利用),并且可以添加占位符文本。

一个示例:

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
    """Returns custom field behaviour"""
    return {
        "hidden_fields": ["port", "host", "login", "schema"],
        "relabeling": {},
        "placeholders": {
            "password": "Asana personal access token",
            "workspace": "My workspace gid",
            "project": "My project gid",
        },
    }

注意

如果您想为名称与标准连接属性(即 login, password, host, scheme, port, extra)冲突的 extra 字段添加表单占位符,则必须给它加上 extra__<conn type>__ 前缀。例如 extra__myservice__password

看看提供程序的示例,了解您可以做什么,例如 JdbcHook

注意

已弃用的 hook-class-names

在 Airflow 2.2.0 之前,提供程序中的连接通过提供程序元数据中的 hook-class-names 数组暴露。然而,这已被证明在工作进程中使用单个钩子时效率低下,并且 hook-class-names 数组现在已被 connection-types 数组取代。在提供程序支持 Airflow 2.2.0 以下版本之前,应同时存在 connection-typeshook-class-names。CI 构建过程中的自动检查将验证这两个数组的一致性。

URI 格式

注意

自 2.3.0 版本起,您可以使用 JSON 来序列化连接。请参阅 示例

出于历史原因,Airflow 有一种特殊的 URI 格式,可用于将 Connection 对象序列化为字符串值。

通常,Airflow 的 URI 格式如下所示:

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

上述 URI 将产生一个等效于以下内容的 Connection 对象:

Connection(
    conn_id="",
    conn_type="my_conn_type",
    description=None,
    login="my-login",
    password="my-password",
    host="my-host",
    port=5432,
    schema="my-schema",
    extra=json.dumps(dict(param1="val1", param2="val2")),
)

生成连接 URI

为了使连接 URI 的生成更容易,Connection 类具有一个便捷方法 get_uri()。它可以这样使用:

>>> import json
>>> from airflow.sdk import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'

注意

get_uri() 方法以 Airflow 格式返回连接 URI,而不是 SQLAlchemy 兼容的 URI。如果您需要用于数据库连接的 SQLAlchemy 兼容 URI,请改用 sqlalchemy_url 属性。

此外,如果您已经创建了一个连接,则可以使用 airflow connections get 命令。

$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db

处理 extra 中的任意字典

某些 JSON 结构在不丢失数据的情况下无法进行 URL 编码。对于此类 JSON,get_uri 将把整个字符串存储在 URL 查询参数 __extra__ 下。

例如

>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
...     conn_type="scheme",
...     host="host/location",
...     schema="schema",
...     login="user",
...     password="password",
...     port=1234,
...     extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'

我们可以验证它返回的是相同的字典:

>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True

但对于只存储键值对的最常见情况,使用普通的 URL 编码。

您可以这样验证 URI 是否被正确解析:

>>> from airflow.sdk import Connection

>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password

处理连接参数中的特殊字符

注意

在生成连接时,请使用 生成连接 URI 部分所述的便捷方法 Connection.get_uri。本节仅供参考。

手动构建 URI 时,某些字符需要特殊处理。

例如,如果您的密码包含 /,这将失败:

>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'

要修复此问题,您可以使用 quote_plus() 进行编码:

>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.password)
my-pa/ssword

此条目是否有帮助?