Airflow Summit 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

创建自定义 Operator

Airflow 允许您创建新的 Operator 来满足您或您的团队的需求。这种可扩展性是使 Apache Airflow 功能强大的众多特性之一。

您可以通过继承 airflow.models.baseoperator.BaseOperator 来创建您想要的任何 Operator。

您需要在派生类中重写两个方法:

  • 构造函数 (Constructor) - 定义 Operator 所需的参数。您只需要指定您的 Operator 特有的参数。您可以在 DAG 文件中指定 default_args。有关详细信息,请参阅 默认参数

  • 执行 (Execute) - 当运行器调用 Operator 时执行的代码。该方法包含 Airflow 上下文作为参数,可用于读取配置值。

注意

实现自定义 Operator 时,请勿在 __init__ 方法中执行任何耗时操作。使用 Operator 的每个任务在每个调度器周期都会实例化一次,执行数据库调用会显著减慢调度速度并浪费资源。

让我们在一个新文件 hello_operator.py 中实现一个示例 HelloOperator

from airflow.sdk import BaseOperator


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

注意

为了使导入生效,您应该将文件放置在 PYTHONPATH 环境变量中存在的目录中。Airflow 默认会将 Airflow Home 中的 dags/plugins/config/ 目录添加到 PYTHONPATH。例如,在我们的示例中,文件位于 custom_operator/ 目录中。有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理

您现在可以按如下方式使用派生的自定义 Operator:

from custom_operator.hello_operator import HelloOperator

with dag:
    hello_task = HelloOperator(task_id="sample-task", name="foo_bar")

您也可以继续使用 plugins 文件夹来存储自定义 Operator。如果您的文件 hello_operator.py 在 plugins 文件夹中,您可以按如下方式导入 Operator:

from hello_operator import HelloOperator

如果 Operator 与外部服务(API、数据库等)通信,最好使用 Hook 来实现通信层。通过这种方式,实现的逻辑可以被其他用户在不同的 Operator 中重用。相比于为每个外部服务使用 CustomServiceBaseOperator,这种方法提供了更好的解耦和集成利用。

另一个考虑因素是临时状态。如果某个操作需要内存中的状态(例如,在 on_kill 方法中用于取消请求的作业 ID),则该状态应保存在 Operator 中,而不是 Hook 中。这样,服务 Hook 就可以完全无状态,并且操作的整个逻辑都在一个地方 - Operator 中。

Hook

Hook 作为接口用于与 DAG 中的外部共享资源进行通信。例如,DAG 中的多个任务可能需要访问 MySQL 数据库。您可以通过 Hook 获取连接并使用它,而不是为每个任务创建连接。Hook 还有助于避免在 DAG 中存储连接认证参数。有关如何创建和管理连接的信息,请参阅 管理连接,有关如何通过 Provider 添加自定义连接类型的详细信息,请参阅 Provider

让我们扩展前面的示例,从 MySQL 获取名称。

class HelloDBOperator(BaseOperator):
    def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.mysql_conn_id = mysql_conn_id
        self.database = database

    def execute(self, context):
        hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
        sql = "select name from user"
        result = hook.get_first(sql)
        message = f"Hello {result['name']}"
        print(message)
        return message

当 Operator 在 Hook 对象上调用查询时,如果连接不存在,则会创建一个新连接。Hook 从 Airflow 后端检索认证参数(如用户名和密码),并将这些参数传递给 airflow.hooks.base.BaseHook.get_connection()。您只应在 execute 方法或从 execute 调用的任何方法中创建 Hook。构造函数在 Airflow 解析 DAG 时会被调用,这经常发生。在那里实例化 Hook 会导致许多不必要的数据库连接。execute 只在 DAG 运行时被调用。

用户界面

Airflow 还允许开发者控制 Operator 在 DAG UI 中的显示方式。重写 ui_color 以更改 Operator 在 UI 中的背景颜色。重写 ui_fgcolor 以更改标签的颜色。重写 custom_operator_name 以将显示的名称更改为类名之外的其他名称。

