使用 TaskFlow

本教程建立在常规 Airflow 教程的基础上,并特别侧重于使用 TaskFlow API 范式编写数据管道,该范式作为 Airflow 2.0 的一部分引入,并与使用传统范式编写的 DAG 形成对比。

此处选择的数据管道是一个简单的模式,具有三个独立的提取、转换和加载任务。

示例“TaskFlow API”管道

这是一个使用 TaskFlow API 范式的非常简单的管道。下面将给出更详细的解释。

airflow/example_dags/tutorial_taskflow_api.py[源代码]


import json

import pendulum

from airflow.decorators import dag, task
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """
    @task()
    def extract():
        """
        #### Extract task
        A simple Extract task to get data ready for the rest of the data
        pipeline. In this case, getting data is simulated by reading from a
        hardcoded JSON string.
        """
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

        order_data_dict = json.loads(data_string)
        return order_data_dict
    @task(multiple_outputs=True)
    def transform(order_data_dict: dict):
        """
        #### Transform task
        A simple Transform task which takes in the collection of order data and
        computes the total order value.
        """
        total_order_value = 0

        for value in order_data_dict.values():
            total_order_value += value

        return {"total_order_value": total_order_value}
    @task()
    def load(total_order_value: float):
        """
        #### Load task
        A simple Load task which takes in the result of the Transform task and
        instead of saving it to end user review, just prints it out.
        """

        print(f"Total order value is: {total_order_value:.2f}")
    order_data = extract()
    order_summary = transform(order_data)
    load(order_summary["total_order_value"])
tutorial_taskflow_api()

它是一个 DAG 定义文件

如果这是您正在查看的第一个 DAG 文件,请注意,此 Python 脚本由 Airflow 解释,并且是数据管道的配置文件。有关 DAG 文件的完整介绍,请参阅核心基础教程,其中广泛涵盖了 DAG 结构和定义。

实例化 DAG

我们正在创建一个 DAG,它是我们任务的集合,任务之间存在依赖关系。这是一个非常简单的定义,因为我们只希望在使用 Airflow 设置此 DAG 时运行它,而无需任何重试或复杂的调度。在本例中,请注意,我们使用如下所示的 @dag 装饰器创建此 DAG,其中 Python 函数名称充当 DAG 标识符。

airflow/example_dags/tutorial_taskflow_api.py[源代码]

@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def tutorial_taskflow_api():
    """
    ### TaskFlow API Tutorial Documentation
    This is a simple data pipeline example which demonstrates the use of
    the TaskFlow API using three simple tasks for Extract, Transform, and Load.
    Documentation that goes along with the Airflow TaskFlow API tutorial is
    located
    [here](https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html)
    """

现在,要真正使其能够作为 DAG 运行,我们调用使用前面显示的 @dag 装饰器设置的 Python 函数 tutorial_taskflow_api,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[源代码]

tutorial_taskflow_api()

在版本 2.4 中已更改: 如果该 DAG 在 with 块内使用,或者它是 @dag 装饰函数的结果,则不再需要将 DAG“注册”到全局变量中,Airflow 才能检测到该 DAG。

任务

在此数据管道中,任务是使用如下所示的 @task 装饰器根据 Python 函数创建的。函数名称充当任务的唯一标识符。

airflow/example_dags/tutorial_taskflow_api.py[源代码]

@task()
def extract():
    """
    #### Extract task
    A simple Extract task to get data ready for the rest of the data
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict

返回值(在本例中为字典)将在以后的任务中使用。

转换和加载任务的创建方式与上面显示的提取任务相同。

DAG 的主要流程

现在我们已经根据 Python 函数定义了提取、转换和加载任务,我们可以转到 DAG 的主要部分。

airflow/example_dags/tutorial_taskflow_api.py[源代码]

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

就是这样,我们完成了!我们调用了提取任务,从那里获取了订单数据并将其发送到转换任务进行汇总,然后使用汇总数据调用了加载任务。任务之间的依赖关系以及在网络上不同节点的不同工作器上运行的这些任务之间的数据传递都由 Airflow 处理。

