TaskFlow¶
2.0 版本新增。
如果您使用普通的 Python 代码而不是操作符来编写大多数 DAG,那么 TaskFlow API 将使您更容易编写干净的 DAG,而无需额外的样板代码,所有这些都使用 @task
装饰器。
TaskFlow 负责使用 XComs 在任务之间移动输入和输出,以及自动计算依赖项 - 当您在 DAG 文件中调用 TaskFlow 函数时,您不会执行它,而是会得到一个表示结果的 XCom 的对象(一个 XComArg
),然后您可以将其用作下游任务或操作符的输入。例如
from airflow.decorators import task
from airflow.operators.email import EmailOperator
@task
def get_ip():
return my_ip_service.get_main_ip()
@task(multiple_outputs=True)
def compose_email(external_ip):
return {
'subject':f'Server connected from {external_ip}',
'body': f'Your server executing Airflow is connected from the external IP {external_ip}<br>'
}
email_info = compose_email(get_ip())
EmailOperator(
task_id='send_email_notification',
to='[email protected]',
subject=email_info['subject'],
html_content=email_info['body']
)
这里有三个任务 - get_ip
、compose_email
和 send_email_notification
。
前两个是使用 TaskFlow 声明的,并且自动将 get_ip
的返回值传递到 compose_email
中,不仅链接了 XCom,还自动声明了 compose_email
是 get_ip
的下游。
send_email_notification
是一个更传统的操作符,但即使它也可以使用 compose_email
的返回值来设置其参数,并且再次自动计算出它必须是 compose_email
的下游。
您也可以使用纯值或变量来调用 TaskFlow 函数 - 例如,这将按您的预期工作(当然,在 DAG 执行之前不会运行任务内的代码 - name
值将作为任务参数持久化到那时)
@task
def hello_name(name: str):
print(f'Hello {name}!')
hello_name('Airflow users')
如果您想了解更多关于使用 TaskFlow 的信息,您应该参考 TaskFlow 教程。
上下文¶
您可以通过添加关键字参数来访问 Airflow 上下文变量,如下例所示
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
或者,您可以将 **kwargs
添加到任务的签名中,并且所有 Airflow 上下文变量都可以在 kwargs
字典中访问
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(**kwargs): ti: TaskInstance = kwargs["task_instance"] print(f"Run ID: {ti.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {ti.duration}") # Duration: 0.972019 dr: DagRun = kwargs["dag_run"] print(f"DAG Run queued at: {dr.queued_at}") # 2023-08-10 00:00:01+02:20
有关上下文变量的完整列表,请参阅 上下文变量。
日志记录¶
要从您的任务函数中使用日志记录,只需导入并使用 Python 的日志系统
logger = logging.getLogger("airflow.task")
以这种方式创建的每一行日志都将记录在任务日志中。
传递任意对象作为参数¶
2.5.0 版本新增。
如前所述,TaskFlow 使用 XCom 将变量传递给每个任务。 这要求用作参数的变量需要能够序列化。 Airflow 开箱即用支持所有内置类型(如 int 或 str),并且它支持使用 @dataclass
或 @attr.define
修饰的对象。 以下示例显示了 Dataset
的使用,它使用 @attr.define
修饰,并与 TaskFlow 一起使用。
注意
使用 Dataset
的另一个好处是,如果它用作输入参数,它会自动注册为 inlet
。 如果您的任务的返回值是 dataset
或 list[Dataset]]
,它也会自动注册为 outlet
。
import json
import pendulum
import requests
from airflow import Dataset
from airflow.decorators import dag, task
SRC = Dataset(
"https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
)
now = pendulum.now()
@dag(start_date=now, schedule="@daily", catchup=False)
def etl():
@task()
def retrieve(src: Dataset) -> dict:
resp = requests.get(url=src.uri)
data = resp.json()
return data["data"]
@task()
def to_fahrenheit(temps: dict[int, float]) -> dict[int, float]:
ret: dict[int, float] = {}
for year, celsius in temps.items():
ret[year] = float(celsius) * 1.8 + 32
return ret
@task()
def load(fahrenheit: dict[int, float]) -> Dataset:
filename = "/tmp/fahrenheit.json"
s = json.dumps(fahrenheit)
f = open(filename, "w")
f.write(s)
f.close()
return Dataset(f"file:///{filename}")
data = retrieve(SRC)
fahrenheit = to_fahrenheit(data)
load(fahrenheit)
etl()
自定义对象¶
您可能希望传递自定义对象。 通常,您可以使用 @dataclass
或 @attr.define
修饰您的类,Airflow 会找出它需要做什么。 有时您可能想自己控制序列化。 为此,请将 serialize()
方法添加到您的类,并将静态方法 deserialize(data: dict, version: int)
添加到您的类。 像这样
from typing import ClassVar
class MyCustom:
__version__: ClassVar[int] = 1
def __init__(self, x):
self.x = x
def serialize(self) -> dict:
return dict({"x": self.x})
@staticmethod
def deserialize(data: dict, version: int):
if version > 1:
raise TypeError(f"version > {MyCustom.version}")
return MyCustom(data["x"])
对象版本控制¶
对将用于序列化的对象进行版本控制是一种很好的做法。 为此,请将 __version__: ClassVar[int] = <x>
添加到您的类中。 Airflow 假设您的类是向后兼容的,因此版本 2 能够反序列化版本 1。如果您需要自定义的反序列化逻辑,请确保指定 deserialize(data: dict, version: int)
。
注意
需要 __version__
的类型,并且需要为 ClassVar[int]
历史¶
TaskFlow API 是 Airflow 2.0 的新增功能,您可能会遇到为以前版本的 Airflow 编写的 DAG,它们使用 PythonOperator
来实现类似的目标,尽管代码量要多得多。
有关 TaskFlow API 的添加和设计的更多背景信息,可以在其 Airflow 改进提案 AIP-31:用于更清晰/更简单的 DAG 定义的“TaskFlow API” 中找到