class HelloOperator(BaseOperator):
    ui_color = "#ff0000"
    ui_fgcolor = "#000000"
    custom_operator_name = "Howdy"
    # ...

模板化

您可以使用 Jinja 模板 来参数化您的 Operator。Airflow 在渲染 Operator 时,会考虑 template_fields 中存在的字段名称进行模板化。

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("name",)

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

您可以按如下方式使用模板:

with dag:
    hello_task = HelloOperator(
        task_id="task_id_1",
        name="{{ task_instance.task_id }}",
        world="Earth",
    )

在此示例中,Jinja 会查找 name 参数,并将 {{ task_instance.task_id }} 替换为 task_id_1

参数还可以包含文件名,例如 bash 脚本或 SQL 文件。您需要在 template_ext 中添加文件的扩展名。如果 template_field 包含一个以 template_ext 中提到的扩展名结尾的字符串,Jinja 会读取文件内容并将模板替换为实际值。请注意,Jinja 替换的是 Operator 属性而不是参数 (args)。

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("guest_name",)
    template_ext = ".sql"

    def __init__(self, name: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.guest_name = name

在此示例中,template_fields 应该是 ['guest_name'],而不是 ['name']

此外,您还可以提供一个字典 template_fields_renderers,用于定义模板字段的值在 Web UI 中以何种样式呈现。例如:

class MyRequestOperator(BaseOperator):
    template_fields: Sequence[str] = ("request_body",)
    template_fields_renderers = {"request_body": "json"}

    def __init__(self, request_body: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.request_body = request_body

如果 template_field 本身是一个字典,还可以指定一个点分隔的键路径来提取和适当渲染单个元素。例如:

class MyConfigOperator(BaseOperator):
    template_fields: Sequence[str] = ("configuration",)
    template_fields_renderers = {
        "configuration": "json",
        "configuration.query.sql": "sql",
    }

    def __init__(self, configuration: dict, **kwargs) -> None:
        super().__init__(**kwargs)
        self.configuration = configuration

然后按如下方式使用此模板:

with dag:
    config_task = MyConfigOperator(
        task_id="task_id_1",
        configuration={"query": {"job_id": "123", "sql": "select * from my_table"}},
    )

这将导致 UI 将 configuration 渲染为 json 格式,此外,配置中位于 query.sql 的值将使用 SQL 词法分析器 (lexer) 进行渲染。

../_images/template_field_renderer_path.png

当前可用的词法分析器 (lexers)

  • bash

  • bash_command

  • doc

  • doc_json

  • doc_md

  • doc_rst

  • doc_yaml

  • doc_md

  • hql

  • html

  • jinja

  • json

  • md

  • mysql

  • postgresql

  • powershell

  • py

  • python_callable

  • rst

  • sql

  • tsql

  • yaml

如果您使用不存在的词法分析器 (lexer),则模板字段的值将渲染为美观打印的对象。

限制

为防止误用,在 Operator 的构造函数中定义和分配模板化字段时,必须遵守以下限制(如果存在构造函数,否则请参阅下文):

1. 传递给构造函数的模板化字段的对应参数必须与字段名称完全一致。以下示例无效,因为传递给构造函数的参数与模板化字段不同:

class HelloOperator(BaseOperator):
    template_fields = "foo"

    def __init__(self, foo_id) -> None:  # should be def __init__(self, foo) -> None
        self.foo = foo_id  # should be self.foo = foo

2. 模板化字段的实例成员必须通过直接赋值或通过调用父类构造函数(其中这些字段被定义为 template_fields)并明确分配参数的方式,将其对应参数从构造函数中分配给实例成员。以下示例也无效,因为实例成员 self.foo 完全没有被赋值,尽管它是一个模板化字段:

class HelloOperator(BaseOperator):
    template_fields = ("foo", "bar")

    def __init__(self, foo, bar) -> None:
        self.bar = bar

以下示例也无效,因为 MyHelloOperator 的实例成员 self.foo 是作为传递给其父类构造函数的 kwargs 的一部分被隐式初始化的:

class HelloOperator(BaseOperator):
    template_fields = "foo"

    def __init__(self, foo) -> None:
        self.foo = foo


class MyHelloOperator(HelloOperator):
    template_fields = ("foo", "bar")

    def __init__(self, bar, **kwargs) -> None:  # should be def __init__(self, foo, bar, **kwargs)
        super().__init__(**kwargs)  # should be super().__init__(foo=foo, **kwargs)
        self.bar = bar

3. 不允许在构造函数中对参数应用操作。对值的任何操作都应在 execute() 方法中进行。因此,以下示例无效:

class HelloOperator(BaseOperator):
    template_fields = "foo"

    def __init__(self, foo) -> None:
        self.foo = foo.lower()  # assignment should be only self.foo = foo

当 Operator 继承自某个基础 Operator 且自身未定义构造函数时,上述限制不适用。但是,必须根据这些限制在父类中正确设置模板化字段。

因此,以下示例有效:

class HelloOperator(BaseOperator):
    template_fields = "foo"

    def __init__(self, foo) -> None:
        self.foo = foo


class MyHelloOperator(HelloOperator):
    template_fields = "foo"

上述限制由名为 ‘validate-operators-init’ 的 pre-commit 钩子强制执行。

使用子类化添加模板字段

创建自定义 Operator 的一个常见用例是简单地增强现有的 template_fields。可能存在这样一种情况:您希望使用的 Operator 没有将某些参数定义为模板化字段,但您希望能够以 Jinja 表达式的形式动态传递参数。通过对现有 Operator 进行快速子类化,可以轻松实现这一点。

假设您想使用前面定义的 HelloOperator

class HelloOperator(BaseOperator):
    template_fields: Sequence[str] = ("name",)

    def __init__(self, name: str, world: str, **kwargs) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.world = world

    def execute(self, context):
        message = f"Hello {self.world} it's {self.name}!"
        print(message)
        return message

但是,您希望动态参数化 world 参数。由于 template_fields 属性保证是 Sequence[str] 类型(即字符串列表或元组),您可以轻松地对 HelloOperator 进行子类化,以按需修改 template_fields

class MyHelloOperator(HelloOperator):
    template_fields: Sequence[str] = (*HelloOperator.template_fields, "world")

现在您可以像这样使用 MyHelloOperator

with dag:
    hello_task = MyHelloOperator(
        task_id="task_id_1",
        name="{{ task_instance.task_id }}",
        world="{{ var.value.my_world }}",
    )

在此示例中,world 参数将通过 Jinja 表达式动态设置为名为 “my_world” 的 Airflow Variable 的值。

Sensor

Airflow 提供了一种特殊 Operator 的基本功能,其目的是定期轮询某个状态(例如文件是否存在),直到满足成功条件。

您可以通过继承 airflow.sensors.base.BaseSensorOperator 来创建您想要的任何 Sensor,定义一个 poke 方法来轮询您的外部状态并评估成功条件。

Sensor 有一个强大的特性,称为 'reschedule' 模式,它允许传感器任务被重新调度,而不是在轮询间隔期间阻塞工作节点槽。这在您可以容忍较长的轮询间隔并期望长时间轮询时非常有用。

'reschedule' 模式附带一个注意事项:您的传感器无法在重新调度的执行之间维护内部状态。在这种情况下,您应该使用 airflow.sensors.base.poke_mode_only() 装饰您的传感器。这将告知用户您的传感器不适合与 reschedule 模式一起使用。

一个保持内部状态且无法与 reschedule 模式一起使用的传感器示例是 airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor。它轮询某个前缀下的对象数量(此数量即为传感器的内部状态),并在对象数量在一定时间内没有变化时成功。

此条目有帮助吗?