现在,要真正使其能够作为 DAG 运行,我们调用使用前面显示的 @dag 装饰器设置的 Python 函数 tutorial_taskflow_api,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[源代码]

tutorial_taskflow_api()

但是如何?

对于经验丰富的 Airflow DAG 作者来说,这非常简单!让我们将其与下面显示的 Airflow 2.0 之前必须如何编写此 DAG 进行对比

airflow/example_dags/tutorial_dag.py[源代码]


import json
import textwrap

import pendulum

# The DAG object; we'll need this to instantiate a DAG
from airflow.models.dag import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
with DAG(
    "tutorial_dag",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    dag.doc_md = __doc__
    def extract(**kwargs):
        ti = kwargs["ti"]
        data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
        ti.xcom_push("order_data", data_string)
    def transform(**kwargs):
        ti = kwargs["ti"]
        extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
        order_data = json.loads(extract_data_string)

        total_order_value = 0
        for value in order_data.values():
            total_order_value += value

        total_value = {"total_order_value": total_order_value}
        total_value_json_string = json.dumps(total_value)
        ti.xcom_push("total_order_value", total_value_json_string)
    def load(**kwargs):
        ti = kwargs["ti"]
        total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value")
        total_order_value = json.loads(total_value_string)

        print(total_order_value)
    extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
    )
    extract_task.doc_md = textwrap.dedent(
        """\
    #### Extract task
    A simple Extract task to get data ready for the rest of the data pipeline.
    In this case, getting data is simulated by reading from a hardcoded JSON string.
    This data is then put into xcom, so that it can be processed by the next task.
    """
    )

    transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
    )
    transform_task.doc_md = textwrap.dedent(
        """\
    #### Transform task
    A simple Transform task which takes in the collection of order data from xcom
    and computes the total order value.
    This computed value is then put into xcom, so that it can be processed by the next task.
    """
    )

    load_task = PythonOperator(
        task_id="load",
        python_callable=load,
    )
    load_task.doc_md = textwrap.dedent(
        """\
    #### Load task
    A simple Load task which takes in the result of the Transform task, by reading it
    from xcom and instead of saving it to end user review, just prints it out.
    """
    )

    extract_task >> transform_task >> load_task

上面显示的所有处理也在新的 Airflow 2.0 DAG 中完成,但所有这些处理都从 DAG 开发人员那里抽象出来了。

让我们通过单独查看转换任务来详细检查这一点,因为它位于数据管道的中间。在 Airflow 1.x 中,此任务的定义如下所示

airflow/example_dags/tutorial_dag.py[源代码]

def transform(**kwargs):
    ti = kwargs["ti"]
    extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data")
    order_data = json.loads(extract_data_string)

    total_order_value = 0
    for value in order_data.values():
        total_order_value += value

    total_value = {"total_order_value": total_order_value}
    total_value_json_string = json.dumps(total_value)
    ti.xcom_push("total_order_value", total_value_json_string)

正如我们在这里看到的,在转换函数中处理的数据是使用 XCom 变量传递给它的。反过来,来自转换函数的汇总数据也被放入另一个 XCom 变量中,然后加载任务将使用该变量。

与 Airflow 2.0 中的 TaskFlow API 对比,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[源代码]

