Airflow 峰会 2025 将于 10 月 07-09 日举行。立即注册获取早鸟票!

操作符

操作符在概念上是一个预定义任务的模板,你可以在 DAG 中声明式地定义它

with DAG("my-dag") as dag:
    ping = HttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="admin@example.com", subject="Update complete")

    ping >> email

Airflow 提供了非常丰富的操作符集,其中一些是内置于核心或预安装的 Provider 中的。核心模块中一些常用的操作符包括

  • BashOperator - 执行一个 bash 命令

  • PythonOperator - 调用一个任意的 Python 函数

  • 使用 @task 装饰器执行任意 Python 函数。它不支持渲染作为参数传递的 Jinja 模板。

注意

建议使用 @task 装饰器而非传统的 PythonOperator 来执行不带参数模板渲染的 Python 可调用对象。

有关所有核心操作符的列表,请参阅:核心操作符和钩子参考

如果你需要的操作符默认未随 Airflow 安装,你很可能在我们庞大的社区 provider 集中找到它。这里的一些常用操作符包括

但还有非常非常多 - 你可以在我们的 provider 包 文档中看到所有社区管理的操作符、钩子、传感器和传输器的完整列表。

注意

在 Airflow 代码内部,我们经常混用任务和操作符的概念,并且它们大部分是可以互换的。然而,当我们谈论*任务*时,我们指的是 DAG 的通用“执行单元”;当我们谈论*操作符*时,我们指的是一个可重用、预制的任务模板,其逻辑已为你完成,只需传入一些参数。

Jinja 模板

Airflow 利用了 Jinja 模板 的强大功能,结合 使用时,这会成为一个强大的工具。

例如,假设你想使用 BashOperator 将数据区间的开始时间作为环境变量传递给 Bash 脚本

# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

在这里,{{ ds }} 是一个模板变量,并且由于 BashOperatorenv 参数使用了 Jinja 模板,数据区间的开始日期将作为名为 DATA_INTERVAL_START 的环境变量在你的 Bash 脚本中可用。

当 Python 比 Jinja 模板更易读时,你也可以传入一个可调用对象。该可调用对象必须接受两个命名参数 contextjinja_env

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        return do_complex_things(f)


t = BashOperator(
    task_id="complex_templated_echo",
    bash_command=build_complex_command,
    dag=dag,
)

由于每个模板字段只渲染一次,可调用对象的返回值不会再次通过渲染。因此,可调用对象必须手动渲染任何模板。这可以通过在当前任务上调用 render_template() 来完成,如下所示

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        data = do_complex_things(f)
    return context["task"].render_template(data, context, jinja_env)

你可以对文档中标注为“模板化”的每个参数使用模板。模板替换发生在调用操作符的 pre_execute 函数之前。

你还可以对嵌套字段使用模板,只要这些嵌套字段在其所属结构中被标记为模板化:在 template_fields 属性中注册的字段将进行模板替换,如下例中的 path 字段。

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

注意

template_fields 属性是一个类变量,并保证是 Sequence[str] 类型(即字符串的列表或元组)。

深度嵌套字段也可以被替换,只要所有中间字段都标记为模板字段。

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

你可以在创建 DAG 时向 Jinja 的 Environment 传入自定义选项。一个常用例子是避免 Jinja 删除模板字符串中的尾随换行符。

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # some other jinja2 Environment options here
    },
)

请参阅 Jinja 文档 以找到所有可用选项。

一些操作符在渲染字段时,也会将以特定后缀(在 template_ext 中定义)结尾的字符串视为对文件的引用。这对于直接从文件加载脚本或查询非常有用,而无需将它们包含在 DAG 代码中。

例如,考虑一个运行多行 bash 脚本的 BashOperator,它将加载文件 script.sh 并将其内容用作 bash_command 的值。

run_script = BashOperator(
    task_id="run_script",
    bash_command="script.sh",
)

