PythonOperator

使用 PythonOperator 执行 Python 可调用对象。

提示

建议使用 @task 装饰器而不是经典的 PythonOperator 来执行 Python 可调用对象。

airflow/example_dags/example_python_decorator.py[源代码]

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"

run_this = print_context()

airflow/example_dags/example_python_operator.py[源代码]

def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print("::group::All kwargs")
    pprint(kwargs)
    print("::endgroup::")
    print("::group::Context variable ds")
    print(ds)
    print("::endgroup::")
    return "Whatever you return gets printed in the logs"

run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)

传入参数

像使用普通 Python 函数一样,将额外的参数传递给 @task 装饰的函数。

airflow/example_dags/example_python_decorator.py[源代码]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

    run_this >> log_the_sql >> sleeping_task

airflow/example_dags/example_python_operator.py[源代码]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = PythonOperator(
        task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
    )

    run_this >> log_the_sql >> sleeping_task

模板

Airflow 传入一组额外的关键字参数:每个 Jinja 模板变量 一个,以及一个 templates_dict 参数。

templates_dict 参数是模板化的,因此字典中的每个值都被评估为 Jinja 模板

airflow/example_dags/example_python_decorator.py[源代码]

@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()

airflow/example_dags/example_python_operator.py[源代码]

def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = PythonOperator(
    task_id="log_sql_query",
    python_callable=log_sql,
    templates_dict={"query": "sql/sample.sql"},
    templates_exts=[".sql"],
)

PythonVirtualenvOperator

使用 PythonVirtualenvOperator 装饰器在新 Python 虚拟环境中执行 Python 可调用对象。virtualenv 包需要安装在运行 Airflow 的环境中(作为可选依赖项 pip install apache-airflow[virtualenv] --constraint ...)。

提示

建议使用 @task.virtualenv 装饰器而不是经典的 PythonVirtualenvOperator 在新的 Python 虚拟环境中执行 Python 可调用对象。

airflow/example_dags/example_python_decorator.py[源代码]

    @task.virtualenv(
        task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
    )
    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")

    virtualenv_task = callable_virtualenv()

airflow/example_dags/example_python_operator.py[源代码]

    def callable_virtualenv():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        from time import sleep

        from colorama import Back, Fore, Style

        print(Fore.RED + "some red text")
        print(Back.GREEN + "and with a green background")
        print(Style.DIM + "and in dim text")
        print(Style.RESET_ALL)
        for _ in range(4):
            print(Style.DIM + "Please wait...", flush=True)
            sleep(1)
        print("Finished")

    virtualenv_task = PythonVirtualenvOperator(
        task_id="virtualenv_python",
        python_callable=callable_virtualenv,
        requirements=["colorama==0.4.0"],
        system_site_packages=False,
    )

传入参数

像使用普通 Python 函数一样,将额外的参数传递给 @task.virtualenv 装饰的函数。不幸的是,由于与底层库不兼容,Airflow 不支持序列化 vartitask_instance。对于 Airflow 上下文变量,请确保您可以通过将 system_site_packages 设置为 True 来访问 Airflow,或者将 apache-airflow 添加到 requirements 参数中。否则,您将无法在 op_kwargs 中访问 Airflow 的大多数上下文变量。如果您希望上下文与日期时间对象(如 data_interval_start)相关,则可以添加 pendulumlazy_object_proxy

重要

定义为要执行的 Python 函数体将从 DAG 中剪切到一个临时文件中,不带周围的代码。如示例所示,您需要再次添加所有导入,并且不能依赖全局 Python 上下文中的变量。

如果要将变量传递给经典的 PythonVirtualenvOperator,请使用 op_argsop_kwargs

如果需要用于包安装的附加参数,请通过 pip_install_options 参数传递它们,或者使用 requirements.txt,如下例所示

SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives

所有支持的选项都列在 需求文件格式 中。

虚拟环境设置选项

虚拟环境是根据您的工作器上的全局 python pip 配置创建的。在您的环境中使用额外的 ENV 或调整常规 pip 配置,如 pip 配置 中所述。

如果要使用其他特定于任务的私有 python 存储库来设置虚拟环境,则可以传递 index_urls 参数,该参数将调整 pip 安装配置。传递的索引 URL 将替换标准系统配置的索引 URL 设置。为了防止在 DAG 代码中向私有存储库添加机密,您可以使用 Airflow 连接和钩子。为此,可以使用连接类型 包索引 (Python)

在特殊情况下,如果要阻止远程调用来设置虚拟环境,请将 index_urls 作为空列表传递,如 index_urls=[],这将强制 pip 安装程序使用 --no-index 选项。

缓存和重用

虚拟环境的设置是针对临时目录中的每次任务执行进行的。执行后,虚拟环境将再次被删除。确保您的工作器上的 $tmp 文件夹有足够的磁盘空间。通常(如果没有另外配置),将使用本地 pip 缓存,以防止每次执行都重新下载包。

