PythonOperator

使用 PythonOperator 来执行 Python 可调用对象。

提示

建议使用 @task 装饰器,而不是经典的 PythonOperator 来执行 Python 可调用对象。

airflow/example_dags/example_python_decorator.py[源代码]

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"

run_this = print_context()

airflow/example_dags/example_python_operator.py[源代码]

def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print("::group::All kwargs")
    pprint(kwargs)
    print("::endgroup::")
    print("::group::Context variable ds")
    print(ds)
    print("::endgroup::")
    return "Whatever you return gets printed in the logs"

run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)

传入参数

像使用普通 Python 函数一样,将额外的参数传递给 @task 装饰的函数。

airflow/example_dags/example_python_decorator.py[源代码]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

    run_this >> log_the_sql >> sleeping_task

airflow/example_dags/example_python_operator.py[源代码]

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = PythonOperator(
        task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
    )

    run_this >> log_the_sql >> sleeping_task

模板

Airflow 传入一组额外的关键字参数:每个 Jinja 模板变量 一个,以及一个 templates_dict 参数。

templates_dictop_argsop_kwargs 参数会被模板化,因此字典中的每个值都会被评估为 Jinja 模板

airflow/example_dags/example_python_decorator.py[源代码]

@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()

airflow/example_dags/example_python_operator.py[源代码]

def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = PythonOperator(
    task_id="log_sql_query",
    python_callable=log_sql,
    templates_dict={"query": "sql/sample.sql"},
    templates_exts=[".sql"],
)

上下文

Context 是一个字典对象,包含有关 DagRun 环境的信息。 例如,选择 task_instance 将获取当前正在运行的 TaskInstance 对象。

它可以隐式使用,例如使用 **kwargs,但也可以使用 get_current_context() 显式使用。 在这种情况下,类型提示可以用于静态分析。

airflow/example_dags/example_python_context_decorator.py

@task(task_id="print_the_context")
def print_context() -> str:
    """Print the Airflow context."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context = print_context()

airflow/example_dags/example_python_context_operator.py

def print_context() -> str:
    """Print the Airflow context."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context = PythonOperator(task_id="print_the_context", python_callable=print_context)

PythonVirtualenvOperator

使用 PythonVirtualenvOperator 装饰器,在新 Python 虚拟环境中执行 Python 可调用对象。 需要在运行 Airflow 的环境中安装 virtualenv 包(作为可选依赖项 pip install apache-airflow[virtualenv] --constraint ...)。

此外,需要使用命令 pip install [cloudpickle] --constraint ... 安装 cloudpickle 包作为可选依赖项。 该软件包是当前使用的 dill 软件包的替代品。 Cloudpickle 在关注标准 pickling 协议方面具有强大的优势,确保更广泛的兼容性和更流畅的数据交换,同时仍然有效地处理函数中的常见 Python 对象和全局变量。

提示

建议使用 @task.virtualenv 装饰器,而不是经典的 PythonVirtualenvOperator 在新的 Python 虚拟环境中执行 Python 可调用对象。

airflow/example_dags/example_python_decorator.py[源代码]

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

airflow/example_dags/example_python_operator.py[源代码]

def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)

传入参数

像使用普通 Python 函数一样,将额外的参数传递给 @task.virtualenv 装饰的函数。 遗憾的是,由于与底层库不兼容,Airflow 不支持序列化 vartitask_instance。 对于 Airflow 上下文变量,请确保通过将 system_site_packages 设置为 True 或将 apache-airflow 添加到 requirements 参数来访问 Airflow。 否则,您将无法在 op_kwargs 中访问 Airflow 的大多数上下文变量。 如果您想要与日期时间对象相关的上下文,如 data_interval_start,您可以添加 pendulumlazy_object_proxy

重要

为执行而定义的 Python 函数体将从 DAG 中切出,放入一个临时文件中,而不包含周围的代码。 如示例中所示,您需要再次添加所有导入,并且不能依赖于全局 Python 上下文中的变量。

如果想将变量传递到经典的 PythonVirtualenvOperator 中,请使用 op_argsop_kwargs

如果需要其他软件包安装参数,请通过 pip_install_options 参数传递,或者使用如下示例中的 requirements.txt

SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives

requirements 文件格式 中列出了所有支持的选项。

模板

Jinja 模板可以像 PythonOperator 中描述的那样使用。

虚拟环境设置选项

虚拟环境是根据您的工作节点上的全局 python pip 配置创建的。 在您的环境中使用额外的 ENV 或调整常规 pip 配置,如 pip 配置中所述。

如果要使用其他任务特定的私有 python 存储库来设置虚拟环境,您可以传递 index_urls 参数,这将调整 pip 安装配置。 传递的索引 URL 将替换标准系统配置的索引 URL 设置。 为了防止在您的 DAG 代码中向私有存储库添加机密信息,您可以使用 Airflow 连接和钩子。 为此,可以使用连接类型 软件包 索引 (Python)

