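Airflow has a simple built-in plugin manager that can integrate external features into its core: you only need to drop files into your $AIRFLOW_HOME/plugins folder.
The Python modules in the plugins folder get imported, and macros and web views get integrated into Airflow's main collections and become available for use.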
To troubleshoot issues with plugins, you can use the airflow plugins command.
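Changed in version 2.0: Importing operators, sensors, and hooks added in plugins via airflow.{operators,sensors,hooks}.<plugin_name> is no longer supported. These extensions should be imported as regular Python modules. For more information, see: Modules Management and Creating a custom Operator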
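Airflow offers a generic toolbox for working with data. Different organizations have different stacks and different needs. Using Airflow plugins can be a way for companies to customize their Airflow installation to reflect their ecosystem. Examples of what could be built this way:
A set of tools to parse Hive logs and expose Hive metadata (CPU / IO / phases / skew / ...)
A config-driven SLA monitoring tool that lets you set the monitored tables and the time by which they should land, alerts people, and exposes visualizations of outages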
Why build on top of Airflow?
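Airflow has many components that can be reused when building an application:
A web server you can use to render your views
Airflow is already deployed, so you can piggyback on its deployment logistics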
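By default, plugins are lazily loaded, and once loaded they are never reloaded (except UI plugins, which are automatically loaded in the webserver). To load them at the start of each Airflow process, set [core] lazy_load_plugins = False in airflow.cfg.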
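This means that if you make any changes to plugins and you want the webserver or scheduler to use that new code, you will need to restart those processes. Even then, the change will not be reflected in newly running tasks until after the scheduler boots.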
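The lazy_load_plugins option above can also be supplied through an environment variable, following Airflow's general AIRFLOW__{SECTION}__{KEY} naming convention for configuration overrides. The sketch below only builds that variable name and value; the helper names airflow_env_var_name and plugin_loading_env are hypothetical and not part of Airflow.

```python
def airflow_env_var_name(section: str, key: str) -> str:
    """Build the environment variable name that overrides an airflow.cfg option.

    Hypothetical helper: it only applies the AIRFLOW__{SECTION}__{KEY} convention.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def plugin_loading_env(lazy: bool) -> dict:
    """Return the environment override for [core] lazy_load_plugins."""
    return {airflow_env_var_name("core", "lazy_load_plugins"): str(lazy)}


if __name__ == "__main__":
    # Print the variable you could export before starting an Airflow process.
    for name, value in plugin_loading_env(lazy=False).items():
        print(f"{name}={value}")
```

Exporting the printed variable before starting a process should have the same effect as editing airflow.cfg, assuming no other configuration source takes precedence in your deployment.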
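By default, task execution uses forking. This avoids the slowdown of creating a new Python interpreter and re-parsing all of Airflow's code and startup routines, which is a significant benefit, especially for shorter tasks. It does mean that if you use plugins in your tasks and want them to update, you need to restart either the worker (if using CeleryExecutor) or the scheduler (Local or Sequential executors). Alternatively, you can accept the speed hit at startup and set the core.execute_tasks_new_python_interpreter config setting to True, which launches a whole new Python interpreter for each task.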
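(Modules that are only imported by DAG files, on the other hand, do not suffer from this problem, because DAG files are not loaded/parsed in any long-running Airflow process.)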
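The reloading behavior described above comes from how Python caches imported modules. The following standard-library sketch does not use Airflow at all: it writes a throwaway module (the name demo_plugin_module is made up for illustration), imports it, changes the file, and then compares what the already-running process sees with what a brand-new interpreter sees.

```python
import importlib
import os
import subprocess
import sys
import tempfile
from pathlib import Path

MODULE_NAME = "demo_plugin_module"  # hypothetical module name, for illustration only


def compare_cached_and_fresh_import():
    """Return (value seen by this process, value seen by a new interpreter)."""
    previous_flag = sys.dont_write_bytecode
    sys.dont_write_bytecode = True  # avoid a stale .pyc influencing the result
    with tempfile.TemporaryDirectory() as tmp:
        module_path = Path(tmp) / f"{MODULE_NAME}.py"
        module_path.write_text("VERSION = 'old'\n")
        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            importlib.import_module(MODULE_NAME)  # first import, now cached
            module_path.write_text("VERSION = 'new-code'\n")  # "update the plugin"
            # A second import in the same process returns the cached module.
            in_process = importlib.import_module(MODULE_NAME).VERSION
            # A new interpreter starts with an empty cache and re-reads the file.
            fresh = subprocess.run(
                [sys.executable, "-B", "-c", f"import {MODULE_NAME} as m; print(m.VERSION)"],
                env={**os.environ, "PYTHONPATH": tmp},
                capture_output=True,
                text=True,
                check=True,
            ).stdout.strip()
        finally:
            sys.path.remove(tmp)
            sys.modules.pop(MODULE_NAME, None)
            sys.dont_write_bytecode = previous_flag
    return in_process, fresh


if __name__ == "__main__":
    print(compare_cached_and_fresh_import())
```

The long-running process keeps the old value, much like a scheduler or worker that has already loaded a plugin, while the new interpreter picks up the change. This is only an analogy for the trade-off between forking and execute_tasks_new_python_interpreter, not a description of Airflow's internals.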
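To create a plugin, you need to derive the airflow.plugins_manager.AirflowPlugin class and reference the objects you want to plug into Airflow. Here is what the class you need to derive looks like: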
class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of Blueprint objects created from flask.Blueprint. For use with the flask_appbuilder based GUI
    flask_blueprints = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_views = []
    # A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
    appbuilder_menu_items = []

    # A callback to perform actions when airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    # to protect against extra parameters injected into the on_load(...)
    # function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []
    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []
    # A list of timetable classes to register so they can be used in DAGs.
    timetables = []
    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []
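You can derive it by inheritance (see the example below). In that example, all options are defined as class attributes, but you can also define them as properties if you need to perform additional initialization. Note that name must be specified inside this class.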
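As a minimal sketch of the property-based variant mentioned above, the plugin below computes its macros list in a property instead of a class attribute. The names LazyMacrosPlugin and days_between are hypothetical, and the fallback AirflowPlugin class is only a stand-in so that the snippet runs where Airflow is not installed; it is not the real base class.

```python
import datetime

try:
    from airflow.plugins_manager import AirflowPlugin
except ImportError:
    # Stand-in (assumption) so the sketch runs without Airflow installed.
    class AirflowPlugin:
        name = None
        macros = []


def days_between(start: datetime.date, end: datetime.date) -> int:
    """Illustrative macro: number of days from start to end."""
    return (end - start).days


class LazyMacrosPlugin(AirflowPlugin):
    # name must always be specified on the plugin class
    name = "lazy_macros_plugin"

    @property
    def macros(self):
        # Any extra initialization could happen here before the list is returned.
        return [days_between]


if __name__ == "__main__":
    plugin = LazyMacrosPlugin()
    print(plugin.name, [macro.__name__ for macro in plugin.macros])
```

A property is evaluated on an instance of the plugin class, so this pattern only helps for options that are read from the instantiated plugin.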
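Make sure you restart the webserver and scheduler after making changes to plugins so that the changes take effect.
The code below defines a plugin that injects a set of illustrative object definitions into Airflow.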
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
    pass


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
    "name": "Test View",
    "category": "Test Plugin",
    "view": v_appbuilder_view,
}

v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}

# Creating flask appbuilder Menu Items
appbuilder_mitem = {
    "name": "Google",
    "href": "https://www.google.com",
    "category": "Search",
}
appbuilder_mitem_toplevel = {
    "name": "Apache",
    "href": "https://apache.org/",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    hooks = [PluginHook]
    macros = [plugin_macro]
    flask_blueprints = [bp]
    appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
    appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
Exclude views from CSRF protection
We strongly recommend protecting all your views with CSRF. If needed, however, you can exclude individual views using a decorator.
from airflow.www.app import csrf


# Exempt this view from CSRF protection
@csrf.exempt
def my_handler():
    # ...
    return "ok"
Plugins as Python packages
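Plugins can be loaded through the setuptools entrypoint mechanism. To do this, link your plugin using an entrypoint in your package. If the package is installed, Airflow automatically loads the registered plugins from the entrypoint list.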
# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"
    flask_blueprints = [bp]
Then, inside pyproject.toml:
[project.entry-points."airflow.plugins"]
my_plugin = "my_package.my_plugin:MyAirflowPlugin"
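To check that a package really registered its plugin, one option is to list the entry points in the airflow.plugins group with the standard library. This is a minimal sketch: the helper names list_plugin_entry_points and split_entry_point are hypothetical, and Airflow's own discovery code may work differently.

```python
from importlib.metadata import entry_points


def list_plugin_entry_points(group: str = "airflow.plugins") -> list:
    """Return sorted (name, value) pairs for every installed entry point in the group."""
    all_entry_points = entry_points()
    if hasattr(all_entry_points, "select"):
        # Python 3.10+ exposes a selectable collection.
        selected = all_entry_points.select(group=group)
    else:
        # Older versions return a plain dict keyed by group name.
        selected = all_entry_points.get(group, [])
    return sorted((ep.name, ep.value) for ep in selected)


def split_entry_point(value: str) -> tuple:
    """Split a 'module:attribute' entry point value into its two parts."""
    module_name, _, attribute = value.partition(":")
    return module_name, attribute


if __name__ == "__main__":
    for name, value in list_plugin_entry_points():
        print(name, split_entry_point(value))
```

With the pyproject.toml entry from this section installed, the listing would be expected to include my_plugin pointing at my_package.my_plugin and MyAirflowPlugin. An empty result suggests the package is not installed in the current environment.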