但是,每次执行都设置虚拟环境仍然需要一些时间。对于重复执行,您可以将选项 venv_cache_path 设置为工作器上的文件系统文件夹。在这种情况下,虚拟环境将设置一次并被重用。如果使用虚拟环境缓存,则在缓存路径中为每个唯一的要求集创建不同的虚拟环境子文件夹。因此,根据您的系统中 DAG 的变化,需要足够的磁盘空间。

请注意,在缓存模式下不会进行自动清理。所有工作器插槽共享相同的虚拟环境,但如果任务在不同的工作器上反复调度,则可能会在多个工作器上分别创建虚拟环境。此外,如果工作器在 Kubernetes POD 中启动,则重新启动工作器将丢弃缓存(假设 venv_cache_path 不在持久卷上)。

如果您在运行时遇到缓存虚拟环境损坏的问题,您可以通过将 Airflow 变量 PythonVirtualenvOperator.cache_key 设置为任何文本来影响缓存目录哈希值。此变量的内容用于向量中以计算缓存目录键。

请注意,对缓存虚拟环境的任何修改(例如二进制路径中的临时文件、后安装其他需求)都可能会污染缓存虚拟环境,并且操作员不会维护或清理缓存路径。

ExternalPythonOperator

ExternalPythonOperator 可以帮助您使用与其他任务(以及主 Airflow 环境)不同的 Python 库集来运行某些任务。这可以是虚拟环境,也可以是预先安装并在运行 Airflow 任务的环境中可用的任何 Python 安装。操作员将 Python 二进制文件作为 python 参数。请注意,即使在虚拟环境的情况下,python 路径也应该指向虚拟环境内的 python 二进制文件(通常位于虚拟环境的 bin 子目录中)。与常规使用虚拟环境相反,这里不需要 activation 环境。仅仅使用 python 二进制文件就会自动激活它。在以下两个示例中,PATH_TO_PYTHON_BINARY 都是这样的路径,指向可执行的 Python 二进制文件。

使用 ExternalPythonOperator 在预定义的环境中执行 Python 可调用对象。virtualenv 包应该预先安装在运行 Python 的环境中。如果使用 dill,则必须将其预先安装在环境中(与安装在主 Airflow 环境中的版本相同)。

提示

建议使用 @task.external_python 装饰器而不是经典的 ExternalPythonOperator 在预定义的 Python 环境中执行 Python 代码。

airflow/example_dags/example_python_decorator.py[源代码]

    @task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
    def callable_external_python():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")

    external_python_task = callable_external_python()

airflow/example_dags/example_python_operator.py[源代码]

    def callable_external_python():
        """
        Example function that will be performed in a virtual environment.

        Importing at the module level ensures that it will not attempt to import the
        library before it is installed.
        """
        import sys
        from time import sleep

        print(f"Running task via {sys.executable}")
        print("Sleeping")
        for _ in range(4):
            print("Please wait...", flush=True)
            sleep(1)
        print("Finished")

    external_python_task = ExternalPythonOperator(
        task_id="external_python",
        python_callable=callable_external_python,
        python=PATH_TO_PYTHON_BINARY,
    )

传递参数

将额外的参数传递给 @task.external_python 装饰的函数,就像使用普通的 Python 函数一样。不幸的是,由于与底层库不兼容,Airflow 不支持序列化 varti / task_instance。对于 Airflow 上下文变量,请确保 Airflow 也作为 virtualenv 环境的一部分安装,并且版本与运行任务的 Airflow 版本相同。否则,您将无法在 op_kwargs 中访问 Airflow 的大多数上下文变量。如果您希望上下文与日期时间对象相关,例如 data_interval_start,您可以将 pendulumlazy_object_proxy 添加到您的虚拟环境中。

重要

定义为要执行的 Python 函数体将从 DAG 中剪切到一个临时文件中,不带周围的代码。如示例所示,您需要再次添加所有导入,并且不能依赖全局 Python 上下文中的变量。

如果要将变量传递给经典的 ExternalPythonOperator,请使用 op_argsop_kwargs

PythonBranchOperator

使用 PythonBranchOperator 执行 Python 分支 任务。

提示

建议使用 @task.branch 装饰器而不是经典的 PythonBranchOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py[源代码]

    @task.branch()
    def branching(choices: list[str]) -> str:
        return f"branch_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[源代码]

    branching = BranchPythonOperator(
        task_id="branching",
        python_callable=lambda: f"branch_{random.choice(options)}",
    )

参数传递和模板选项与 PythonOperator 相同。

BranchPythonVirtualenvOperator

使用 BranchPythonVirtualenvOperator 装饰器执行 Python 分支 任务,它是 PythonBranchOperator 与在虚拟环境中执行的混合体。

提示