如果您在特殊情况下要防止远程调用以设置虚拟环境,请将 index_urls 作为空列表传递,如 index_urls=[],这将强制 pip 安装程序使用 --no-index 选项。

缓存和重用

虚拟环境的设置是在临时目录中按任务执行进行的。 执行后,虚拟环境将再次被删除。 请确保您的工作节点上的 $tmp 文件夹有足够的磁盘空间。 通常(如果配置不同),将使用本地 pip 缓存,从而防止每次执行都重新下载软件包。

但是,为每次执行设置虚拟环境仍然需要一些时间。 对于重复执行,您可以将 venv_cache_path 选项设置为工作节点上的文件系统文件夹。 在这种情况下,虚拟环境将被设置一次并被重用。 如果使用虚拟环境缓存,则会在缓存路径中为每个唯一的要求集创建不同的虚拟环境子文件夹。 因此,根据系统中 DAG 的变化,需要足够的磁盘空间。

请注意,不会进行自动清理,并且在缓存模式下,所有工作槽共享相同的虚拟环境。但是,如果任务在不同的工作器上反复调度,则可能会发生在多个工作器上单独创建虚拟环境的情况。此外,如果工作器在 Kubernetes POD 中启动,则工作器重启将删除缓存(假设 venv_cache_path 不在持久卷上)。

如果在运行时遇到缓存的虚拟环境损坏的问题,您可以通过将 Airflow 变量 PythonVirtualenvOperator.cache_key 设置为任意文本来影响缓存目录的哈希值。此变量的内容用于计算缓存目录键的向量。

请注意,对缓存的虚拟环境的任何修改(例如二进制路径中的临时文件、后期安装的其他需求)都可能污染缓存的虚拟环境,并且该操作符不会维护或清理缓存路径。

上下文

在某些限制条件下,您还可以在虚拟环境中使用 Context

重要

在虚拟环境中使用 Context 是一项挑战,因为它涉及到库依赖项和序列化问题。

您可以通过使用 Jinja 模板变量 并将其显式地作为参数传递来在一定程度上绕过此问题。

您还可以像以前一样使用 get_current_context(),但有一些限制。

  • 需要 apache-airflow>=3.0.0

  • use_airflow_context 设置为 True 以在虚拟环境中调用 get_current_context()

  • system_site_packages 设置为 True 或将 expect_airflow 设置为 True

airflow/example_dags/example_python_context_decorator.py

@task.virtualenv(task_id="print_the_context_venv", use_airflow_context=True)
def print_context_venv() -> str:
    """Print the Airflow context in venv."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_venv = print_context_venv()

airflow/example_dags/example_python_context_operator.py

def print_context_venv() -> str:
    """Print the Airflow context in venv."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_venv = PythonVirtualenvOperator(
    task_id="print_the_context_venv", python_callable=print_context_venv, use_airflow_context=True
)

ExternalPythonOperator

ExternalPythonOperator 可以帮助您使用与其他任务(以及 Airflow 主环境)不同的 Python 库集运行某些任务。这可能是一个虚拟环境或任何预安装在 Airflow 任务运行环境中的 Python 安装。该操作符将 Python 二进制文件作为 python 参数。请注意,即使是虚拟环境,python 路径也应指向虚拟环境内的 python 二进制文件(通常位于虚拟环境的 bin 子目录中)。与常规使用虚拟环境不同,不需要 activation 环境。仅使用 python 二进制文件即可自动激活它。在下面的两个示例中,PATH_TO_PYTHON_BINARY 是这样的路径,指向可执行的 Python 二进制文件。

使用 ExternalPythonOperator 在预定义的环境中执行 Python 可调用对象。virtualenv 包应预先安装在 Python 运行的环境中。如果使用 dill,则必须预先安装在环境中(与主 Airflow 环境中安装的版本相同)。

提示

建议使用 @task.external_python 装饰器而不是经典的 ExternalPythonOperator 在预定义的 Python 环境中执行 Python 代码。

airflow/example_dags/example_python_decorator.py[源代码]

@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = callable_external_python()

airflow/example_dags/example_python_operator.py[源代码]

def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = ExternalPythonOperator(
    task_id="external_python",
    python_callable=callable_external_python,
    python=PATH_TO_PYTHON_BINARY,
)

传递参数

像使用普通 Python 函数一样,将额外的参数传递给 @task.external_python 装饰的函数。遗憾的是,由于与底层库不兼容,Airflow 不支持序列化 varti / task_instance。对于 Airflow 上下文变量,请确保 Airflow 也作为虚拟环境的一部分安装,其版本与运行任务的 Airflow 版本相同。否则,您将无法在 op_kwargs 中访问 Airflow 的大多数上下文变量。如果您想要与 datetime 对象相关的上下文,例如 data_interval_start,您可以将 pendulumlazy_object_proxy 添加到您的虚拟环境中。

重要

为执行而定义的 Python 函数体将从 DAG 中切出,放入一个临时文件中,而不包含周围的代码。 如示例中所示,您需要再次添加所有导入,并且不能依赖于全局 Python 上下文中的变量。