@task(multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

在 Airflow 2.0 中,用于在这些任务之间传递数据的所有 XCom 使用都从 DAG 作者那里抽象出来了。但是,XCom 变量在幕后使用,并且可以在需要时使用 Airflow UI 查看,以进行调试或 DAG 监控。

类似地,任务依赖关系是根据任务的函数调用在 TaskFlow 中自动生成的。在 Airflow 1.x 中,必须显式创建任务并指定依赖关系,如下所示。

airflow/example_dags/tutorial_dag.py[源代码]

extract_task = PythonOperator(
    task_id="extract",
    python_callable=extract,
)
extract_task.doc_md = textwrap.dedent(
    """\
#### Extract task
A simple Extract task to get data ready for the rest of the data pipeline.
In this case, getting data is simulated by reading from a hardcoded JSON string.
This data is then put into xcom, so that it can be processed by the next task.
"""
)

transform_task = PythonOperator(
    task_id="transform",
    python_callable=transform,
)
transform_task.doc_md = textwrap.dedent(
    """\
#### Transform task
A simple Transform task which takes in the collection of order data from xcom
and computes the total order value.
This computed value is then put into xcom, so that it can be processed by the next task.
"""
)

load_task = PythonOperator(
    task_id="load",
    python_callable=load,
)
load_task.doc_md = textwrap.dedent(
    """\
#### Load task
A simple Load task which takes in the result of the Transform task, by reading it
from xcom and instead of saving it to end user review, just prints it out.
"""
)

extract_task >> transform_task >> load_task

相比之下,使用 Airflow 2.0 中的 TaskFlow API,调用本身会自动生成依赖关系,如下所示。

airflow/example_dags/tutorial_taskflow_api.py[源代码]

order_data = extract()
order_summary = transform(order_data)
load(order_summary["total_order_value"])

重用装饰过的任务

装饰过的任务非常灵活。您可以在多个 DAG 中重用装饰过的任务,覆盖任务参数,例如 task_idqueuepool 等。

以下是如何在多个 DAG 中重用装饰过的任务的示例

from airflow.decorators import task, dag
from datetime import datetime


@task
def add_task(x, y):
    print(f"Task args: x={x}, y={y}")
    return x + y


@dag(start_date=datetime(2022, 1, 1))
def mydag():
    start = add_task.override(task_id="start")(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"add_start_{i}")(start, i)


@dag(start_date=datetime(2022, 1, 1))
def mydag2():
    start = add_task(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"new_add_task_{i}")(start, i)


first_dag = mydag()
second_dag = mydag2()

您还可以导入上面的 add_task 并在另一个 DAG 文件中使用它。假设 add_task 代码位于名为 common.py 的文件中。您可以这样做

from common import add_task
from airflow.decorators import dag
from datetime import datetime


@dag(start_date=datetime(2022, 1, 1))
def use_add_task():
    start = add_task.override(priority_weight=3)(1, 2)
    for i in range(3):
        start >> add_task.override(task_id=f"new_add_task_{i}", retries=4)(start, i)


created_dag = use_add_task()

将 TaskFlow API 与复杂/冲突的 Python 依赖项一起使用

如果您的任务需要复杂或冲突的要求,那么您将能够将 TaskFlow API 与 Python 虚拟环境(自 2.0.2 起)、Docker 容器(自 2.2.0 起)、ExternalPythonOperator(自 2.4.0 起)或 KubernetesPodOperator(自 2.4.0 起)一起使用。

此功能为 TaskFlow API 提供了更全面的用例范围,因为您不仅限于 Airflow 工作器的软件包和系统库。对于下面描述的所有装饰函数的情况,您必须确保函数是可序列化的,并且它们仅对您使用的其他依赖项使用本地导入。这些导入的其他库必须在目标环境中可用 - 它们不需要在主 Airflow 环境中可用。

您应该使用哪些运算符取决于几个因素

  • 您是否在可以访问 Docker 引擎或 Kubernetes 的情况下运行 Airflow

  • 您是否可以负担动态创建具有新依赖项的虚拟环境的开销

  • 您是否可以为所有 Airflow 组件部署预先存在的、不可变的 Python 环境。

这些选项应该为希望保持其工作流程更简单、更 Pythonic 的用户提供更大的灵活性 - 并允许您将 DAG 的完整逻辑保留在 DAG 本身中。

您还可以在处理冲突/复杂 Python 依赖项的最佳实践中获得有关管理冲突依赖项的方法的更多上下文,包括对每个选项的边界和后果的更详细说明

为每个任务动态创建的虚拟环境

最简单的方法是在同一台机器上动态地(每次运行任务时)创建一个单独的虚拟环境,您可以使用 @task.virtualenv 装饰器。该装饰器允许您使用自定义库甚至不同的 Python 版本动态创建新的虚拟环境来运行您的函数。

示例(动态创建的虚拟环境)

airflow/example_dags/example_python_operator.py[源代码]

    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")

    virtualenv_task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
    )

使用预装依赖项的 Python 环境

