2025 年 Airflow 峰会将于 10 月 07-09 日举行。立即注册,获取早鸟票!

TaskFlow

在 2.0 版本中添加。

如果你主要使用纯 Python 代码而不是 Operator 来编写大部分 DAG,那么 TaskFlow API 将使你更容易编写简洁的 DAG,无需额外的模板代码,只需使用 @task 装饰器。

TaskFlow 负责使用 XCom 在你的任务之间传递输入和输出,并自动计算依赖关系 - 当你在 DAG 文件中调用 TaskFlow 函数时,它不会立即执行,而是会返回一个代表结果 XCom 的对象(一个 XComArg),然后你可以将此对象用作下游任务或 Operator 的输入。例如

from airflow.sdk import task
from airflow.providers.smtp.operators.smtp import EmailOperator

@task
def get_ip():
    return my_ip_service.get_main_ip()

@task(multiple_outputs=True)
def compose_email(external_ip):
    return {
        'subject':f'Server connected from {external_ip}',
        'body': f'Your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

email_info = compose_email(get_ip())

EmailOperator(
    task_id='send_email_notification',
    to='example@example.com',
    subject=email_info['subject'],
    html_content=email_info['body']
)

这里有三个任务 - get_ipcompose_emailsend_email_notification

前两个使用 TaskFlow 声明,并自动将 get_ip 的返回值传递给 compose_email,这不仅链接了 XCom,还自动声明 compose_emailget_ip下游任务。

send_email_notification 是一个更传统的 Operator,但即使它也可以使用 compose_email 的返回值来设置其参数,并且同样可以自动判断出它是 compose_email下游任务。

你也可以使用一个普通值或变量来调用 TaskFlow 函数 - 例如,这将如你预期地工作(当然,在 DAG 执行之前,任务内部的代码不会运行 - name 值会一直作为任务参数持久化直到那时)。

@task
def hello_name(name: str):
    print(f'Hello {name}!')

hello_name('Airflow users')

如果你想了解更多关于使用 TaskFlow 的信息,请查阅 TaskFlow 教程

上下文

你可以通过将 Airflow 上下文变量 作为关键字参数添加来访问它们,如下例所示

from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun


@task
def print_ti_info(task_instance: TaskInstance, dag_run: DagRun):
    print(f"Run ID: {task_instance.run_id}")  # Run ID: scheduled__2023-08-09T00:00:00+00:00
    print(f"Duration: {task_instance.duration}")  # Duration: 0.972019
    print(f"DAG Run queued at: {dag_run.queued_at}")  # 2023-08-10 00:00:01+02:20

或者,你可以在任务的签名中添加 **kwargs,所有 Airflow 上下文变量都将在 kwargs 字典中可访问。

from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun


@task
def print_ti_info(**kwargs):
    ti: TaskInstance = kwargs["task_instance"]
    print(f"Run ID: {ti.run_id}")  # Run ID: scheduled__2023-08-09T00:00:00+00:00
    print(f"Duration: {ti.duration}")  # Duration: 0.972019

    dr: DagRun = kwargs["dag_run"]
    print(f"DAG Run queued at: {dr.queued_at}")  # 2023-08-10 00:00:01+02:20

有关上下文变量的完整列表,请参阅 上下文变量

日志记录

要在任务函数中使用日志记录,只需导入并使用 Python 的日志记录系统即可。

logger = logging.getLogger("airflow.task")

通过这种方式创建的每一行日志都将记录在任务日志中。

将任意对象作为参数传递

在 2.5.0 版本中添加。

如前所述,TaskFlow 使用 XCom 将变量传递给每个任务。这要求用作参数的变量必须能够被序列化。Airflow 开箱即用地支持所有内置类型(如 int 或 str),并且支持用 @dataclass@attr.define 装饰的对象。以下示例展示了如何将用 @attr.define 装饰的 Asset 与 TaskFlow 一起使用。

注意

使用 Asset 的一个额外好处是,如果它用作输入参数,它会自动注册为 inlet。如果任务的返回值是一个 Asset 或一个 list[Asset]],它也会自动注册为 outlet

import json
import pendulum
import requests

from airflow import Asset
from airflow.sdk import dag, task

SRC = Asset(
    "https://www.ncei.noaa.gov/access/monitoring/climate-at-a-glance/global/time-series/globe/land_ocean/ytd/12/1880-2022.json"
)
now = pendulum.now()


@dag(start_date=now, schedule="@daily", catchup=False)
def etl():
    @task()
    def retrieve(src: Asset) -> dict:
        resp = requests.get(url=src.uri)
        data = resp.json()
        return data["data"]

    @task()
    def to_fahrenheit(temps: dict[int, dict[str, float]]) -> dict[int, float]:
        ret: dict[int, float] = {}
        for year, info in temps.items():
            ret[year] = float(info["anomaly"]) * 1.8 + 32

        return ret

    @task()
    def load(fahrenheit: dict[int, float]) -> Asset:
        filename = "/tmp/fahrenheit.json"
        s = json.dumps(fahrenheit)
        f = open(filename, "w")
        f.write(s)
        f.close()

        return Asset(f"file:///{filename}")

    data = retrieve(SRC)
    fahrenheit = to_fahrenheit(data)
    load(fahrenheit)


etl()

自定义对象

你可能希望传递自定义对象。通常情况下,你会用 @dataclass@attr.define 装饰你的类,Airflow 会自动处理。有时你可能希望自己控制序列化。为此,请在你的类中添加 serialize() 方法和静态方法 deserialize(data: dict, version: int)。例如

from typing import ClassVar


class MyCustom:
    __version__: ClassVar[int] = 1

    def __init__(self, x):
        self.x = x

    def serialize(self) -> dict:
        return dict({"x": self.x})

    @staticmethod
    def deserialize(data: dict, version: int):
        if version > 1:
            raise TypeError(f"version > {MyCustom.version}")
        return MyCustom(data["x"])

对象版本控制

对用于序列化的对象进行版本控制是一种良好的实践。为此,请在你的类中添加 __version__: ClassVar[int] = <x>。Airflow 假定你的类向后兼容,以便版本 2 能够反序列化版本 1。如果你需要自定义反序列化逻辑,请确保指定了 deserialize(data: dict, version: int)

注意

__version__ 需要指定类型且必须是 ClassVar[int]

Sensor 与 TaskFlow API

在 2.5.0 版本中添加。

有关使用 TaskFlow API 编写 Sensor 的示例,请参阅 将 TaskFlow API 与 Sensor Operator 结合使用

历史

TaskFlow API 是 Airflow 2.0 版本新增的,你可能会遇到为旧版本 Airflow 编写的 DAG,它们使用 PythonOperator 来实现类似目标,尽管代码量大得多。

有关 TaskFlow API 添加和设计的更多背景信息可以在其 Airflow 改进提案中找到 AIP-31:“TaskFlow API”:更清晰/更简单的 DAG 定义

此条目有帮助吗?