如果要将变量传递到经典的 ExternalPythonOperator 中,请使用 op_argsop_kwargs

模板化

Jinja 模板可以像 PythonOperator 中描述的那样使用。

上下文

您可以在与 PythonVirtualenvOperator 相同的条件下使用 Context

airflow/example_dags/example_python_context_decorator.py

@task.external_python(
    task_id="print_the_context_external", python=SOME_EXTERNAL_PYTHON, use_airflow_context=True
)
def print_context_external() -> str:
    """Print the Airflow context in external python."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_external = print_context_external()

airflow/example_dags/example_python_context_operator.py

def print_context_external() -> str:
    """Print the Airflow context in external python."""
    from pprint import pprint

    from airflow.providers.standard.operators.python import get_current_context

    context = get_current_context()
    pprint(context)
    return "Whatever you return gets printed in the logs"

print_the_context_external = ExternalPythonOperator(
    task_id="print_the_context_external",
    python_callable=print_context_external,
    python=SOME_EXTERNAL_PYTHON,
    use_airflow_context=True,
)

PythonBranchOperator

使用 PythonBranchOperator 执行 Python 分支 任务。

提示

建议使用 @task.branch 装饰器而不是经典的 PythonBranchOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py[源代码]

@task.branch()
def branching(choices: list[str]) -> str:
    return f"branch_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[源代码]

branching = BranchPythonOperator(
    task_id="branching",
    python_callable=lambda: f"branch_{random.choice(options)}",
)

传递参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

BranchPythonVirtualenvOperator

使用 BranchPythonVirtualenvOperator 装饰器来执行 Python 分支 任务,它是 PythonBranchOperator 与在虚拟环境中执行的混合体。

提示

建议使用 @task.branch_virtualenv 装饰器而不是经典的 BranchPythonVirtualenvOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py[源代码]

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()

@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[源代码]

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())

def branch_with_venv(choices):
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

branching_venv = BranchPythonVirtualenvOperator(
    task_id="branching_venv",
    requirements=["numpy~=1.26.0"],
    venv_cache_path=VENV_CACHE_PATH,
    python_callable=branch_with_venv,
    op_args=[options],
)

传递参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

BranchExternalPythonOperator

使用 BranchExternalPythonOperator 执行 Python 分支 任务,它是 PythonBranchOperator 与在外部 Python 环境中执行的混合体。

提示

建议使用 @task.branch_external_python 装饰器而不是经典的 BranchExternalPythonOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py[源代码]

@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
    import random

    return f"ext_py_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py[源代码]

def branch_with_external_python(choices):
    import random

    return f"ext_py_{random.choice(choices)}"

branching_ext_py = BranchExternalPythonOperator(
    task_id="branching_ext_python",
    python=PATH_TO_PYTHON_BINARY,
    python_callable=branch_with_external_python,
    op_args=[options],
)

传递参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

ShortCircuitOperator

使用 ShortCircuitOperator 来控制在满足条件或获得真值时是否继续管道。

此条件和真值的评估是通过可调用对象的输出完成的。如果可调用对象返回 True 或真值,则允许管道继续执行,并且将推送输出的 XCom。如果输出为 False 或假值,则管道将根据配置的短路(稍后详细介绍)进行短路。在下面的示例中,将执行“condition_is_true”任务之后的任务,而将跳过“condition_is_false”任务下游的任务。

提示

建议使用 @task.short_circuit 装饰器而不是经典的 ShortCircuitOperator 通过 Python 可调用对象来短路管道。

airflow/example_dags/example_short_circuit_decorator.py[源代码]

@task.short_circuit()
def check_condition(condition):
    return condition

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)

chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)

airflow/example_dags/example_short_circuit_operator.py[源代码]

cond_true = ShortCircuitOperator(
    task_id="condition_is_True",
    python_callable=lambda: True,
)

cond_false = ShortCircuitOperator(
    task_id="condition_is_False",
    python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)

可以将“短路”配置为遵守或忽略为下游任务定义的 触发规则。如果 ignore_downstream_trigger_rules 设置为 True(默认配置),则将跳过所有下游任务,而不考虑为任务定义的 trigger_rule。如果此参数设置为 False,则将跳过直接下游任务,但将遵守为其他后续下游任务指定的 trigger_rule。在此短路配置中,操作符假定直接下游任务是故意要跳过的,但可能不是其他后续任务。如果仅 部分 管道应短路,而不是短路任务之后的所有任务,则此配置特别有用。

在下面的例子中,请注意,“short_circuit”任务被配置为遵循下游触发规则。这意味着,虽然在“short_circuit”任务之后的任务会被跳过,因为装饰器函数返回 False,但“task_7”仍会执行,因为它被设置为在上游任务完成运行后执行,而不管状态如何(即 TriggerRule.ALL_DONE 触发规则)。

airflow/example_dags/example_short_circuit_decorator.py[源代码]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
    condition=False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

airflow/example_dags/example_short_circuit_operator.py[源代码]

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
    task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

传递参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

此条目是否有帮助?