操作符¶
操作符在概念上是预定义任务的模板,您可以在 DAG 中以声明方式定义它们
with DAG("my-dag") as dag:
ping = HttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="[email protected]", subject="Update complete")
ping >> email
Airflow 提供了非常广泛的操作符集合,其中一些内置于核心或预安装的提供程序中。一些来自核心的常用操作符包括
BashOperator
- 执行 bash 命令PythonOperator
- 调用任意 Python 函数EmailOperator
- 发送电子邮件使用
@task
装饰器执行任意 Python 函数。它不支持渲染作为参数传递的 jinja 模板。
注意
建议使用 @task
装饰器,而不是传统的 PythonOperator
来执行 Python 可调用对象,其参数中没有模板渲染。
有关所有核心操作符的列表,请参阅:核心操作符和钩子参考。
如果默认情况下 Airflow 没有安装您需要的操作符,您可能会在我们的庞大社区提供程序软件包中找到它。这里的一些常用操作符包括
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
但是还有很多很多 - 您可以在我们的提供程序软件包文档中查看所有社区管理的操作符、钩子、传感器和传输的完整列表。
注意
在 Airflow 的代码中,我们经常混用任务和操作符的概念,它们在很大程度上是可互换的。但是,当我们谈论任务时,我们指的是 DAG 的通用“执行单元”;当我们谈论操作符时,我们指的是一个可重用的、预先制作的任务模板,其逻辑已全部为您完成,只需要一些参数。
Jinja 模板¶
Airflow 利用了Jinja 模板的强大功能,这可以与宏结合使用。
例如,假设您想使用 BashOperator
将数据间隔的开始作为环境变量传递给 Bash 脚本
# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id="test_env",
bash_command="/tmp/test.sh ",
dag=dag,
env={"DATA_INTERVAL_START": date},
)
在这里,{{ ds }}
是一个模板变量,并且由于 BashOperator
的 env
参数使用 Jinja 进行模板化,因此数据间隔的开始日期将作为名为 DATA_INTERVAL_START
的环境变量在您的 Bash 脚本中可用。
当 Python 比 Jinja 模板更具可读性时,您也可以传入一个可调用对象。可调用对象必须接受两个命名参数 context
和 jinja_env
def build_complex_command(context, jinja_env):
with open("file.csv") as f:
return do_complex_things(f)
t = BashOperator(
task_id="complex_templated_echo",
bash_command=build_complex_command,
dag=dag,
)
由于每个模板字段只渲染一次,因此可调用对象的返回值不会再次进行渲染。因此,可调用对象必须手动渲染任何模板。这可以通过像这样调用当前任务上的 render_template()
来完成
def build_complex_command(context, jinja_env):
with open("file.csv") as f:
data = do_complex_things(f)
return context["task"].render_template(data, context, jinja_env)
您可以对文档中标记为“templated”的每个参数使用模板。模板替换发生在调用操作符的 pre_execute
函数之前。
您还可以对嵌套字段使用模板,只要这些嵌套字段在它们所属的结构中标记为模板化即可:在 template_fields
属性中注册的字段将提交给模板替换,如下例中的 path
字段
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
dag=dag,
)
注意
template_fields
属性是一个类变量,保证是 Sequence[str]
类型(即字符串的列表或元组)。
也可以替换深度嵌套的字段,只要所有中间字段都标记为模板字段
class MyDataTransformer:
template_fields: Sequence[str] = ("reader",)
def __init__(self, my_reader):
self.reader = my_reader
# [additional code here...]
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
dag=dag,
)
您可以在创建 DAG 时将自定义选项传递给 Jinja Environment
。一个常见的用法是避免 Jinja 从模板字符串中删除尾随换行符
my_dag = DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# some other jinja2 Environment options here
},
)
请参阅 Jinja 文档,以查找所有可用选项。
某些操作符还会将以特定后缀(在 template_ext
中定义)结尾的字符串视为渲染字段时对文件的引用。这对于直接从文件加载脚本或查询而不是将它们包含在 DAG 代码中很有用。
例如,考虑一个运行多行 bash 脚本的 BashOperator,这将加载 script.sh
处的文件,并将其内容用作 bash_command
的值
run_script = BashOperator(
task_id="run_script",
bash_command="script.sh",
)
默认情况下,以这种方式提供的路径应相对于 DAG 的文件夹提供(因为这是默认的 Jinja 模板搜索路径),但可以通过在 DAG 上设置 template_searchpath
参数来添加其他路径。
在某些情况下,您可能希望排除字符串的模板化并直接使用它。考虑以下任务
print_script = BashOperator(
task_id="print_script",
bash_command="cat script.sh",
)
这将失败,并显示 TemplateNotFound: cat script.sh
,因为 Airflow 会将该字符串视为文件的路径,而不是命令。我们可以通过将其包装在 literal()
中来防止 Airflow 将此值视为对文件的引用。此方法禁用宏和文件的渲染,并且可以应用于选定的嵌套字段,同时保留其余内容的默认模板规则。
from airflow.utils.template import literal
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command=literal("cat script.sh"),
)
2.8 版本新增: 添加了 literal()
。
或者,如果您想防止 Airflow 将值视为对文件的引用,您可以覆盖 template_ext
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()
将字段渲染为原生 Python 对象¶
默认情况下,template_fields
中的所有 Jinja 模板都渲染为字符串。然而,这并不总是期望的。例如,假设一个 extract
任务将字典 {"1001": 301.27, "1002": 433.21, "1003": 502.22}
推送到 XCom
@task(task_id="extract")
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
如果一个任务依赖于 extract
,则 order_data
参数将传递一个字符串 "{'1001': 301.27, '1002': 433.21, '1003': 502.22}"
def transform(order_data):
total_order_value = sum(order_data.values()) # Fails because order_data is a str :(
return {"total_order_value": total_order_value}
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
extract() >> transform
如果我们想要获得实际的 dict,则有两种解决方案。第一个是使用可调用对象
def render_transform_op_kwargs(context, jinja_env):
order_data = context["ti"].xcom_pull("extract")
return {"order_data": order_data}
transform = PythonOperator(
task_id="transform",
op_kwargs=render_transform_op_kwargs,
python_callable=transform,
)
或者,也可以指示 Jinja 渲染原生 Python 对象。这可以通过将 render_template_as_native_obj=True
传递给 DAG 来完成。这使得 Airflow 使用 NativeEnvironment 而不是默认的 SandboxedEnvironment
with DAG(
dag_id="example_template_as_python_object",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
render_template_as_native_obj=True,
):
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ ti.xcom_pull('extract') }}"},
python_callable=transform,
)
保留的 params 关键字¶
在 Apache Airflow 2.2.0 中,params
变量在 DAG 序列化期间使用。请不要在第三方操作符中使用该名称。如果您升级您的环境并收到以下错误
AttributeError: 'str' object has no attribute '__module__'
请更改操作符中的 params
名称。