默认情况下,以这种方式提供的路径应相对于 DAG 文件夹(因为这是默认的 Jinja 模板搜索路径),但可以通过在 DAG 上设置 template_searchpath 参数来添加额外路径。

在某些情况下,你可能希望将字符串排除在模板化之外并直接使用它。考虑以下任务

print_script = BashOperator(
    task_id="print_script",
    bash_command="cat script.sh",
)

这将失败,并出现 TemplateNotFound: cat script.sh 错误,因为 Airflow 会将该字符串视为文件路径而非命令。我们可以通过将其包装在 literal() 中来阻止 Airflow 将此值视为对文件的引用。此方法禁用宏和文件的渲染,可应用于选定的嵌套字段,同时保留其余内容的默认模板化规则。

from airflow.sdk import literal


fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command=literal("cat script.sh"),
)

自 2.8 版新增:已新增 literal()

或者,如果你想阻止 Airflow 将某个值视为对文件的引用,可以覆盖 template_ext

fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()

将字段渲染为原生 Python 对象

默认情况下,template_fields 中的所有 Jinja 模板都渲染为字符串。然而,这并非总是期望的行为。例如,假设一个 extract 任务将字典 {"1001": 301.27, "1002": 433.21, "1003": 502.22} 推送到 XCom

@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

如果一个任务依赖于 extractorder_data 参数将传入字符串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"

def transform(order_data):
    total_order_value = sum(order_data.values())  # Fails because order_data is a str :(
    return {"total_order_value": total_order_value}


transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
    python_callable=transform,
)

extract() >> transform

如果想获得实际的字典对象,有两种解决方案。第一种是使用可调用对象

def render_transform_op_kwargs(context, jinja_env):
    order_data = context["ti"].xcom_pull("extract")
    return {"order_data": order_data}


transform = PythonOperator(
    task_id="transform",
    op_kwargs=render_transform_op_kwargs,
    python_callable=transform,
)

或者,也可以指示 Jinja 渲染原生 Python 对象。这可以通过向 DAG 传递 render_template_as_native_obj=True 来实现。这将使 Airflow 使用 NativeEnvironment 而非默认的 SandboxedEnvironment

with DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
):
    transform = PythonOperator(
        task_id="transform",
        op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
        python_callable=transform,
    )

保留的 params 关键字

在 Apache Airflow 2.2.0 中,params 变量在 DAG 序列化期间使用。请不要在第三方操作符中使用该名称。如果你升级环境并遇到以下错误

AttributeError: 'str' object has no attribute '__module__'

请更改操作符中 params 的名称。

模板与 f-string 的冲突

使用 Python f-string 构造模板化字段(例如 BashOperator 中的 bash_command)的字符串时,请注意 f-string 插值与 Jinja 模板语法的交互。两者都使用大括号 ({})。

Python f-string 将双大括号 ({{}}`) 解释为文字单大括号 ({}`) 的转义序列。然而,Jinja 使用双大括号 ({{ variable }}`) 来表示模板化变量。

如果你需要在用 f-string 定义的字符串中字面包含一个 Jinja 模板表达式(例如,{{ ds }}`),以便 Airflow 的 Jinja 引擎稍后可以处理它,你必须通过将 f-string 的大括号*再次*加倍来转义它们。这意味着使用四个大括号

t1 = BashOperator(
    task_id="fstring_templating_correct",
    bash_command=f"echo Data interval start: {{{{ ds }}}}",
    dag=dag,
)

python_var = "echo Data interval start:"

t2 = BashOperator(
    task_id="fstring_templating_simple",
    bash_command=f"{python_var} {{{{ ds }}}}",
    dag=dag,
)

这确保 f-string 处理的结果字符串包含 Jinja 所需的字面双大括号,Airflow 随后可以在执行前正确地对其进行模板化。未能做到这一点是初学者常犯的错误,可能导致 DAG 解析期间的错误,或在模板化未按预期发生时导致运行时出现意外行为。

此条目是否有帮助?