稍微复杂一点的 @task.external_python 装饰器允许您在预定义的、不可变的虚拟环境(或在系统级别安装的没有虚拟环境的 Python 二进制文件)中运行 Airflow 任务。此虚拟环境或系统 python 也可以安装不同的自定义库集,并且必须在可以执行任务的所有工作器中在相同位置可用。

使用 @task.external_python 的示例(使用不可变的、预先存在的虚拟环境)

airflow/example_dags/example_python_operator.py[源代码]

    def callable_external_python():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")

    external_python_task = ExternalPythonOperator(
        task_id="external_python",
        python_callable=callable_external_python,
        python=PATH_TO_PYTHON_BINARY,
    )

使用 Docker Operator 进行依赖项分离

如果您的 Airflow 工作器可以访问 docker 引擎,则可以使用 DockerOperator 并添加任何必要的参数以正确运行任务。请注意,docker 镜像必须安装了可用的 Python,并将 bash 命令作为 command 参数。

值得注意的是,Python 源代码(从装饰函数中提取)和任何可调用参数都通过(编码和序列化)环境变量发送到容器,因此它们的长度不是无限的(确切限制取决于系统设置)。

下面是使用 @task.docker 装饰器运行 Python 任务的示例。

tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py[源代码]

@task.docker(image="python:3.9-slim-bookworm", multiple_outputs=True)
def transform(order_data_dict: dict):
    """
    #### Transform task
    A simple Transform task which takes in the collection of order data and
    computes the total order value.
    """
    total_order_value = 0

    for value in order_data_dict.values():
        total_order_value += value

    return {"total_order_value": total_order_value}

使用操作符的注意事项

注意

在早期 Airflow 版本中使用 @task.docker 装饰器

由于 @task.docker 装饰器在 docker provider 中可用,您可能会尝试在 2.2 之前的 Airflow 版本中使用它,但这行不通。如果您尝试这样做,将会收到此错误

AttributeError: '_TaskDecorator' object has no attribute 'docker'

您应该升级到 Airflow 2.2 或更高版本才能使用它。

使用 Kubernetes Pod Operator 进行依赖项分离

如果您的 Airflow worker 可以访问 Kubernetes,则可以使用 KubernetesPodOperator 并添加任何必要的参数以正确运行任务。

下面是使用 @task.kubernetes 装饰器运行 Python 任务的示例。

tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py[源代码]

@task.kubernetes(
    image="python:3.8-slim-buster",
    name="k8s_test",
    namespace="default",
    in_cluster=False,
    config_file="/path/to/.kube/config",
)
def execute_in_k8s_pod():
    import time

    print("Hello from k8s pod")
    time.sleep(2)

@task.kubernetes(image="python:3.8-slim-buster", namespace="default", in_cluster=False)
def print_pattern():
    n = 5
    for i in range(n):
        # inner loop to handle number of columns
        # values changing acc. to outer loop
        for j in range(i + 1):
            # printing stars
            print("* ", end="")

        # ending line after each row
        print("\r")

execute_in_k8s_pod_instance = execute_in_k8s_pod()
print_pattern_instance = print_pattern()

execute_in_k8s_pod_instance >> print_pattern_instance

使用操作符的注意事项

注意

在早期 Airflow 版本中使用 @task.kubernetes 装饰器

由于 @task.kubernetes 装饰器在 docker provider 中可用,您可能会尝试在 2.4 之前的 Airflow 版本中使用它,但这行不通。如果您尝试这样做,将会收到此错误

AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'

您应该升级到 Airflow 2.4 或更高版本才能使用它。

将 TaskFlow API 与 Sensor 操作符一起使用

您可以应用 @task.sensor 装饰器将常规 Python 函数转换为 BaseSensorOperator 类的实例。Python 函数实现了 poke 逻辑,并返回 PokeReturnValue 类的实例,就像 BaseSensorOperator 中的 poke() 方法一样。在 Airflow 2.3 中,sensor 操作符将能够返回 XCOM 值。这是通过在 poke() 方法结束时返回 PokeReturnValue 对象的实例来实现的

from airflow.sensors.base import PokeReturnValue


class SensorWithXcomValue(BaseSensorOperator):
    def poke(self, context: Context) -> Union[bool, PokeReturnValue]:
        # ...
        is_done = ...  # set to true if the sensor should stop poking.
        xcom_value = ...  # return value of the sensor operator to be pushed to XCOM.
        return PokeReturnValue(is_done, xcom_value)

