创建自定义 Operator¶
Airflow 允许您创建新的 operators 以满足您或您的团队的要求。这种可扩展性是使 Apache Airflow 强大的众多功能之一。
您可以通过扩展 airflow.models.baseoperator.BaseOperator
来创建任何您想要的 operator。
您需要在派生类中覆盖两个方法
构造函数 - 定义 operator 所需的参数。您只需要指定特定于您的 operator 的参数。您可以在 DAG 文件中指定
default_args
。有关更多详细信息,请参阅 默认参数。执行 - 当运行器调用 operator 时执行的代码。该方法包含可用于读取配置值的 Airflow 上下文作为参数。
注意
在实现自定义 operators 时,不要在 __init__
方法中进行任何昂贵的操作。这些 operators 将在每个调度周期中为使用它们的每个任务实例化一次,进行数据库调用会显着减慢调度速度并浪费资源。
让我们在新文件 hello_operator.py
中实现一个示例 HelloOperator
from airflow.models.baseoperator import BaseOperator
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
def execute(self, context):
message = f"Hello {self.name}"
print(message)
return message
注意
为了使导入工作,您应该将该文件放置在 PYTHONPATH
环境变量中存在的目录中。Airflow 默认将 Airflow home 中的 dags/
、plugins/
和 config/
目录添加到 PYTHONPATH
中。例如,在我们的示例中,该文件放置在 custom_operator/
目录中。有关 Python 和 Airflow 如何管理模块的详细信息,请参阅 模块管理。
您现在可以按如下方式使用派生的自定义 operator
from custom_operator.hello_operator import HelloOperator
with dag:
hello_task = HelloOperator(task_id="sample-task", name="foo_bar")
您也可以继续使用您的 plugins 文件夹来存储您的自定义 operators。如果您的 plugins 文件夹中有文件 hello_operator.py
,您可以按如下方式导入 operator
from hello_operator import HelloOperator
如果一个 operator 与外部服务(API、数据库等)通信,那么使用 Hooks 来实现通信层是一个好主意。这样,实现的逻辑可以被其他用户在不同的 operators 中重用。这种方法比为每个外部服务使用 CustomServiceBaseOperator
提供了更好的解耦和添加集成的利用率。
另一个考虑因素是临时状态。如果操作需要内存状态(例如,应该在 on_kill
方法中使用的作业 ID 来取消请求),那么该状态应该保存在 operator 中,而不是在 hook 中。这样,服务 hook 可以是完全无状态的,并且操作的整个逻辑都在一个地方 - 在 operator 中。
Hooks¶
Hooks 充当在 DAG 中与外部共享资源通信的接口。例如,DAG 中的多个任务可能需要访问 MySQL 数据库。您可以从 hook 中检索连接并利用它,而不是为每个任务创建一个连接。Hook 还有助于避免在 DAG 中存储连接身份验证参数。有关如何创建和管理连接的信息,请参阅 管理连接,有关如何通过 providers 添加自定义连接类型的详细信息,请参阅 Provider packages。
让我们扩展之前的示例,从 MySQL 获取名称
class HelloDBOperator(BaseOperator):
def __init__(self, name: str, mysql_conn_id: str, database: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.mysql_conn_id = mysql_conn_id
self.database = database
def execute(self, context):
hook = MySqlHook(mysql_conn_id=self.mysql_conn_id, schema=self.database)
sql = "select name from user"
result = hook.get_first(sql)
message = f"Hello {result['name']}"
print(message)
return message
当 operator 在 hook 对象上调用查询时,如果不存在,则会创建一个新连接。hook 从 Airflow 后端检索用户名和密码等身份验证参数,并将这些参数传递给 airflow.hooks.base.BaseHook.get_connection()
。您应该仅在 execute
方法或从 execute
调用的任何方法中创建 hook。每当 Airflow 解析 DAG 时都会频繁调用构造函数。在那里实例化一个 hook 将导致许多不必要的数据库连接。execute
仅在 DAG 运行期间被调用。
用户界面¶
Airflow 还允许开发人员控制 operator 在 DAG UI 中的显示方式。覆盖 ui_color
以更改 UI 中 operator 的背景颜色。覆盖 ui_fgcolor
以更改标签的颜色。覆盖 custom_operator_name
以将显示名称更改为类名以外的名称。
class HelloOperator(BaseOperator):
ui_color = "#ff0000"
ui_fgcolor = "#000000"
custom_operator_name = "Howdy"
# ...
模板¶
您可以使用 Jinja 模板 来参数化您的 operator。Airflow 在呈现 operator 时会考虑 template_fields
中存在的字段名称进行模板化。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
您可以按如下方式使用模板
with dag:
hello_task = HelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="Earth",
)
在此示例中,Jinja 查找 name
参数,并将 {{ task_instance.task_id }}
替换为 task_id_1
。
该参数还可以包含文件名,例如 bash 脚本或 SQL 文件。您需要在 template_ext
中添加文件的扩展名。如果 template_field
包含以 template_ext
中提到的扩展名结尾的字符串,则 Jinja 会读取文件的内容并将模板替换为实际值。请注意,Jinja 会替换 operator 的属性,而不是参数。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("guest_name",)
template_ext = ".sql"
def __init__(self, name: str, **kwargs) -> None:
super().__init__(**kwargs)
self.guest_name = name
在示例中,template_fields
应该是 ['guest_name']
而不是 ['name']
此外,您可以提供 template_fields_renderers
字典,该字典定义模板字段中的值在 Web UI 中以何种样式呈现。例如
class MyRequestOperator(BaseOperator):
template_fields: Sequence[str] = ("request_body",)
template_fields_renderers = {"request_body": "json"}
def __init__(self, request_body: str, **kwargs) -> None:
super().__init__(**kwargs)
self.request_body = request_body
如果 template_field
本身是一个字典,也可以指定一个以点分隔的键路径来提取和适当呈现各个元素。例如
class MyConfigOperator(BaseOperator):
template_fields: Sequence[str] = ("configuration",)
template_fields_renderers = {
"configuration": "json",
"configuration.query.sql": "sql",
}
def __init__(self, configuration: dict, **kwargs) -> None:
super().__init__(**kwargs)
self.configuration = configuration
然后按如下方式使用此模板
with dag:
config_task = MyConfigOperator(
task_id="task_id_1",
configuration={"query": {"job_id": "123", "sql": "select * from my_table"}},
)
这将导致 UI 将 configuration
呈现为 json,此外,还会将 query.sql
中包含的值使用 SQL 词法分析器进行呈现。
当前可用的词法分析器
bash
bash_command
doc
doc_json
doc_md
doc_rst
doc_yaml
doc_md
hql
html
jinja
json
md
mysql
postgresql
powershell
py
python_callable
rst
sql
tsql
yaml
如果您使用不存在的词法分析器,则模板字段的值将呈现为经过漂亮打印的对象。
限制¶
为了防止滥用,在操作符的构造函数中定义和分配模板字段时(如果存在构造函数,否则 - 见下文)必须遵守以下限制:
1. 传递给构造函数的模板字段的对应参数必须与字段名称完全相同。以下示例无效,因为传递给构造函数的参数与模板字段不同。
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a_id) -> None: # <- should be def __init__(field_a)-> None
self.field_a = field_a_id # <- should be self.field_a = field_a
2. 模板字段的实例成员必须通过直接赋值或调用父类的构造函数(其中这些字段定义为 template_fields
)并显式赋值参数来分配其对应的构造函数参数。以下示例无效,因为实例成员 self.field_a
根本没有被赋值,尽管它是一个模板字段。
class HelloOperator(BaseOperator):
template_fields = ("field_a", "field_b")
def __init__(field_a, field_b) -> None:
self.field_b = field_b
以下示例也是无效的,因为 MyHelloOperator
的实例成员 self.field_a
是作为传递给其父类构造函数的 kwargs
的一部分隐式初始化的。
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a
class MyHelloOperator(HelloOperator):
template_fields = ("field_a", "field_b")
def __init__(field_b, **kwargs) -> None: # <- should be def __init__(field_a, field_b, **kwargs)
super().__init__(**kwargs) # <- should be super().__init__(field_a=field_a, **kwargs)
self.field_b = field_b
3. 不允许在构造函数中的赋值期间对参数应用操作。对值的任何操作都应在 execute()
方法中应用。因此,以下示例无效。
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a.lower() # <- assignment should be only self.field_a = field_a
当操作符继承自基本操作符并且没有定义自己的构造函数时,以上限制不适用。但是,模板字段必须根据这些限制在父类中正确设置。
因此,以下示例有效。
class HelloOperator(BaseOperator):
template_fields = "field_a"
def __init__(field_a) -> None:
self.field_a = field_a
class MyHelloOperator(HelloOperator):
template_fields = "field_a"
以上限制由一个名为“validate-operators-init”的预提交检查强制执行。
通过子类添加模板字段¶
创建自定义操作符的常见用例是简单地扩充现有的 template_fields
。可能存在这样一种情况:您希望使用的操作符没有将某些参数定义为模板化的,但您希望能够动态地将参数作为 Jinja 表达式传递。通过快速子类化现有操作符可以很容易地实现这一点。
假设您想使用前面定义的 HelloOperator
。
class HelloOperator(BaseOperator):
template_fields: Sequence[str] = ("name",)
def __init__(self, name: str, world: str, **kwargs) -> None:
super().__init__(**kwargs)
self.name = name
self.world = world
def execute(self, context):
message = f"Hello {self.world} it's {self.name}!"
print(message)
return message
但是,您想动态地参数化 world
参数。因为 template_fields
属性保证是 Sequence[str]
类型(即字符串列表或元组),您可以子类化 HelloOperator
以根据需要轻松修改 template_fields
。
class MyHelloOperator(HelloOperator):
template_fields: Sequence[str] = (*HelloOperator.template_fields, "world")
现在您可以像这样使用 MyHelloOperator
。
with dag:
hello_task = MyHelloOperator(
task_id="task_id_1",
name="{{ task_instance.task_id }}",
world="{{ var.value.my_world }}",
)
在这个例子中,world
参数将通过 Jinja 表达式动态设置为名为“my_world”的 Airflow 变量的值。
传感器¶
Airflow 提供了一种特殊类型操作符的原语,其目的是定期轮询某种状态(例如,文件的存在),直到满足成功标准。
您可以通过扩展 airflow.sensors.base.BaseSensorOperator
来创建您想要的任何传感器,定义一个 poke
方法来轮询您的外部状态并评估成功标准。
传感器具有一个强大的功能,称为 'reschedule'
模式,该模式允许重新调度传感器任务,而不是在轮询之间阻塞 worker 插槽。当您可以容忍更长的轮询间隔并希望长时间轮询时,这很有用。
重新调度模式有一个警告,即您的传感器不能在重新调度的执行之间保持内部状态。在这种情况下,您应该使用 airflow.sensors.base.poke_mode_only()
装饰您的传感器。这将让用户知道您的传感器不适合与重新调度模式一起使用。
一个保留内部状态且不能与重新调度模式一起使用的传感器的示例是 airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor
。它轮询前缀处的对象数量(该数量是传感器的内部状态),并在经过一定时间且对象数量没有变化时成功。