如何创建您自己的提供程序¶
自定义提供程序包¶
您可以开发和发布您自己的提供程序。您的自定义操作符、钩子、传感器、传输操作符可以打包在一个标准的 Airflow 包中,并使用相同的机制进行安装。此外,它们还可以使用相同的机制来扩展 Airflow 核心,包括身份验证后端、自定义连接、日志记录、密钥后端和额外的操作符链接,如上一章所述。
如提供程序文档中所述,自定义提供程序可以扩展 Airflow 核心 - 它们可以向操作符添加额外的链接以及自定义连接。如果您想将该机制用于您自己的自定义提供程序,您可以构建自己的提供程序并将其作为包安装。
如何创建提供程序¶
向 Airflow 添加提供程序只需构建一个 Python 包并将正确的元数据添加到包中即可。我们使用 Python 的标准机制来定义入口点。您的包需要定义适当的入口点 apache_airflow_provider
,该入口点必须指向由您的包实现的可调用对象,并返回一个包含包可发现功能列表的字典。该字典必须遵循json-schema 规范。
该模式的大部分内容都为文档提供了扩展点(您可能也希望将其用于您自己的目的),但从可扩展性的角度来看,重要的字段是以下这些
在 CLI/API 中显示包信息
package-name
- 提供程序的包名称。name
- 提供程序的人类可读名称。description
- 提供程序的附加描述。version
- 包版本列表(按时间倒序排列)。列表中的第一个版本是当前的包版本。它取自已安装包的版本,而不是 provider_info 信息。
向 Airflow 核心公开自定义功能
extra-links
- 此字段应包含所有添加额外链接功能的操作符类名称的列表。有关如何向您的操作符添加额外链接功能的说明,请参阅定义操作符额外链接。connection-types
- 此字段应包含所有连接类型的列表,以及实现这些自定义连接类型的钩子类名称(提供自定义的额外字段和自定义字段行为)。此字段从 Airflow 2.2.0 开始可用,它取代了已弃用的hook-class-names
。有关更多详细信息,请参阅管理连接。secret-backends
- 此字段应包含提供程序提供的所有密钥后端类名称的列表。有关如何添加的说明,请参阅密钥后端。task-decorators
- 此字段应包含名称/路径字典的列表,其中包含可用的装饰器。有关如何添加自定义装饰器的说明,请参阅创建自定义 @task 装饰器。logging
- 此字段应包含提供程序提供的所有日志处理程序类名称的列表。有关日志处理程序的说明,请参阅任务日志记录。auth-backends
- 此字段应包含 API/UI 的身份验证后端模块名称。有关身份验证后端的说明,请参阅API。notifications
- 此字段应包含通知类。有关通知的说明,请参阅创建通知程序。executors
- 此字段应包含执行器类名称。有关执行器的说明,请参阅执行器。config
- 此字段应包含符合airflow/config_templates/config.yml.schema.json
的字典,其中包含提供程序提供的配置。有关设置配置的详细信息,请参阅设置配置选项。
注意
已弃用的值
hook-class-names
(已弃用)- 此字段应包含所有钩子类名称的列表,这些钩子类提供具有自定义额外字段和字段行为的自定义连接类型。从 Airflow 2.2.0 开始,hook-class-names
数组已被弃用(出于优化原因),并将在 Airflow 3 中删除。如果您的提供程序的目标是 Airflow 2.2.0+,则您不必包含hook-class-names
数组,如果您还想针对 Airflow 2 的早期版本,则应同时包含hook-class-names
和connection-types
数组。有关更多详细信息,请参阅管理连接。
安装提供程序后,您可以使用 airflow providers
命令查询已安装的提供程序及其功能。这样,您可以验证您的提供程序是否被正确识别,以及它们是否正确定义了扩展。有关可用 CLI 子命令的详细信息,请参阅命令行界面和环境变量参考。
编写自己的提供程序时,请考虑遵循提供程序包的命名约定
特别注意事项¶
可选的提供程序功能¶
2.3.0 版中的新功能: 此功能在 Airflow 2.3+ 中可用。
某些提供程序可能会提供可选功能,这些功能仅在安装了某些软件包或库时才可用。此类功能通常会导致 ImportErrors
;但是,应静默忽略这些导入错误,而不是用错误警告污染 Airflow 的日志。错误警告是一种非常糟糕的模式,因为它们往往会变成盲点,因此鼓励避免错误警告。但是,在 Airflow 2.3 之前,Airflow 没有机制可以选择性地忽略“已知”的 ImportErrors。因此,Airflow 2.1 和 2.2 静默忽略了来自提供程序的所有 ImportErrors,这实际上导致甚至忽略了重要的导入错误 - 却没有向 Airflow 用户提示提供程序依赖项中缺少某些内容。
将提供程序与动态任务映射一起使用¶
Airflow 2.3 增加了动态任务映射,它增加了为每个任务分配唯一键的可能性。这意味着,当此类动态映射的任务想要从 XCom 中检索值时(例如,在应该计算额外链接的情况下),它应该始终检查传递的 ti_key 值是否不为 None,然后才使用 XCom.get_value 检索 XCom 值。这允许与早期版本的 Airflow 保持向后兼容性。
想要保持向后兼容性的提供程序中访问 XCom 值的典型代码应类似于以下内容(请注意 if ti_key is not None:
条件)。
def get_link( self, operator: BaseOperator, dttm: datetime | None = None, ti_key: "TaskInstanceKey" | None = None, ): if ti_key is not None: job_ids = XCom.get_value(key="job_id", ti_key=ti_key) else: assert dttm is not None job_ids = XCom.get_one( key="job_id", dag_id=operator.dag.dag_id, task_id=operator.task_id, execution_date=dttm, ) if not job_ids: return None if len(job_ids) < self.index: return None job_id = job_ids[self.index] return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)
自定义提供程序的常见问题解答¶
当我编写自己的提供程序时,是否需要做些什么特别的事情才能让其他人可以使用它?
除了创建返回正确格式的元数据的 apache_airflow_provider
入口点外,您不需要做任何特别的事情 - 包含 extra-links
和 connection-types
字段的字典(以及已弃用的 hook-class-names
字段,如果您还针对 2.2.0 之前的 Airflow 版本)。
任何在安装了您的 Python 包的环境中运行 Airflow 的人都可以使用该包作为提供程序包。
我应该为我的提供程序指定特定的名称,还是应该在 ``airflow.providers`` 包中创建它?
我们有相当多(>80)个由社区管理的提供程序,我们将与 Apache Airflow 一起维护它们。所有这些提供程序都具有定义良好的结构,并遵循我们定义的命名约定,它们都在 airflow.providers
包中。如果您打算贡献您的提供程序,那么您应该遵循这些约定并向 Apache Airflow 发起 PR 来贡献它。但是,只要不与其他名称冲突,您可以随意使用任何包名称,因此最好选择您“域”中的包。
我需要做什么才能将包转换为提供程序?
您需要执行以下操作才能将现有的 Python 包转换为提供程序(示例如下)
在
pyproject.toml
文件中添加apache_airflow_provider
入口点 - 这告诉 airflow 从哪里获取所需的提供程序元数据创建您在第一步中引用的函数作为包的一部分:此函数返回一个字典,其中包含有关您的提供程序包的所有元数据
如果您希望 Airflow 在提供程序页面中链接到您的提供程序的文档,请确保将“project-url/documentation” 元数据 添加到您的包中。这也会在 PyPI 中添加指向您的文档的链接。
请注意,字典应符合
airflow/provider_info.schema.json
JSON 模式规范。社区管理的提供程序在那里有更多用于构建文档的字段,但运行时信息的要求仅包含架构中定义的几个字段
airflow/provider_info.schema.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"package-name": {
"description": "Package name available under which the package is available in the PyPI repository.",
"type": "string"
},
"name": {
"description": "Provider name",
"type": "string"
},
"description": {
"description": "Information about the package in RST format",
"type": "string"
},
"hook-class-names": {
"type": "array",
"description": "Hook class names that provide connection types to core (deprecated by connection-types)",
"items": {
"type": "string"
},
"deprecated": {
"description": "The hook-class-names property has been deprecated in favour of connection-types which is more performant version allowing to only import individual Hooks rather than all hooks at once",
"deprecatedVersion": "2.2.0"
}
},
"filesystems": {
"type": "array",
"description": "Filesystem module names",
"items": {
"type": "string"
}
},
"transfers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"how-to-guide": {
"description": "Path to how-to-guide for the transfer. The path must start with '/docs/'",
"type": "string"
},
"source-integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"target-integration-name": {
"type": "string",
"description": "Target integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-module": {
"type": "string",
"description": "List of python modules containing the transfers."
}
},
"additionalProperties": false,
"required": [
"source-integration-name",
"target-integration-name",
"python-module"
]
}
},
"triggers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"integration-name": {
"type": "string",
"description": "Integration name. It must have a matching item in the 'integration' section of any provider."
},
"python-modules": {
"description": "List of Python modules containing the triggers.",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false,
"required": [
"integration-name",
"python-modules"
]
}
},
"connection-types": {
"type": "array",
"description": "Map of connection types mapped to hook class names.",
"items": {
"type": "object",
"properties": {
"connection-type": {
"description": "Type of connection defined by the provider",
"type": "string"
},
"hook-class-name": {
"description": "Hook class name that implements the connection type",
"type": "string"
}
},
"required": [
"connection-type",
"hook-class-name"
]
}
},
"extra-links": {
"type": "array",
"description": "Operator class names that provide extra link functionality",
"items": {
"type": "string"
}
},
"secrets-backends": {
"type": "array",
"description": "Secrets Backend class names",
"items": {
"type": "string"
}
},
"logging": {
"type": "array",
"description": "Logging Task Handlers class names",
"items": {
"type": "string"
}
},
"auth-backends": {
"type": "array",
"description": "API Auth Backend module names",
"items": {
"type": "string"
}
},
"auth-managers": {
"type": "array",
"description": "Auth managers module names",
"items": {
"type": "string"
}
},
"notifications": {
"type": "array",
"description": "Notification class names",
"items": {
"type": "string"
}
},
"executors": {
"type": "array",
"description": "Executor class names",
"items": {
"type": "string"
}
},
"config": {
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"options": {
"type": "object",
"additionalProperties": {
"$ref": "#/definitions/option"
}
},
"renamed": {
"type": "object",
"properties": {
"previous_name": {
"type": "string"
},
"version": {
"type": "string"
}
}
}
},
"required": [
"description",
"options"
],
"additionalProperties": false
}
},
"task-decorators": {
"type": "array",
"description": "Apply custom decorators to the TaskFlow API. Can be accessed by users via '@task.<name>'",
"items": {
"name": {
"type": "string"
},
"path": {
"type": "string"
}
}
}
},
"definitions": {
"option": {
"type": "object",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"version_added": {
"type": [
"string",
"null"
]
},
"type": {
"type": "string",
"enum": [
"string",
"boolean",
"integer",
"float"
]
},
"example": {
"type": [
"string",
"null",
"number"
]
},
"default": {
"type": [
"string",
"null",
"number"
]
},
"sensitive": {
"type": "boolean",
"description": "When true, this option is sensitive and can be specified using AIRFLOW__{section}___{name}__SECRET or AIRFLOW__{section}___{name}_CMD environment variables. See: airflow.configuration.AirflowConfigParser.sensitive_config_values"
}
},
"required": [
"description",
"version_added",
"type",
"example",
"default"
],
"additional_properties": false
}
},
"required": [
"name",
"description"
]
}
示例 pyproject.toml
[project.entry-points."apache_airflow_provider"]
provider_info = "airflow.providers.myproviderpackage.get_provider_info:get_provider_info"
示例 myproviderpackage/get_provider_info.py
def get_provider_info():
return {
"package-name": "my-package-name",
"name": "name",
"description": "a description",
"hook-class-names": [
"myproviderpackage.hooks.source.SourceHook",
],
}
连接 ID 和类型是否有约定?
很好的问题。很高兴你问。我们通常遵循连接 ID 的约定 <NAME>_default
和连接类型的约定 <NAME>
。几个例子
google_cloud_default
ID 和google_cloud_platform
类型aws_default
ID 和aws
类型
您应该遵循此约定。重要的是,为连接类型使用唯一的名称,因此它对您的提供程序应该是唯一的。如果两个提供程序尝试添加具有相同类型的连接,则只有一个会成功。
我可以将我自己的提供程序贡献给 Apache Airflow 吗?
答案取决于提供程序。我们在 PROVIDERS.rst 开发人员文档中对此有一个策略。
我可以向 Apache Airflow 用户宣传我自己的提供程序并将其作为 PyPI 中的包与他人共享吗?
绝对!我们在我们的网站上有一个 生态系统 区域,我们在其中共享非社区管理的扩展程序并为 Airflow 工作。请随意向页面提交 PR,我们会评估并在我们认为此类提供程序对 Airflow 用户社区有用时合并它。
我可以对我的提供程序的使用收费吗?
这是我们无法控制和控制的范围。作为一个 Apache 项目,我们对商业友好,并且有许多企业围绕 Apache Airflow 和许多其他 Apache 项目构建。作为一个社区,我们免费提供所有软件,这永远不会改变。第三方开发人员的行为不受 Apache Airflow 社区的控制。