要实现一个推送 XCOM 值并同时支持 2.3 及之前版本的 sensor 操作符,您需要在 2.3 之前的版本中显式推送 XCOM 值。

try:
    from airflow.sensors.base import PokeReturnValue
except ImportError:
    PokeReturnValue = None


class SensorWithXcomValue(BaseSensorOperator):
    def poke(self, context: Context) -> bool:
        # ...
        is_done = ...  # set to true if the sensor should stop poking.
        xcom_value = ...  # return value of the sensor operator to be pushed to XCOM.
        if PokeReturnValue is not None:
            return PokeReturnValue(is_done, xcom_value)
        else:
            if is_done:
                context["ti"].xcom_push(key="xcom_key", value=xcom_value)
            return is_done

或者,在 sensor 不需要推送 XCOM 值的情况下:poke() 和包装函数都可以返回一个布尔值,其中 True 表示 sensor 的操作已完成,False 表示 sensor 的操作未完成。

airflow/example_dags/example_sensor_decorator.py[源代码]


import pendulum

from airflow.decorators import dag, task
from airflow.sensors.base import PokeReturnValue
@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def example_sensor_decorator():
    # Using a sensor operator to wait for the upstream data to be ready.
    @task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
    def wait_for_upstream() -> PokeReturnValue:
        return PokeReturnValue(is_done=True, xcom_value="xcom_value")
    @task
    def dummy_operator() -> None:
        pass
    wait_for_upstream() >> dummy_operator()
tutorial_etl_dag = example_sensor_decorator()

多输出推断

任务还可以通过使用 dict Python 类型推断多个输出。

@task
def identity_dict(x: int, y: int) -> dict[str, int]:
    return {"x": x, "y": y}

通过对函数返回类型使用类型 dict 或任何其他符合 typing.Mapping 协议的类,multiple_outputs 参数会自动设置为 true。

注意,如果您手动设置 multiple_outputs 参数,则推断将被禁用,并使用参数值。

在装饰任务和传统任务之间添加依赖项

上面的教程展示了如何在 TaskFlow 函数之间创建依赖项。但是,也可以在传统任务(例如 BashOperatorFileSensor)和 TaskFlow 函数之间设置依赖项。

下面的代码显示了如何构建此依赖项

@task()
def extract_from_file():
    """
    #### Extract from file task
    A simple Extract task to get data ready for the rest of the data
    pipeline, by reading the data from a file into a pandas dataframe
    """
    order_data_file = "/tmp/order_data.csv"
    order_data_df = pd.read_csv(order_data_file)
    return order_data_df


file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
order_data = extract_from_file()

file_task >> order_data

在上面的代码块中,一个新的 TaskFlow 函数被定义为 extract_from_file,它从已知文件位置读取数据。在主 DAG 中,定义了一个新的 FileSensor 任务来检查此文件。请注意,这是一个等待文件的 Sensor 任务。TaskFlow 函数调用放在变量 order_data 中。最后,使用该变量指定此 Sensor 任务和 TaskFlow 函数之间的依赖关系。

在装饰任务和传统任务之间使用 XComs

如上所述,TaskFlow API 允许以一种对 DAG 作者抽象的方式在任务之间使用或传递 XComs。本节将进一步深入探讨如何在 TaskFlow 函数之间以及 TaskFlow 函数*和*传统任务之间实现这一点的详细示例。

您可能会发现有必要使用来自传统任务的 XCom(在任务执行期间推送或通过其返回值推送)作为下游任务的输入。您可以使用为所有操作符公开的 .output 属性访问推送的 XCom(也称为 XComArg)。

默认情况下,使用 .output 属性检索 XCom 结果等效于

task_instance.xcom_pull(task_ids="my_task_id", key="return_value")

要检索 return_value 以外的键的 XCom 结果,可以使用

my_op = MyOperator(...)
my_op_output = my_op.output["some_other_xcom_key"]
# OR
my_op_output = my_op.output.get("some_other_xcom_key")

注意

