操作符

操作符在概念上是预定义任务的模板,您可以在 DAG 中以声明方式定义它们

with DAG("my-dag") as dag:
    ping = HttpOperator(endpoint="http://example.com/update/")
    email = EmailOperator(to="[email protected]", subject="Update complete")

    ping >> email

Airflow 提供了非常广泛的操作符集合,其中一些内置于核心或预安装的提供程序中。一些来自核心的常用操作符包括

  • BashOperator - 执行 bash 命令

  • PythonOperator - 调用任意 Python 函数

  • EmailOperator - 发送电子邮件

  • 使用 @task 装饰器执行任意 Python 函数。它不支持渲染作为参数传递的 jinja 模板。

注意

建议使用 @task 装饰器,而不是传统的 PythonOperator 来执行 Python 可调用对象,其参数中没有模板渲染。

有关所有核心操作符的列表,请参阅:核心操作符和钩子参考

如果默认情况下 Airflow 没有安装您需要的操作符,您可能会在我们的庞大社区提供程序软件包中找到它。这里的一些常用操作符包括

但是还有很多很多 - 您可以在我们的提供程序软件包文档中查看所有社区管理的操作符、钩子、传感器和传输的完整列表。

注意

在 Airflow 的代码中,我们经常混用任务和操作符的概念,它们在很大程度上是可互换的。但是,当我们谈论任务时,我们指的是 DAG 的通用“执行单元”;当我们谈论操作符时,我们指的是一个可重用的、预先制作的任务模板,其逻辑已全部为您完成,只需要一些参数。

Jinja 模板

Airflow 利用了Jinja 模板的强大功能,这可以与结合使用。

例如,假设您想使用 BashOperator 将数据间隔的开始作为环境变量传递给 Bash 脚本

# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

在这里,{{ ds }} 是一个模板变量,并且由于 BashOperatorenv 参数使用 Jinja 进行模板化,因此数据间隔的开始日期将作为名为 DATA_INTERVAL_START 的环境变量在您的 Bash 脚本中可用。

当 Python 比 Jinja 模板更具可读性时,您也可以传入一个可调用对象。可调用对象必须接受两个命名参数 contextjinja_env

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        return do_complex_things(f)


t = BashOperator(
    task_id="complex_templated_echo",
    bash_command=build_complex_command,
    dag=dag,
)

由于每个模板字段只渲染一次,因此可调用对象的返回值不会再次进行渲染。因此,可调用对象必须手动渲染任何模板。这可以通过像这样调用当前任务上的 render_template() 来完成

def build_complex_command(context, jinja_env):
    with open("file.csv") as f:
        data = do_complex_things(f)
    return context["task"].render_template(data, context, jinja_env)

您可以对文档中标记为“templated”的每个参数使用模板。模板替换发生在调用操作符的 pre_execute 函数之前。

您还可以对嵌套字段使用模板,只要这些嵌套字段在它们所属的结构中标记为模板化即可:在 template_fields 属性中注册的字段将提交给模板替换,如下例中的 path 字段

class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

注意

template_fields 属性是一个类变量,保证是 Sequence[str] 类型(即字符串的列表或元组)。

也可以替换深度嵌套的字段,只要所有中间字段都标记为模板字段

class MyDataTransformer:
    template_fields: Sequence[str] = ("reader",)

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]


class MyDataReader:
    template_fields: Sequence[str] = ("path",)

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)

您可以在创建 DAG 时将自定义选项传递给 Jinja Environment。一个常见的用法是避免 Jinja 从模板字符串中删除尾随换行符

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # some other jinja2 Environment options here
    },
)

请参阅 Jinja 文档,以查找所有可用选项。

某些操作符还会将以特定后缀(在 template_ext 中定义)结尾的字符串视为渲染字段时对文件的引用。这对于直接从文件加载脚本或查询而不是将它们包含在 DAG 代码中很有用。

例如,考虑一个运行多行 bash 脚本的 BashOperator,这将加载 script.sh 处的文件,并将其内容用作 bash_command 的值

run_script = BashOperator(
    task_id="run_script",
    bash_command="script.sh",
)

默认情况下,以这种方式提供的路径应相对于 DAG 的文件夹提供(因为这是默认的 Jinja 模板搜索路径),但可以通过在 DAG 上设置 template_searchpath 参数来添加其他路径。

在某些情况下,您可能希望排除字符串的模板化并直接使用它。考虑以下任务

print_script = BashOperator(
    task_id="print_script",
    bash_command="cat script.sh",
)

这将失败,并显示 TemplateNotFound: cat script.sh,因为 Airflow 会将该字符串视为文件的路径,而不是命令。我们可以通过将其包装在 literal() 中来防止 Airflow 将此值视为对文件的引用。此方法禁用宏和文件的渲染,并且可以应用于选定的嵌套字段,同时保留其余内容的默认模板规则。

from airflow.utils.template import literal


fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command=literal("cat script.sh"),
)

2.8 版本新增: 添加了 literal()

或者,如果您想防止 Airflow 将值视为对文件的引用,您可以覆盖 template_ext

fixed_print_script = BashOperator(
    task_id="fixed_print_script",
    bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()

将字段渲染为原生 Python 对象

默认情况下,template_fields 中的所有 Jinja 模板都渲染为字符串。然而,这并不总是期望的。例如,假设一个 extract 任务将字典 {"1001": 301.27, "1002": 433.21, "1003": 502.22} 推送到 XCom

@task(task_id="extract")
def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)

如果一个任务依赖于 extract,则 order_data 参数将传递一个字符串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"

def transform(order_data):
    total_order_value = sum(order_data.values())  # Fails because order_data is a str :(
    return {"total_order_value": total_order_value}


transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
    python_callable=transform,
)

extract() >> transform

如果我们想要获得实际的 dict,则有两种解决方案。第一个是使用可调用对象

def render_transform_op_kwargs(context, jinja_env):
    order_data = context["ti"].xcom_pull("extract")
    return {"order_data": order_data}


transform = PythonOperator(
    task_id="transform",
    op_kwargs=render_transform_op_kwargs,
    python_callable=transform,
)

或者,也可以指示 Jinja 渲染原生 Python 对象。这可以通过将 render_template_as_native_obj=True 传递给 DAG 来完成。这使得 Airflow 使用 NativeEnvironment 而不是默认的 SandboxedEnvironment

with DAG(
    dag_id="example_template_as_python_object",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
):
    transform = PythonOperator(
        task_id="transform",
        op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
        python_callable=transform,
    )

保留的 params 关键字

在 Apache Airflow 2.2.0 中,params 变量在 DAG 序列化期间使用。请不要在第三方操作符中使用该名称。如果您升级您的环境并收到以下错误

AttributeError: 'str' object has no attribute '__module__'

请更改操作符中的 params 名称。

此条目是否有帮助?