使用 TaskFlow¶
本教程基于常规的 Airflow 教程,特别关注使用 TaskFlow API 范例编写数据管道,这是 Airflow 2.0 的一部分,并将其与使用传统范例编写的 DAG 进行对比。
此处选择的数据管道是一个简单的模式,包含三个独立的提取、转换和加载任务。
“TaskFlow API” 管道示例¶
这是一个使用 TaskFlow API 范例的非常简单的管道。下面将给出更详细的解释。
import json
import pendulum
from airflow.decorators import dag, task
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
@task()
def load(total_order_value: float):
"""
#### Load task
A simple Load task which takes in the result of the Transform task and
instead of saving it to end user review, just prints it out.
"""
print(f"Total order value is: {total_order_value:.2f}")
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
tutorial_taskflow_api()
这是一个 DAG 定义文件¶
如果这是您看到的第一个 DAG 文件,请注意,此 Python 脚本由 Airflow 解释,并且是您的数据管道的配置文件。有关 DAG 文件的完整介绍,请参阅核心的基础教程,其中广泛涵盖了 DAG 结构和定义。
实例化 DAG¶
我们正在创建一个 DAG,它是我们的任务集合,任务之间存在依赖关系。这是一个非常简单的定义,因为我们只想在通过 Airflow 设置时运行 DAG,而无需任何重试或复杂的调度。在此示例中,请注意,我们正在使用如下所示的 @dag
装饰器创建此 DAG,其中 Python 函数名称充当 DAG 标识符。
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def tutorial_taskflow_api():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the TaskFlow API using three simple tasks for Extract, Transform, and Load.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
现在要实际启用它作为 DAG 运行,我们调用使用前面所示的 @dag
装饰器设置的 Python 函数 tutorial_taskflow_api
,如下所示。
tutorial_taskflow_api()
在 2.4 版本中更改:如果 DAG 在 with
块内部使用,或者它是 @dag
装饰函数的返回值,则不再需要将 DAG “注册”到全局变量中,Airflow 才能检测到 DAG。
任务¶
在此数据管道中,任务是基于使用如下所示的 @task
装饰器的 Python 函数创建的。函数名称充当任务的唯一标识符。
@task()
def extract():
"""
#### Extract task
A simple Extract task to get data ready for the rest of the data
pipeline. In this case, getting data is simulated by reading from a
hardcoded JSON string.
"""
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
order_data_dict = json.loads(data_string)
return order_data_dict
返回值(在本例中是一个字典)将可用于以后的任务。
转换和加载任务的创建方式与上面显示的提取任务相同。
DAG 的主要流程¶
现在我们已经根据 Python 函数定义了提取、转换和加载任务,我们可以转到 DAG 的主要部分。
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
就是这样,我们完成了!我们调用了提取任务,从那里获得了订单数据,并将其发送到转换任务进行汇总,然后使用汇总数据调用了加载任务。任务之间的依赖关系以及在网络上不同节点上的不同工作人员上运行的这些任务之间的数据传递都由 Airflow 处理。
现在要实际启用它作为 DAG 运行,我们调用使用前面所示的 @dag
装饰器设置的 Python 函数 tutorial_taskflow_api
,如下所示。
tutorial_taskflow_api()
但是如何?¶
对于经验丰富的 Airflow DAG 作者来说,这非常简单!让我们将此与 Airflow 2.0 之前必须编写此 DAG 的方式进行对比,如下所示
import json
import textwrap
import pendulum
# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG
# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
with DAG(
"tutorial_dag",
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args={"retries": 2},
description="DAG tutorial",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
dag.doc_md = __doc__
def extract(**kwargs):
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
def transform(**kwargs):
ti = kwargs["ti"]
extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
order_data = json.loads(extract_data_string)
total_order_value = 0
for value in order_data.values():
total_order_value += value
total_value = {"total_order_value": total_order_value}
total_value_json_string = json.dumps(total_value)
ti.xcom_push("total_order_value", total_value_json_string)
def load(**kwargs):
ti = kwargs["ti"]
total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
total_order_value = json.loads(total_value_string)
print(total_order_value)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task.doc_md = textwrap.dedent(
"""\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
)
transform_task.doc_md = textwrap.dedent(
"""\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
)
load_task = PythonOperator(
task_id="load",
python_callable=load,
)
load_task.doc_md = textwrap.dedent(
"""\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
)
extract_task >> transform_task >> load_task
上面显示的所有处理也在新的 Airflow 2.0 DAG 中完成,但它都从 DAG 开发人员那里抽象出来。
让我们通过隔离地查看转换任务来详细检查这一点,因为它位于数据管道的中间。在 Airflow 1.x 中,此任务的定义如下所示
def transform(**kwargs):
ti = kwargs["ti"]
extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
order_data = json.loads(extract_data_string)
total_order_value = 0
for value in order_data.values():
total_order_value += value
total_value = {"total_order_value": total_order_value}
total_value_json_string = json.dumps(total_value)
ti.xcom_push("total_order_value", total_value_json_string)
正如我们在这里看到的,在转换函数中处理的数据是使用 XCom 变量传递给它的。反过来,来自转换函数的汇总数据也被放入另一个 XCom 变量中,该变量随后将被加载任务使用。
将其与 Airflow 2.0 中的 TaskFlow API 进行对比,如下所示。
@task(multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
Airflow 2.0 中,所有用于在这些任务之间传递数据的 XCom 使用都从 DAG 作者那里抽象出来。但是,XCom 变量在后台使用,并且可以使用 Airflow UI 查看,以方便调试或 DAG 监控。
类似地,任务依赖关系在 TaskFlows 中基于任务的功能调用自动生成。在 Airflow 1.x 中,必须显式创建任务并指定依赖关系,如下所示。
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task.doc_md = textwrap.dedent(
"""\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
)
transform_task = PythonOperator(
task_id="transform",
python_callable=transform,
)
transform_task.doc_md = textwrap.dedent(
"""\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
)
load_task = PythonOperator(
task_id="load",
python_callable=load,
)
load_task.doc_md = textwrap.dedent(
"""\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
)
extract_task >> transform_task >> load_task
相比之下,使用 Airflow 2.0 中的 TaskFlow API,调用本身会自动生成依赖关系,如下所示。
order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])
重用装饰任务¶
装饰任务是灵活的。您可以在多个 DAG 中重用装饰任务,覆盖任务参数,例如 task_id
、queue
、pool
等。
下面是如何在多个 DAG 中重用装饰任务的示例
from airflow.decorators import task, dag
from datetime import datetime
@task
def add_task(x, y):
print(f"Task args: x={x}, y={y}")
return x + y
@dag(start_date=datetime(2022, 1, 1))
def mydag():
start = add_task.override(task_id="start")(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"add_start_{i}")(start, i)
@dag(start_date=datetime(2022, 1, 1))
def mydag2():
start = add_task(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"new_add_task_{i}")(start, i)
first_dag = mydag()
second_dag = mydag2()
您还可以导入上面的 add_task
并在另一个 DAG 文件中使用它。假设 add_task
代码位于名为 common.py
的文件中。您可以这样做
from common import add_task
from airflow.decorators import dag
from datetime import datetime
@dag(start_date=datetime(2022, 1, 1))
def use_add_task():
start = add_task.override(priority_weight=3)(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"new_add_task_{i}", retries=4)(start, i)
created_dag = use_add_task()
将 TaskFlow API 与复杂/冲突的 Python 依赖项一起使用¶
如果您的任务需要复杂或冲突的需求,那么您将能够将 TaskFlow API 与 Python 虚拟环境(自 2.0.2 起)、Docker 容器(自 2.2.0 起)、ExternalPythonOperator(自 2.4.0 起)或 KubernetesPodOperator(自 2.4.0 起)一起使用。
此功能允许 TaskFlow API 具有更全面的用例,因为您不受 Airflow 工作线程的软件包和系统库的限制。对于下面描述的装饰函数的所有情况,您必须确保函数是可序列化的,并且它们仅使用本地导入来获取您使用的其他依赖项。这些导入的其他库必须在目标环境中可用 - 它们不需要在主要的 Airflow 环境中可用。
您应该使用哪个运算符取决于几个因素
您是否正在使用对 Docker 引擎或 Kubernetes 的访问权限来运行 Airflow
您是否可以承受动态创建具有新依赖项的虚拟环境的开销
您是否可以为所有 Airflow 组件部署预先存在的、不可变的 Python 环境。
这些选项应该为希望保持工作流程更简单、更 Pythonic 的用户提供更大的灵活性 - 并允许您将 DAG 的完整逻辑保留在 DAG 本身中。
您还可以获得有关管理冲突依赖项方法的更多上下文,包括有关每个选项的边界和后果的更详细说明,请参阅处理冲突/复杂 Python 依赖项的最佳实践
为每个任务动态创建 Virtualenv¶
最简单的方法是在同一台计算机上动态创建(每次运行任务时)单独的虚拟环境,您可以使用 @task.virtualenv
装饰器。该装饰器允许您动态创建具有自定义库甚至不同 Python 版本的新的虚拟环境来运行您的函数。
示例(动态创建的虚拟环境)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
)
使用预安装依赖项的 Python 环境¶
稍微复杂一点的 @task.external_python
装饰器允许您在预定义的、不可变的虚拟环境(或在系统级别安装的没有虚拟环境的 Python 二进制文件)中运行 Airflow 任务。此虚拟环境或系统 python 也可以安装一组不同的自定义库,并且必须在所有可以执行任务的工作线程中的相同位置提供。
使用 @task.external_python
的示例(使用不可变的、预先存在的虚拟环境)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = ExternalPythonOperator(
task_id="external_python",
python_callable=callable_external_python,
python=PATH_TO_PYTHON_BINARY,
)
使用 Docker Operator 分离依赖项¶
如果您的 Airflow 工作线程有权访问 docker 引擎,则可以改为使用 DockerOperator
并添加任何需要的参数以正确运行该任务。请注意,docker 镜像必须安装有可用的 Python,并将 bash 命令作为 command
参数。
值得注意的是,Python 源代码(从装饰函数中提取)和任何可调用参数都通过(编码和 pickle)环境变量发送到容器,因此它们的长度不是无限的(确切的限制取决于系统设置)。
下面是使用 @task.docker
装饰器运行 Python 任务的示例。
@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
"""
#### Transform task
A simple Transform task which takes in the collection of order data and
computes the total order value.
"""
total_order_value = 0
for value in order_data_dict.values():
total_order_value += value
return {"total_order_value": total_order_value}
关于使用操作符的注意事项
注意
在早期 Airflow 版本中使用 @task.docker
装饰器
由于 @task.docker
装饰器在 Docker provider 中可用,你可能会想在 2.2 之前的 Airflow 版本中使用它,但这行不通。如果你尝试这样做,你会收到此错误:
AttributeError: '_TaskDecorator' object has no attribute 'docker'
你应该升级到 Airflow 2.2 或更高版本才能使用它。
使用 Kubernetes Pod Operator 进行依赖隔离¶
如果你的 Airflow worker 可以访问 Kubernetes,你可以改为使用 KubernetesPodOperator
并添加任何需要的参数来正确运行任务。
下面是使用 @task.kubernetes
装饰器运行 Python 任务的示例。
@task.kubernetes(
image="python:3.8-slim-buster",
name="k8s_test",
namespace="default",
in_cluster=False,
config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
import time
print("Hello from k8s pod")
time.sleep(2)
@task.kubernetes(image="python:3.8-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
n = 5
for i in range(n):
# inner loop to handle number of columns
# values changing acc. to outer loop
for _ in range(i + 1):
# printing stars
print("* ", end="")
# ending line after each row
print("\r")
execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()
execute_in_k8s_pod_instance >> print_pattern_instance
关于使用操作符的注意事项
注意
在早期 Airflow 版本中使用 @task.kubernetes
装饰器
由于 @task.kubernetes
装饰器在 Docker provider 中可用,你可能会想在 2.4 之前的 Airflow 版本中使用它,但这行不通。如果你尝试这样做,你会收到此错误:
AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'
你应该升级到 Airflow 2.4 或更高版本才能使用它。
将 TaskFlow API 与 Sensor 操作符一起使用¶
你可以应用 @task.sensor
装饰器将常规 Python 函数转换为 BaseSensorOperator 类的实例。Python 函数实现 poke 逻辑并返回 PokeReturnValue
类的实例,就像 BaseSensorOperator 中的 poke()
方法一样。在 Airflow 2.3 中,sensor 操作符将能够返回 XCOM 值。这是通过在 poke()
方法的末尾返回 PokeReturnValue
对象的实例来实现的。
from airflow.sensors.base import PokeReturnValue class SensorWithXcomValue(BaseSensorOperator): def poke(self, context: Context) -> Union[bool, PokeReturnValue]: # ... is_done = ... # set to true if the sensor should stop poking. xcom_value = ... # return value of the sensor operator to be pushed to XCOM. return PokeReturnValue(is_done, xcom_value)
要实现一个推送 XCOM 值并同时支持 2.3 和 pre-2.3 版本的 sensor 操作符,如果版本是 pre-2.3,你需要显式推送 XCOM 值。
try: from airflow.sensors.base import PokeReturnValue except ImportError: PokeReturnValue = None class SensorWithXcomValue(BaseSensorOperator): def poke(self, context: Context) -> bool: # ... is_done = ... # set to true if the sensor should stop poking. xcom_value = ... # return value of the sensor operator to be pushed to XCOM. if PokeReturnValue is not None: return PokeReturnValue(is_done, xcom_value) else: if is_done: context["ti"].xcom_push(key="xcom_key", value=xcom_value) return is_done
或者,在传感器不需要推送 XCOM 值的情况下:poke()
和包装的函数都可以返回一个类似布尔值的值,其中 True
表示传感器的操作已完成,False
表示传感器的操作未完成。
import pendulum
from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
@dag(
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
)
def example_sensor_decorator():
# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
@task
def dummy_operator() -> None:
pass
wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()
多输出推断¶
任务还可以通过使用 dict Python 类型来推断多个输出。
@task
def identity_dict(x: int, y: int) -> dict[str, int]:
return {"x": x, "y": y}
通过使用类型 dict
或任何其他符合 typing.Mapping
协议的类,函数的返回类型,multiple_outputs
参数会自动设置为 true。
注意:如果你手动设置 multiple_outputs
参数,则推断将被禁用,并且使用参数值。
在装饰任务和传统任务之间添加依赖关系¶
上面的教程展示了如何在 TaskFlow 函数之间创建依赖关系。但是,也可以在传统任务(例如 BashOperator
或 FileSensor
)和 TaskFlow 函数之间设置依赖关系。
以下代码显示了如何建立此依赖关系
@task()
def extract_from_file():
"""
#### Extract from file task
A simple Extract task to get data ready for the rest of the data
pipeline, by reading the data from a file into a pandas dataframe
"""
order_data_file = "/tmp/order_data.csv"
order_data_df = pd.read_csv(order_data_file)
return order_data_df
file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
order_data = extract_from_file()
file_task >> order_data
在上面的代码块中,一个新的 TaskFlow 函数被定义为 extract_from_file
,它从已知的文件位置读取数据。在主 DAG 中,定义了一个新的 FileSensor
任务来检查此文件。请注意,这是一个等待文件的 Sensor 任务。TaskFlow 函数调用被放入变量 order_data
中。最后,使用该变量指定此 Sensor 任务和 TaskFlow 函数之间的依赖关系。
在装饰任务和传统任务之间使用 XComs¶
如上所述,TaskFlow API 允许以一种从 DAG 作者那里抽象出来的方式在任务之间使用或传递 XComs。本节深入探讨了如何不仅在 TaskFlow 函数之间,而且在 TaskFlow 函数和传统任务之间实现此功能的详细示例。
你可能会发现有必要从传统任务中使用 XCom,无论是在任务执行期间推送的还是通过其返回值推送的,作为下游任务的输入。你可以通过使用为所有操作符公开的 .output
属性来访问推送的 XCom(也称为 XComArg
)。
默认情况下,使用 .output
属性检索 XCom 结果等同于
task_instance.xcom_pull(task_ids="my_task_id", key="return_value")
要检索除 return_value
之外的键的 XCom 结果,可以使用
my_op = MyOperator(...)
my_op_output = my_op.output["some_other_xcom_key"]
# OR
my_op_output = my_op.output.get("some_other_xcom_key")
注意
仅当操作符参数列为 template_field
时,才支持将 .output
属性用作另一个任务的输入。
在下面的代码示例中,通过 XComs 捕获 HttpOperator
的结果。然后,将此 XCom 结果(即任务输出)传递给 TaskFlow 函数,该函数将响应解析为 JSON。
get_api_results_task = HttpOperator(
task_id="get_api_results",
endpoint="/api/query",
do_xcom_push=True,
http_conn_id="http",
)
@task
def parse_results(api_results):
return json.loads(api_results)
parsed_results = parse_results(api_results=get_api_results_task.output)
也可以反过来:将 TaskFlow 函数的输出作为传统任务的输入传递。
@task(retries=3)
def create_queue():
"""This is a Python function that creates an SQS queue"""
hook = SqsHook()
result = hook.create_queue(queue_name="sample-queue")
return result["QueueUrl"]
sqs_queue = create_queue()
publish_to_queue = SqsPublishOperator(
task_id="publish_to_queue",
sqs_queue=sqs_queue,
message_content="{{ task_instance }}-{{ execution_date }}",
message_attributes=None,
delay_seconds=0,
)
请注意上面的代码示例,来自 create_queue
TaskFlow 函数的输出(即新创建的 Amazon SQS 队列的 URL)随后作为 sqs_queue
参数传递给 SqsPublishOperator
任务。
最后,你不仅可以将传统操作符的输出用作 TaskFlow 函数的输入,还可以用作其他传统操作符的输入。在下面的示例中,SalesforceToS3Operator
任务的输出(它是目标文件位置的 S3 URI)用作 S3CopyObjectOperator
任务的输入,以将同一文件复制到 S3 中按日期分区的存储位置,以便在数据湖中长期存储。
BASE_PATH = "salesforce/customers"
FILE_NAME = "customer_daily_extract_{{ ds_nodash }}.csv"
upload_salesforce_data_to_s3_landing = SalesforceToS3Operator(
task_id="upload_salesforce_data_to_s3",
salesforce_query="SELECT Id, Name, Company, Phone, Email, LastModifiedDate, IsActive FROM Customers",
s3_bucket_name="landing-bucket",
s3_key=f"{BASE_PATH}/{FILE_NAME}",
salesforce_conn_id="salesforce",
aws_conn_id="s3",
replace=True,
)
store_to_s3_data_lake = S3CopyObjectOperator(
task_id="store_to_s3_data_lake",
aws_conn_id="s3",
source_bucket_key=upload_salesforce_data_to_s3_landing.output,
dest_bucket_name="data_lake",
dest_bucket_key=f"""{BASE_PATH}/{"{{ execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
)
访问装饰任务中的上下文变量¶
在运行你的可调用对象时,Airflow 将传递一组可在函数中使用的关键字参数。这组 kwargs 与你可以在 Jinja 模板中使用的内容完全对应。要使此功能正常工作,你可以将要在函数中接收的上下文键添加为关键字参数。
例如,以下代码块中的可调用对象将获取 ti
和 next_ds
上下文变量的值
@task
def my_python_callable(*, ti, next_ds):
pass
在版本 2.8 中更改:以前,上下文键参数必须提供默认值,例如 ti=None
。现在不需要了。
你还可以选择使用 **kwargs
接收整个上下文。请注意,这可能会导致轻微的性能损失,因为 Airflow 将需要扩展整个上下文,其中可能包含许多你实际上不需要的内容。因此,更建议你使用显式参数,如上一段所示。
@task
def my_python_callable(**kwargs):
ti = kwargs["ti"]
next_ds = kwargs["next_ds"]
此外,有时你可能希望在堆栈的深处访问上下文,但不希望从任务可调用对象传递上下文变量。你仍然可以通过 get_current_context
方法访问执行上下文。
from airflow.operators.python import get_current_context
def some_function_in_your_library():
context = get_current_context()
ti = context["ti"]
当前上下文仅在任务执行期间可访问。该上下文在 pre_execute
或 post_execute
期间不可访问。在执行上下文之外调用此方法将引发错误。
在装饰任务中使用模板¶
传递给你的装饰函数的参数会自动进行模板化。
你还可以使用 templates_exts
参数对整个文件进行模板化。
@task(templates_exts=[".sql"])
def template_test(sql):
print(f"sql: {sql}")
template_test(sql="sql/test.sql")
这将读取 sql/test.sql
的内容并替换所有模板变量。你还可以传递文件列表,所有这些文件都将被模板化。
你可以通过 params 参数向模板引擎传递其他参数。
但是,params
参数必须传递给装饰器,而不是直接传递给你的函数,例如 @task(templates_exts=['.sql'], params={'my_param'})
,然后可以在你的模板化文件和函数参数中使用 {{ params.my_param }}
。
或者,你也可以使用 .override()
方法传递它
@task()
def template_test(input_var):
print(f"input_var: {input_var}")
template_test.override(params={"my_param": "wow"})(
input_var="my param is: {{ params.my_param }}",
)
最后,你还可以手动呈现模板
@task(params={"my_param": "wow"})
def template_test():
template_str = "run_id: {{ run_id }}; params.my_param: {{ params.my_param }}"
context = get_current_context()
rendered_template = context["task"].render_template(
template_str,
context,
)
这是一个完整的示例,演示了以上所有内容
import pendulum
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
@dag(
schedule="@daily",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
params={"foobar": "param_from_dag", "other_param": "from_dag"},
)
def tutorial_taskflow_templates():
"""
### TaskFlow API Tutorial Documentation
This is a simple data pipeline example which demonstrates the use of
the templates in the TaskFlow API.
Documentation that goes along with the Airflow TaskFlow API tutorial is
located
[here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
"""
@task(
# Causes variables that end with `.sql` to be read and templates
# within to be rendered.
templates_exts=[".sql"],
)
def template_test(sql, test_var, data_interval_end):
context = get_current_context()
# Will print...
# select * from test_data
# where 1=1
# and run_id = 'scheduled__2024-10-09T00:00:00+00:00'
# and something_else = 'param_from_task'
print(f"sql: {sql}")
# Will print `scheduled__2024-10-09T00:00:00+00:00`
print(f"test_var: {test_var}")
# Will print `2024-10-10 00:00:00+00:00`.
# Note how we didn't pass this value when calling the task. Instead
# it was passed by the decorator from the context
print(f"data_interval_end: {data_interval_end}")
# Will print...
# run_id: scheduled__2024-10-09T00:00:00+00:00; params.other_param: from_dag
template_str = "run_id: {{ run_id }}; params.other_param: {{ params.other_param }}"
rendered_template = context["task"].render_template(
template_str,
context,
)
print(f"rendered template: {rendered_template}")
# Will print the full context dict
print(f"context: {context}")
template_test.override(
# Will be merged with the dict defined in the dag
# and override existing parameters.
#
# Must be passed into the decorator's parameters
# through `.override()` not into the actual task
# function
params={"foobar": "param_from_task"},
)(
sql="sql/test.sql",
test_var="{{ run_id }}",
)
tutorial_taskflow_templates()
有条件地跳过任务¶
run_if()
和 skip_if()
是 TaskFlow 的语法糖,允许你根据条件跳过一个 Task
。你可以使用它们来简单地设置执行条件,而无需更改 DAG
或 Task
的结构。
它还允许你使用 Context
设置条件,这本质上与使用 pre_execute
相同。
以下是 run_if()
的一个使用示例:
@task.run_if(lambda context: context["task_instance"].task_id == "run")
@task.bash()
def echo() -> str:
return "echo 'run'"
上述代码中定义的 echo
只有在 task_id
为 run
时才会执行。
如果想在跳过任务时留下日志,你有两个选择。
@task.run_if(lambda context: context["task_instance"].task_id == "run", skip_message="only task_id is 'run'")
@task.bash()
def echo() -> str:
return "echo 'run'"
@task.run_if(
lambda context: (context["task_instance"].task_id == "run", f"{context['ts']}: only task_id is 'run'")
)
@task.bash()
def echo() -> str:
return "echo 'run'"
还有一个 skip_if()
,其工作方式与 run_if()
相反,使用方式相同。
@task.skip_if(lambda context: context["task_instance"].task_id == "skip")
@task.bash()
def echo() -> str:
return "echo 'run'"
下一步是什么?¶
你已经了解了在 Airflow 2.0 中使用 TaskFlow API 范式编写 DAG 有多么简单。接下来你可以尝试以下几个步骤:
另请参阅
继续学习本教程的下一步:构建运行中的 Pipeline
阅读概念部分,详细了解 Airflow 的概念,例如 DAG、Task、Operator 等。
查看有关 TaskFlow API 和
@task
装饰器的部分。