建议使用 @task.branch_virtualenv 装饰器而不是经典的 BranchPythonVirtualenvOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py[源代码]

    # Note: Passing a caching dir allows to keep the virtual environment over multiple runs
    #       Run the example a second time and see that it re-uses it and is faster.
    VENV_CACHE_PATH = tempfile.gettempdir()

    @task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH)
    def branching_virtualenv(choices) -> str:
        import random

        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")
        return f"venv_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[源代码]

    # Note: Passing a caching dir allows to keep the virtual environment over multiple runs
    #       Run the example a second time and see that it re-uses it and is faster.
    VENV_CACHE_PATH = Path(tempfile.gettempdir())

    def branch_with_venv(choices):
        import random

        import numpy as np

        print(f"Some numpy stuff: {np.arange(6)}")
        return f"venv_{random.choice(choices)}"

    branching_venv = BranchPythonVirtualenvOperator(
        task_id="branching_venv",
        requirements=["numpy~=1.26.0"],
        venv_cache_path=VENV_CACHE_PATH,
        python_callable=branch_with_venv,
        op_args=[options],
    )

参数传递和模板选项与 PythonVirtualenvOperator 相同。

BranchExternalPythonOperator

使用 BranchExternalPythonOperator 执行 Python 分支 任务,它是 PythonBranchOperator 与在外部 Python 环境中执行的混合体。

提示

建议使用 @task.branch_external_python 装饰器而不是经典的 BranchExternalPythonOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py[源代码]

    @task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
    def branching_ext_python(choices) -> str:
        import random

        return f"ext_py_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[源代码]

    def branch_with_external_python(choices):
        import random

        return f"ext_py_{random.choice(choices)}"

    branching_ext_py = BranchExternalPythonOperator(
        task_id="branching_ext_python",
        python=PATH_TO_PYTHON_BINARY,
        python_callable=branch_with_external_python,
        op_args=[options],
    )

参数传递和模板选项与 ExternalPythonOperator 相同。

ShortCircuitOperator

使用 ShortCircuitOperator 控制在满足条件或获得真值时管道是否继续。

此条件和真值的评估是通过可调用对象的输出完成的。如果可调用对象返回 True 或真值,则允许管道继续,并将推送输出的 XCom。如果输出为 False 或假值,则管道将根据配置的短路方式进行短路(稍后将详细介绍)。在下面的示例中,“condition_is_true”任务后面的任务将执行,而“condition_is_false”任务后面的任务将被跳过。

提示

建议使用 @task.short_circuit 装饰器而不是经典的 ShortCircuitOperator 通过 Python 可调用对象来短路管道。

airflow/example_dags/example_short_circuit_decorator.py[源代码]

@task.short_circuit()
def check_condition(condition):
    return condition

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)

chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)

airflow/example_dags/example_short_circuit_operator.py[源代码]

cond_true = ShortCircuitOperator(
    task_id="condition_is_True",
    python_callable=lambda: True,
)

cond_false = ShortCircuitOperator(
    task_id="condition_is_False",
    python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)

可以将“短路”配置为尊重或忽略为下游任务定义的 触发规则。如果 ignore_downstream_trigger_rules 设置为 True(默认配置),则所有下游任务都将被跳过,而不考虑为任务定义的 trigger_rule。如果此参数设置为 False,则直接下游任务将被跳过,但其他后续下游任务的指定 trigger_rule 将被遵守。在此短路配置中,操作员假定直接下游任务是有意要跳过的,但其他后续任务可能不是。如果管道中只有 *一部分* 应该短路,而不是短路任务后面的所有任务,则此配置特别有用。

在下面的示例中,请注意“short_circuit”任务被配置为遵守下游触发规则。这意味着,虽然“short_circuit”任务后面的任务将被跳过,因为装饰函数返回 False,但“task_7”仍将执行,因为它设置为在上游任务完成运行时执行,而不管状态如何(即 TriggerRule.ALL_DONE 触发规则)。

airflow/example_dags/example_short_circuit_decorator.py[源代码]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
    condition=False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

airflow/example_dags/example_short_circuit_operator.py[源代码]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
    task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

传递参数

将额外的参数传递给 @task.short_circuit 装饰的函数,就像使用普通的 Python 函数一样。

模板

Jinja 模板的使用方式与 PythonOperator 中描述的相同。

PythonSensor

PythonSensor 执行任意可调用对象,并等待其返回值为 True。

提示

建议使用 @task.sensor 装饰器而不是经典的 PythonSensor 来执行 Python 可调用对象以检查 True 条件。

airflow/example_dags/example_sensor_decorator.py[源代码]

# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
    return PokeReturnValue(is_done=True, xcom_value="xcom_value")

airflow/example_dags/example_sensors.py[源代码]

t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)

t10 = PythonSensor(
    task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
)

此条目对您有帮助吗?