如何创建您自己的 Provider¶
自定义 Provider 包¶
您可以开发和发布您自己的 Providers。您的自定义 operators、hooks、sensors、transfer operators 可以打包到一个标准的 airflow 包中,并使用相同的机制安装。此外,它们还可以使用相同的机制来扩展 Airflow 核心,包括身份验证后端、自定义连接、日志记录、密钥后端和额外的 operator 链接,如上一章所述。
如Providers文档中所述,自定义 Providers 可以扩展 Airflow 核心 - 它们可以向 operators 添加额外的链接以及自定义连接。如果您想使用该机制来管理您自己的自定义 Provider,您可以构建自己的 Providers 并将其作为包安装。
如何创建 Provider¶
向 Airflow 添加 Provider 只是构建一个 Python 包并将正确的元数据添加到包中。我们使用标准的 Python 机制来定义入口点。您的包需要定义适当的入口点 apache_airflow_provider
,它必须指向由您的包实现的可调用对象,并返回一个字典,其中包含您的包的可发现功能列表。该字典必须遵循json-schema 规范。
大多数 schema 都为文档提供了扩展点(您可能也想将其用于自己的目的),但从可扩展性的角度来看,重要的字段如下:
在 CLI/API 中显示包信息
package-name
- Provider 的包名称。name
- Provider 的用户友好名称。description
- Provider 的附加描述。version
- 包的版本列表(按时间倒序排列)。列表中的第一个版本是当前包版本。它取自已安装的包的版本,而不是来自 provider_info 信息。
向 Airflow 核心公开自定义功能
extra-links
- 此字段应包含所有添加额外链接功能的 operator 类名称的列表。有关如何向您的 operator 添加额外链接功能,请参阅定义 operator 额外链接。connection-types
- 此字段应包含所有连接类型以及实现这些自定义连接类型的 hook 类名称(提供自定义额外字段和自定义字段行为)。此字段从 Airflow 2.2.0 开始可用,它取代了已弃用的hook-class-names
。有关更多详细信息,请参阅管理连接。secret-backends
- 此字段应包含 Provider 提供的所有密钥后端类名称的列表。有关如何添加,请参阅密钥后端。task-decorators
- 此字段应包含可用装饰器的名称/路径字典列表。有关如何添加自定义装饰器,请参阅创建自定义 @task 装饰器。logging
- 此字段应包含 Provider 提供的所有日志处理程序类名称的列表。有关日志处理程序的描述,请参阅任务日志记录。auth-backends
- 此字段应包含 API/UI 的身份验证后端模块名称。有关身份验证后端的描述,请参阅API。notifications
- 此字段应包含通知类。有关通知的描述,请参阅创建通知器。executors
- 此字段应包含执行器类名称。有关执行器的描述,请参阅执行器。config
- 此字段应包含一个字典,该字典应符合airflow/config_templates/config.yml.schema.json
,其中包含 Provider 贡献的配置。有关设置配置的详细信息,请参阅设置配置选项。filesystems
- 此字段应包含所有文件系统模块名称的列表。有关文件系统的描述,请参阅对象存储。
dataset-uris
- 此字段应包含 URI 方案列表以及实现规范化函数的类名称。有关数据集 URI 的描述,请参阅数据感知调度。
注意
已弃用的值
hook-class-names
(已弃用)- 此字段应包含提供具有自定义额外字段和字段行为的自定义连接类型的所有 hook 类名称的列表。hook-class-names
数组从 Airflow 2.2.0 开始已弃用(出于优化原因),将在 Airflow 3 中删除。如果您的 Provider 面向 Airflow 2.2.0+,则不必包含hook-class-names
数组,如果您还想面向早期版本的 Airflow 2,则应同时包含hook-class-names
和connection-types
数组。有关更多详细信息,请参阅管理连接。
安装 Provider 后,您可以使用 airflow providers
命令查询已安装的 Provider 及其功能。这样,您可以验证您的 Provider 是否被正确识别,以及它们是否正确定义了扩展。有关可用 CLI 子命令的详细信息,请参阅命令行界面和环境变量参考。
当您编写自己的 Provider 时,请考虑遵循Provider 包的命名约定
特殊注意事项¶
可选 Provider 功能¶
在 2.3.0 版本中新增: 此功能在 Airflow 2.3+ 中可用。
某些 Provider 可能会提供可选功能,这些功能仅在安装某些包或库时可用。此类功能通常会导致 ImportErrors
;但是,这些导入错误应该被静默忽略,而不是用虚假的警告污染 Airflow 的日志。虚假警告是一种非常糟糕的模式,因为它们往往会变成盲点,因此建议避免虚假警告。然而,在 Airflow 2.3 之前,Airflow 没有机制来有选择地忽略“已知的” ImportErrors。因此,Airflow 2.1 和 2.2 静默忽略了来自 Provider 的所有 ImportErrors,实际上甚至导致忽略了重要的导入错误 - 而没有向 Airflow 用户提供有关 Provider 依赖项中缺少某些内容的线索。
使用 Providers 进行动态任务映射¶
Airflow 2.3 添加了动态任务映射,并且增加了为每个任务分配唯一键的可能性。这意味着当此类动态映射的任务想要从 XCom 中检索值时(例如,在应该计算额外链接的情况下),它应该始终检查传递的 ti_key 值是否不为 None,然后才使用 XCom.get_value 检索 XCom 值。这允许保持与早期版本 Airflow 的向后兼容性。
为了保持向后兼容性,提供者访问 XCom 值的典型代码应如下所示(注意 if ti_key is not None:
条件)。
def get_link( self, operator: BaseOperator, dttm: datetime | None = None, ti_key: "TaskInstanceKey" | None = None, ): if ti_key is not None: job_ids = XCom.get_value(key="job_id", ti_key=ti_key) else: assert dttm is not None job_ids = XCom.get_one( key="job_id", dag_id=operator.dag.dag_id, task_id=operator.task_id, execution_date=dttm, ) if not job_ids: return None if len(job_ids) < self.index: return None job_id = job_ids[self.index] return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
自定义提供程序的常见问题解答¶
当我编写自己的提供程序时,我需要做任何特别的事情才能使其可供其他人使用吗?
除了创建返回格式正确的元数据的 apache_airflow_provider
入口点之外,您无需执行任何特殊操作 - 包含 extra-links
和 connection-types
字段的字典(如果您还针对 2.2.0 之前的 Airflow 版本,则包含已弃用的 hook-class-names
字段)。
任何在其环境中安装了您的 Python 包的 Airflow 运行者都能够将该包用作提供程序包。
我应该专门命名我的提供程序,还是应该在 ``airflow.providers`` 包中创建它?
我们有相当多的(>80 个)由社区管理的提供程序,我们将与 Apache Airflow 一起维护它们。所有这些提供程序都具有明确定义的结构,并遵循我们定义的命名约定,它们都在 airflow.providers
包中。如果您的目的是贡献您的提供程序,那么您应该遵循这些约定并向 Apache Airflow 提交 PR 进行贡献。但是您可以自由使用任何包名称,只要与其他名称没有冲突即可,因此最好选择您“领域”中的包。
我需要做什么才能将一个包变成一个提供程序?
您需要执行以下操作才能将现有的 Python 包变成一个提供程序(请参阅下面的示例)
在
pyproject.toml
文件中添加apache_airflow_provider
入口点 - 这告诉 Airflow 从哪里获取所需的提供程序元数据创建您在第一步中引用的函数作为您包的一部分:此函数返回一个字典,其中包含有关您的提供程序包的所有元数据
如果您希望 Airflow 在提供程序页面中链接到您的提供程序的文档,请确保将“project-url/documentation” 元数据添加到您的包中。这也会在 PyPI 中添加指向您的文档的链接。
请注意,该字典应符合
airflow/provider_info.schema.json
JSON 模式规范。社区管理的提供程序在那里有更多用于构建文档的字段,但运行时信息的要求仅包含架构中定义的几个字段
airflow/provider_info.schema.json
{
"$schema": "https://json-schema.fullstack.org.cn/draft-07/schema#",
"type": "object",
"properties": {
"package-name": {
"description": "Package name available under which the package is available in the PyPI repository.",
"type": "string"
},
"name": {
"description": "Provider name",
"type": "string"
},
"description": {
"description": "Information about the package in RST format",
"type": "string"
},
"hook-class-names": {
"type": "array",
"description": "Hook class names that provide connection types to core (deprecated by connection-types)",
"items": {
"type": "string"
},
"deprecated": {
"description": "The hook-class-names property has been deprecated in favour of connection-types which is more performant version allowing to only import individual Hooks rather than all hooks at once",
"deprecatedVersion": "2.2.0"
}
},
"filesystems": {
"type": "array",
"description": "Filesystem module names",
"items": {
"type": "string"
}
},
"transfers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"how-to-guide": {
"description": "Path to how-to-guide for the transfer. The path must start with '/docs/'",
"type": "string"
},
"source-integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"target-integration-name": {
"type": "string",
"description": "Target integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-module": {
"type": "string",
"description": "List of python modules containing the transfers."
}
},
"additionalProperties": false,
"required": [
"source-integration-name",
"target-integration-name",
"python-module"
]
}
},
"triggers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-modules": {
"description": "List of Python modules containing the triggers.",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
]
}
},
"connection-types": {
"type": "array",
"description": "Map of connection types mapped to hook class names.",
"items": {
"type": "object",
"properties": {
"connection-type": {
"description": "Type of connection defined by the provider",
"type": "string"
},
"hook-class-name": {
"description": "Hook class name that implements the connection type",
"type": "string"
}
},
"required": [
"connection-type",
"hook-class-name"
]
}
},
"extra-links": {
"type": "array",
"description": "Operator class names that provide extra link functionality",
"items": {
"type": "string"
}
},
"secrets-backends": {
"type": "array",
"description": "Secrets Backend class names",
"items": {
"type": "string"
}
},
"logging": {
"type": "array",
"description": "Logging Task Handlers class names",
"items": {
"type": "string"
}
},
"auth-backends": {
"type": "array",
"description": "API Auth Backend module names",
"items": {
"type": "string"
}
},
"auth-managers": {
"type": "array",
"description": "Auth managers module names",
"items": {
"type": "string"
}
},
"notifications": {
"type": "array",
"description": "Notification class names",
"items": {
"type": "string"
}
},
"executors": {
"type": "array",
"description": "Executor class names",
"items": {
"type": "string"
}
},
"config": {
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"options": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/option"
}
},
"renamed": {
"type": "object",
"properties": {
"previous_name": {
"type": "string"
},
"version": {
"type": "string"
}
}
}
},
"required": [
"description",
"options"
],
"additionalProperties": false
}
},
"task-decorators": {
"type": "array",
"description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
}
},
"definitions": {
"option": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"version_added": {
"type": [
"string",
"null"
]
},
"type": {
"type": "string",
"enum": [
"string",
"boolean",
"integer",
"float"
]
},
"example": {
"type": [
"string",
"null",
"number"
]
},
"default": {
"type": [
"string",
"null",
"number"
]
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
}
},
"required": [
"description",
"version_added",
"type",
"example",
"default"
],
"additional_properties": false
}
},
"required": [
"name",
"description"
]
}
pyproject.toml
示例
[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.myproviderpackage.get_provider_info:get_provider_info"
myproviderpackage/get_provider_info.py
示例
def get_provider_info():
return {
"package-name": "my-package-name",
"name": "name",
"description": "a description",
"hook-class-names": [
"myproviderpackage.hooks.source.SourceHook",
],
}
连接 ID 和类型是否有约定?
问得好。很高兴你问了。我们通常遵循连接 ID 的 <NAME>_default
约定,连接类型则遵循 <NAME>
的约定。以下是一些示例
google_cloud_default
ID 和google_cloud_platform
类型aws_default
ID 和aws
类型
您应该遵循这个约定。重要的是,连接类型使用唯一的名称,因此它应该对您的提供程序是唯一的。如果两个提供程序尝试添加具有相同类型的连接,则只会有一个成功。
我可以将自己的提供程序贡献给 Apache Airflow 吗?
答案取决于提供程序。我们在 PROVIDERS.rst 开发人员文档中对此有政策。
我可以在 PyPI 中向 Apache Airflow 用户宣传我自己的提供程序并与他人共享它吗?
当然!我们在我们的网站上有一个 生态系统区域,我们在这里分享非社区管理的 Airflow 扩展和工作。请随意向页面提交 PR,当我们看到这样的提供程序对 Airflow 用户社区有用时,我们将评估并合并它。
我可以为使用我的提供程序收费吗?
这不在我们的控制和管辖范围之内。作为一个 Apache 项目,我们对商业是友好的,并且有很多企业是围绕 Apache Airflow 和许多其他 Apache 项目建立的。作为一个社区,我们免费提供所有软件,这一点永远不会改变。第三方开发人员正在做的事情不受 Apache Airflow 社区的控制。