插件

Airflow 有一个简单的内置插件管理器,只需将文件放入 $AIRFLOW_HOME/plugins 文件夹,即可将外部功能集成到其核心。

plugins 文件夹中的 Python 模块会被导入,并且 和 Web 视图 会集成到 Airflow 的主集合中,并可供使用。

要排查插件问题,可以使用 airflow plugins 命令。此命令会转储有关已加载插件的信息。

在 2.0 版本中更改: 不再支持通过 airflow.{operators,sensors,hooks}.<plugin_name> 在插件中导入操作符、传感器、钩子,这些扩展应该像普通的 Python 模块一样导入。有关更多信息,请参阅:模块管理创建自定义操作符

用途?

Airflow 提供了一个通用的数据处理工具箱。不同的组织有不同的堆栈和不同的需求。使用 Airflow 插件可以成为公司自定义 Airflow 安装以反映其生态系统的一种方式。

插件可以用作编写、共享和激活新功能集的简便方法。

还需要一组更复杂的应用程序来与不同风格的数据和元数据进行交互。

示例

  • 一套用于解析 Hive 日志并公开 Hive 元数据(CPU / IO / 阶段/倾斜/...)的工具

  • 一个异常检测框架,允许人们收集指标、设置阈值和警报

  • 一个审计工具,帮助了解谁访问了什么

  • 一个配置驱动的 SLA 监控工具,允许您设置受监控的表以及它们应该在什么时间落地,向人们发出警报,并公开中断的可视化

为什么要基于 Airflow 构建?

Airflow 有许多组件可以在构建应用程序时重用

  • 一个 Web 服务器,可用于渲染您的视图

  • 一个元数据数据库,用于存储您的模型

  • 访问您的数据库以及如何连接到它们的知识

  • 一组工作人员,您的应用程序可以将工作负载推送到他们

  • Airflow 已部署,您可以直接在其部署物流上进行搭载

  • 基本图表功能、底层库和抽象

插件何时(重新)加载?

默认情况下,插件是延迟加载的,一旦加载,它们将永远不会重新加载(除了 UI 插件在 Web 服务器中自动加载)。要在每个 Airflow 进程启动时加载它们,请在 airflow.cfg 中设置 [core] lazy_load_plugins = False

这意味着如果您对插件进行任何更改,并且您希望 Web 服务器或调度程序使用该新代码,则需要重新启动这些进程。但是,在调度程序启动后,它不会反映在新的正在运行的任务中。

默认情况下,任务执行使用 fork。这避免了创建新的 Python 解释器并重新解析 Airflow 的所有代码和启动例程所带来的速度下降。这种方法提供了显著的优势,特别是对于较短的任务。这确实意味着,如果您在任务中使用插件,并且希望它们更新,则需要重新启动工作进程(如果使用 CeleryExecutor)或调度程序(本地或顺序执行器)。另一种选择是您可以接受启动时的速度损失,将 core.execute_tasks_new_python_interpreter 配置设置为 True,这将导致为任务启动一个全新的 Python 解释器。

(另一方面,仅由 DAG 文件导入的模块不会遇到此问题,因为 DAG 文件不会在任何长时间运行的 Airflow 进程中加载/解析。)

接口

要创建插件,您需要派生 airflow.plugins_manager.AirflowPlugin 类,并引用您要插入 Airflow 的对象。这是您需要派生的类的样子

class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
    flask_blueprints = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_views = []
    # A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
    appbuilder_menu_items = []

    # A callback to perform actions when airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []

    # A list of timetable classes to register so they can be used in DAGs.
    timetables = []

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []

您可以通过继承来派生它(请参考下面的示例)。在该示例中,所有选项都已定义为类属性,但如果您需要执行额外的初始化,也可以将它们定义为属性。请注意,必须在此类中指定 name

确保在更改插件后重新启动 Web 服务器和调度程序,以便它们生效。

示例

下面的代码定义了一个插件,该插件在 Airflow 中注入了一组说明性对象定义。

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
    pass


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
    "name": "Test View",
    "category": "Test Plugin",
    "view": v_appbuilder_view,
}

v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}

# Creating flask appbuilder Menu Items
appbuilder_mitem = {
    "name": "Google",
    "href": "https://www.google.com",
    "category": "Search",
}
appbuilder_mitem_toplevel = {
    "name": "Apache",
    "href": "https://apache.org/",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    hooks = [PluginHook]
    macros = [plugin_macro]
    flask_blueprints = [bp]
    appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
    appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]

从 CSRF 保护中排除视图

我们强烈建议您使用 CSRF 保护所有视图。但如果需要,您可以使用装饰器排除某些视图。

from airflow.www.app import csrf


@csrf.exempt
def my_handler():
    # ...
    return "ok"

作为 Python 包的插件

可以通过 setuptools entrypoint 机制加载插件。为此,请使用包中的入口点链接您的插件。如果安装了该包,Airflow 将自动从入口点列表加载已注册的插件。

注意

入口点名称(例如,my_plugin)和插件类的名称都不会影响插件本身的模块和类名。

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"
    flask_blueprints = [bp]

然后在 pyproject.toml 中

[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"

自动重载 Web 服务器

要启用 Web 服务器的自动重载,当检测到插件目录中的更改时,您应该将 [webserver] 部分中的 reload_on_plugin_change 选项设置为 True

注意

有关设置配置的更多信息,请参阅 设置配置选项

注意

有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

故障排除

您可以使用 Flask CLI 来排除故障。要运行此命令,您需要将变量 FLASK_APP 设置为 airflow.www.app:create_app

例如,要打印所有路由,请运行

FLASK_APP=airflow.www.app:create_app flask routes

此条目是否有帮助?