插件

Airflow 内置了一个简单的插件管理器,只需将文件放到 $AIRFLOW_HOME/plugins 文件夹中,即可将外部功能集成到其核心。

plugins 文件夹中的 Python 模块会被导入,并且和 Web 视图会被集成到 Airflow 的主集合中并可供使用。

要对插件问题进行故障排除,可以使用 airflow plugins 命令。此命令会转储有关已加载插件的信息。

在 2.0 版更改: 不再支持通过 airflow.{operators,sensors,hooks}.<plugin_name> 导入插件中添加的操作符、传感器、钩子,这些扩展应该作为常规 Python 模块导入。有关更多信息,请参阅:模块管理创建自定义操作符

用途

Airflow 提供了一个用于处理数据的通用工具箱。不同的组织有不同的技术栈和不同的需求。使用 Airflow 插件可以让公司自定义其 Airflow 安装,以反映其生态系统。

插件可以作为一种简单的方式来编写、共享和激活新的功能集。

还需要一组更复杂的应用程序来与不同类型的数据和元数据进行交互。

示例

  • 一组用于解析 Hive 日志并公开 Hive 元数据(CPU /IO / 阶段/ 倾斜 /…)的工具

  • 一个异常检测框架,允许人们收集指标、设置阈值和警报

  • 一个审计工具,帮助了解谁访问了什么

  • 一个配置驱动的 SLA 监控工具,允许您设置监控的表以及它们应该在什么时候到达,提醒人们,并公开中断的可视化

为什么要基于 Airflow 构建?

Airflow 有许多组件可以在构建应用程序时重复使用

  • 一个可用于呈现视图的 Web 服务器

  • 一个用于存储模型的元数据数据库

  • 访问您的数据库,以及如何连接到它们的知识

  • 一个您的应用程序可以将工作负载推送到其中的工作器数组

  • Airflow 已部署,您可以直接利用其部署逻辑

  • 基本的图表功能、底层库和抽象

插件何时(重新)加载?

默认情况下,插件是延迟加载的,一旦加载,它们就永远不会重新加载(除了 UI 插件在 Web 服务器中自动加载)。要在每个 Airflow 进程启动时加载它们,请在 airflow.cfg 中设置 [core] lazy_load_plugins = False

这意味着,如果您对插件进行了任何更改,并且希望 Web 服务器或调度程序使用该新代码,则需要重新启动这些进程。但是,在调度程序启动后,这不会反映在新的正在运行的任务中。

默认情况下,任务执行使用 fork。这避免了创建新的 Python 解释器并重新解析所有 Airflow 代码和启动例程所带来的速度下降。这种方法提供了显著的好处,特别是对于较短的任务。但这确实意味着,如果您在任务中使用插件,并且希望它们更新,则需要重新启动工作器(如果使用 CeleryExecutor)或调度程序(本地或顺序执行器)。另一种选择是,您可以接受启动时的速度损失,将 core.execute_tasks_new_python_interpreter 配置设置设置为 True,从而为任务启动一个全新的 Python 解释器。

(另一方面,仅由 DAG 文件导入的模块不会遇到这个问题,因为 DAG 文件不会在任何长时间运行的 Airflow 进程中加载/解析。)

接口

要创建插件,您需要派生 airflow.plugins_manager.AirflowPlugin 类并引用要插入 Airflow 的对象。以下是您需要派生的类的外观

class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
    flask_blueprints = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_views = []
    # A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
    appbuilder_menu_items = []

    # A callback to perform actions when airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []

    # A list of timetable classes to register so they can be used in DAGs.
    timetables = []

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []

您可以通过继承来派生它(请参阅下面的示例)。在示例中,所有选项都已定义为类属性,但如果您需要执行其他初始化,也可以将它们定义为属性。请注意,此类中的 name 必须指定。

确保在对插件进行更改后重新启动 Web 服务器和调度程序,以便使更改生效。

示例

下面的代码定义了一个插件,该插件在 Airflow 中注入了一组说明性对象定义。

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
    pass


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
    "name": "Test View",
    "category": "Test Plugin",
    "view": v_appbuilder_view,
}

v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}

# Creating flask appbuilder Menu Items
appbuilder_mitem = {
    "name": "Google",
    "href": "https://www.google.com",
    "category": "Search",
}
appbuilder_mitem_toplevel = {
    "name": "Apache",
    "href": "https://www.apache.org/",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    hooks = [PluginHook]
    macros = [plugin_macro]
    flask_blueprints = [bp]
    appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
    appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]

从 CSRF 保护中排除视图

我们强烈建议您使用 CSRF 保护所有视图。但如果需要,您可以使用装饰器排除某些视图。

from airflow.www.app import csrf


@csrf.exempt
def my_handler():
    # ...
    return "ok"

作为 Python 包的插件

可以通过 setuptools 入口点 机制加载插件。为此,请使用包中的入口点链接您的插件。如果安装了该包,Airflow 将自动从入口点列表中加载注册的插件。

注意

入口点名称(例如,my_plugin)和插件类的名称都不会影响插件本身的模块和类名。

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"
    flask_blueprints = [bp]

然后在 pyproject.toml 中

[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"

自动重新加载 Web 服务器

要在检测到插件目录中的更改时启用 Web 服务器的自动重新加载,您应该在 [webserver] 部分中将 reload_on_plugin_change 选项设置为 True

注意

有关设置配置的更多信息,请参阅 设置配置选项

注意

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

故障排除

您可以使用 Flask CLI 对问题进行故障排除。要运行此程序,您需要将变量 FLASK_APP 设置为 airflow.www.app:create_app

例如,要打印所有路由,请运行

FLASK_APP=airflow.www.app:create_app flask routes

此条目有帮助吗?