操作符¶
从概念上来说,操作符是预定义的 任务 的模板,您可以在 DAG 中以声明方式定义它
with DAG("my-dag") as dag:
ping = HttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="[email protected]", subject="Update complete")
ping >> email
Airflow 拥有非常广泛的操作符集,其中一些内置于核心或预安装的提供程序中。核心中的部分常用操作符包括
BashOperator
- 执行 bash 命令PythonOperator
- 调用任意 Python 函数EmailOperator
- 发送电子邮件使用
@task
装饰器执行任意 Python 函数。它不支持呈现作为参数传递的 Jinja 模板。
注意
@task
装饰器推荐用于执行 Python 可调用对象,其参数中没有模板呈现,而不是经典的 PythonOperator
。
有关所有核心操作符的列表,请参阅:核心操作符和钩子参考。
如果所需的操作符未随 Airflow 默认安装,你可能会在我们的庞大社区 提供程序包 中找到它。此处一些流行的操作符包括
但还有更多 - 你可以在我们的 提供程序包 文档中查看所有社区管理的操作符、钩子、传感器和传输的完整列表。
注意
在 Airflow 的代码中,我们经常混合 任务 和操作符的概念,它们基本上可以互换。但是,当我们谈论任务时,我们指的是 DAG 的通用“执行单元”;当我们谈论操作符时,我们指的是一个可重用的、预制的任务模板,其逻辑已为你完成,只需一些参数即可。
Jinja 模板¶
Airflow 利用了 Jinja 模板 的强大功能,这与 宏 结合使用时,可以成为一个强大的工具。
例如,假设您想使用 BashOperator
将数据间隔的开始作为环境变量传递给 Bash 脚本
# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id="test_env",
bash_command="/tmp/test.sh ",
dag=dag,
env={"DATA_INTERVAL_START": date},
)
此处,{{ ds }}
是一个模板变量,并且由于 BashOperator
的 env
参数使用 Jinja 进行模板化,因此数据间隔的开始日期将作为名为 DATA_INTERVAL_START
的环境变量在您的 Bash 脚本中可用。
您可以对文档中标记为“templated”的每个参数使用 Jinja 模板。模板替换发生在调用操作符的 pre_execute
函数之前。
您还可以对嵌套字段使用 Jinja 模板,只要这些嵌套字段在其所属的结构中标记为模板即可:在 template_fields
属性中注册的字段将提交给模板替换,例如以下示例中的 path
字段
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
dag=dag,
)
注意
template_fields
属性是一个类变量,并且保证为 Sequence[str]
类型(即字符串的列表或元组)。
只要所有中间字段都标记为模板字段,也可以替换深度嵌套字段
class MyDataTransformer:
template_fields: Sequence[str] = ("reader",)
def __init__(self, my_reader):
self.reader = my_reader
# [additional code here...]
class MyDataReader:
template_fields: Sequence[str] = ("path",)
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
dag=dag,
)
在创建 DAG 时,您可以将自定义选项传递给 Jinja Environment
。一种常见的用法是避免 Jinja 从模板字符串中删除尾随换行符
my_dag = DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# some other jinja2 Environment options here
},
)
请参阅 Jinja 文档 以查找所有可用的选项。
某些操作符还会将以特定后缀结尾的字符串(在 template_ext
中定义)视为呈现字段时的文件引用。这对于直接从文件加载脚本或查询(而不是将它们包含到 DAG 代码中)很有用。
例如,考虑一个运行多行 bash 脚本的 BashOperator,它将加载 script.sh
中的文件,并将其内容用作 bash_command
的值
run_script = BashOperator(
task_id="run_script",
bash_command="script.sh",
)
默认情况下,以这种方式提供的路径应相对于 DAG 的文件夹提供(因为这是默认的 Jinja 模板搜索路径),但可以通过在 DAG 上设置 template_searchpath
参数来添加其他路径。
在某些情况下,您可能希望从模板化中排除一个字符串并直接使用它。考虑以下任务
print_script = BashOperator(
task_id="print_script",
bash_command="cat script.sh",
)
这将失败,并显示 TemplateNotFound: cat script.sh
,因为 Airflow 会将该字符串视为文件路径,而不是命令。我们可以通过将 literal()
包裹该值来防止 Airflow 将此值视为对文件的引用。此方法禁用宏和文件的呈现,并且可以应用于选定的嵌套字段,同时为其余内容保留默认模板化规则。
from airflow.utils.template import literal
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command=literal("cat script.sh"),
)
2.8 版中的新增功能: 添加了 literal()
。
或者,如果您想阻止 Airflow 将值视为对文件的引用,则可以覆盖 template_ext
fixed_print_script = BashOperator(
task_id="fixed_print_script",
bash_command="cat script.sh",
)
fixed_print_script.template_ext = ()
将字段呈现为原生 Python 对象¶
默认情况下,所有 template_fields
都呈现为字符串。
示例,假设 extract
任务将一个字典(示例:{"1001": 301.27, "1002": 433.21, "1003": 502.22}
)推送到 XCom 表中。现在,当运行以下任务时,order_data
参数会传递一个字符串,例如:'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
。
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform,
)
如果你希望渲染后的模板字段返回一个原生 Python 对象(在我们的示例中是 dict
),你可以将 render_template_as_native_obj=True
传递给 DAG,如下所示
dag = DAG(
dag_id="example_template_as_python_object",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
render_template_as_native_obj=True,
)
@task(task_id="extract")
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
def transform(order_data):
print(type(order_data))
total_order_value = 0
for value in order_data.values():
total_order_value += value
return {"total_order_value": total_order_value}
extract_task = extract()
transform_task = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform,
)
extract_task >> transform_task
在这种情况下,order_data
参数被传递:{"1001": 301.27, "1002": 433.21, "1003": 502.22}
。
当 render_template_as_native_obj
设置为 True
时,Airflow 使用 Jinja 的 NativeEnvironment。使用 NativeEnvironment
,渲染模板会生成一个原生 Python 类型。
保留的 params 关键字¶
在 Apache Airflow 2.2.0 中,params
变量在 DAG 序列化期间使用。请不要在第三方运算符中使用该名称。如果你升级了你的环境并收到了以下错误
AttributeError: 'str' object has no attribute '__module__'
请更改你的运算符中的 params
名称。