仅当操作符参数列为 template_field 时,才支持使用 .output 属性作为另一个任务的输入。

在下面的代码示例中,HttpOperator 结果通过 XComs 捕获。然后,将此 XCom 结果(即任务输出)传递给 TaskFlow 函数,该函数将响应解析为 JSON。

get_api_results_task = HttpOperator(
    task_id="get_api_results",
    endpoint="/api/query",
    do_xcom_push=True,
    http_conn_id="http",
)


@task
def parse_results(api_results):
    return json.loads(api_results)


parsed_results = parse_results(api_results=get_api_results_task.output)

反过来也可以做到:将 TaskFlow 函数的输出作为输入传递给传统任务。

@task(retries=3)
def create_queue():
    """This is a Python function that creates an SQS queue"""
    hook = SqsHook()
    result = hook.create_queue(queue_name="sample-queue")

    return result["QueueUrl"]


sqs_queue = create_queue()

publish_to_queue = SqsPublishOperator(
    task_id="publish_to_queue",
    sqs_queue=sqs_queue,
    message_content="{{ task_instance }}-{{ execution_date }}",
    message_attributes=None,
    delay_seconds=0,
)

请注意上面的代码示例,create_queue TaskFlow 函数的输出(新创建的 Amazon SQS 队列的 URL)随后被传递给 SqsPublishOperator 任务作为 sqs_queue 参数。

最后,您不仅可以使用传统操作符输出作为 TaskFlow 函数的输入,还可以将其用作其他传统操作符的输入。在下面的示例中,SalesforceToS3Operator 任务的输出(目标文件位置的 S3 URI)用作 S3CopyObjectOperator 任务的输入,以将同一文件复制到 S3 中的日期分区存储位置,以便在数据湖中长期存储。

BASE_PATH = "salesforce/customers"
FILE_NAME = "customer_daily_extract_{{ ds_nodash }}.csv"


upload_salesforce_data_to_s3_landing = SalesforceToS3Operator(
    task_id="upload_salesforce_data_to_s3",
    salesforce_query="SELECT Id, Name, Company, Phone, Email, LastModifiedDate, IsActive FROM Customers",
    s3_bucket_name="landing-bucket",
    s3_key=f"{BASE_PATH}/{FILE_NAME}",
    salesforce_conn_id="salesforce",
    aws_conn_id="s3",
    replace=True,
)


store_to_s3_data_lake = S3CopyObjectOperator(
    task_id="store_to_s3_data_lake",
    aws_conn_id="s3",
    source_bucket_key=upload_salesforce_data_to_s3_landing.output,
    dest_bucket_name="data_lake",
    dest_bucket_key=f"""{BASE_PATH}/{"{{ execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
)

在装饰任务中访问上下文变量

运行可调用对象时,Airflow 将传递一组可在函数中使用的关键字参数。这组 kwargs 与您可以在 Jinja 模板中使用的内容完全对应。为此,您可以添加要在函数中接收的上下文键作为关键字参数。

例如,以下代码块中的可调用对象将获取 tinext_ds 上下文变量的值

@task
def my_python_callable(*, ti, next_ds):
    pass

在 2.8 版中更改: 以前,上下文键参数必须提供默认值,例如 ti=None。现在不再需要这样做了。

您也可以选择使用 **kwargs 接收整个上下文。请注意,这可能会导致轻微的性能损失,因为 Airflow 需要扩展可能包含许多您实际上并不需要的内容的整个上下文。因此,更建议您使用显式参数,如上一段所示。

@task
def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]

此外,有时您可能希望访问堆栈深处某处的上下文,但您不想从任务可调用对象传递上下文变量。您仍然可以通过 get_current_context 方法访问执行上下文。

from airflow.operators.python import get_current_context


def some_function_in_your_library():
    context = get_current_context()
    ti = context["ti"]

当前上下文只能在任务执行期间访问。在 pre_executepost_execute 期间无法访问上下文。在执行上下文之外调用此方法将引发错误。

下一步是什么?

您已经看到了使用 Airflow 2.0 中的 TaskFlow API 范例编写 DAG 是多么简单。以下是您接下来可能要采取的一些步骤

另请参阅

此条目有帮助吗?