airflow.providers.standard.operators.python
¶
模块内容¶
类¶
执行 Python 可调用对象。 |
|
工作流可以在此任务执行后“分支”或遵循路径。 |
|
允许管道根据 |
|
在自动创建和销毁的 virtualenv 中运行函数。 |
|
工作流可以在此任务在虚拟环境中执行后“分支”或遵循路径。 |
|
在不会重新创建的 virtualenv 中运行函数。 |
|
工作流可以在此任务执行后“分支”或遵循路径。 |
函数¶
检索执行上下文字典,而不更改用户方法的签名。 |
属性¶
- class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[源代码]¶
基类:
airflow.models.baseoperator.BaseOperator
执行 Python 可调用对象。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南:PythonOperator
在运行可调用对象时,Airflow 将传递一组可在函数中使用的关键字参数。这组 kwargs 与你在 jinja 模板中使用的内容完全对应。要使此功能正常工作,你需要在函数头中定义
**kwargs
,或者可以直接添加你想要获取的关键字参数 - 例如,使用以下代码,你的可调用对象将获取ti
上下文变量的值。使用显式参数
def my_python_callable(ti): pass
使用 kwargs
def my_python_callable(**kwargs): ti = kwargs["ti"]
- 参数
python_callable (Callable) – 对可调用对象的引用
op_args (collections.abc.Collection[Any] | None) – 调用你的可调用对象时将解包的位置参数列表
op_kwargs (collections.abc.Mapping[str, Any] | None) – 将在你的函数中解包的关键字参数字典
templates_dict (dict[str, Any] | None) – 一个字典,其中的值是模板,这些模板将在
__init__
和execute
之间由 Airflow 引擎进行模板化,并在应用模板后在可调用对象的上下文中可用。(模板化)templates_exts (collections.abc.Sequence[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']
show_return_value_in_logs (bool) – 一个布尔值,指示是否显示 return_value 日志。默认为 True,允许 return value 日志输出。可以将其设置为 False,以防止在返回大量数据(例如将大量 XCom 传输到 TaskAPI)时输出 return value 的日志。
- template_fields: collections.abc.Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[源代码]¶
- shallow_copy_attrs: collections.abc.Sequence[str] = ('python_callable', 'op_kwargs')[源代码]¶
- class airflow.providers.standard.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[源代码]¶
基类:
PythonOperator
,airflow.operators.branch.BranchMixIn
工作流可以在此任务执行后“分支”或遵循路径。
它继承自 PythonOperator,并期望一个 Python 函数,该函数返回单个 task_id、单个 task_group_id 或一个 task_ids 和/或 task_group_ids 列表以供后续执行。返回的 task_id(s) 和/或 task_group_id(s) 应指向 {self} 的直接下游任务或任务组。所有其他“分支”或直接下游任务都将被标记为
skipped
状态,以便这些路径无法继续。skipped
状态会向下传播,以允许 DAG 状态填充完整并推断 DAG 运行的状态。
- class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]¶
基类:
PythonOperator
,airflow.models.skipmixin.SkipMixin
允许管道根据
python_callable
的结果继续执行。ShortCircuitOperator 继承自 PythonOperator,并评估
python_callable
的结果。如果返回结果为 False 或假值,则管道将短路。下游任务将根据配置的短路模式标记为“skipped”状态。如果返回结果为 True 或真值,则下游任务将正常进行,并且将推送返回结果的XCom
。短路可以配置为尊重或忽略为下游任务设置的
trigger_rule
。如果ignore_downstream_trigger_rules
设置为 True(默认设置),则所有下游任务都将被跳过,而无需考虑为任务定义的trigger_rule
。但是,如果此参数设置为 False,则直接下游任务将被跳过,但将尊重其他后续下游任务的指定trigger_rule
。在此模式下,操作符假设直接下游任务被有目的地跳过,但可能不是其他后续任务。另请参阅
有关如何使用此操作符的更多信息,请查看以下指南:ShortCircuitOperator
- 参数
ignore_downstream_trigger_rules (bool) – 如果设置为 True,则此操作符任务的所有下游任务都将被跳过。这是默认行为。如果设置为 False,则直接下游任务将被跳过,但将尊重为所有其他下游任务定义的
trigger_rule
。
- class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]¶
基类:
_BasePythonVirtualenvOperator
在自动创建和销毁的 virtualenv 中运行函数。
该函数(有一些注意事项)必须使用 def 定义,而不是类的一部分。所有导入都必须在函数内部进行,并且不得引用范围外的变量。将有一个名为 virtualenv_string_args 的全局范围变量可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的虚拟环境运行的 Python 主要版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs 或使用任何通过插件提供给 Airflow 的宏。但是,可以使用 string_args。
另请参阅
有关如何使用此操作符的更多信息,请查看以下指南:PythonVirtualenvOperator
- 参数
python_callable (Callable) – 一个没有外部变量引用的 python 函数,使用 def 定义,它将在虚拟环境中运行。
requirements (None | collections.abc.Iterable[str] | str) – 一个需求字符串列表,或 pip 指定的(模板化)“需求文件”。
python_version (str | None) – 用于运行虚拟环境的 Python 版本。请注意,2 和 2.7 都是可接受的形式。
serializer (_SerializerTypeDef | None) –
用于序列化参数和结果的序列化器。它可以是以下之一
"pickle"
:(默认)使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle"
:使用 cloudpickle 序列化更复杂的类型,这需要将 cloudpickle 包含在您的需求中。"dill"
:使用 dill 序列化更复杂的类型,这需要将 dill 包含在您的需求中。
system_site_packages (bool) – 是否在虚拟环境中包含 system_site_packages。有关更多信息,请参阅 virtualenv 文档。
pip_install_options (list[str] | None) – 安装需求时 pip 安装选项的列表。有关可用选项,请参阅“pip install -h”
op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。
op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。
string_args (collections.abc.Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,可在运行时作为 list[str] 提供给 python_callable。请注意,参数按换行符分割。
templates_dict (dict | None) – 一个字典,其中的值是将在
__init__
和execute
之间某个时间由 Airflow 引擎模板化的模板,并在应用模板后在可调用对象的上下文中可用templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']
expect_airflow (bool) – 期望目标环境中安装了 Airflow。如果为 true,则当未安装 Airflow 时,操作符会发出警告,并且在启动时会尝试加载 Airflow 宏。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保持在
skipped
状态(默认值:None)。如果设置为None
,则任何非零退出代码都将被视为失败。index_urls (None | collections.abc.Collection[str] | str) – 一个可选的索引 URL 列表,用于从中加载 Python 包。如果未提供,则将使用系统 pip 配置来获取包。
venv_cache_path (None | os.PathLike[str]) – 虚拟环境父文件夹的可选路径,虚拟环境将在此文件夹中缓存,创建一个子文件夹 venv-{hash},其中 hash 将被替换为需求的校验和。如果未提供,则虚拟环境将在每次执行时在临时文件夹中创建和删除。
env_vars (dict[str, str] | None) – 一个字典,其中包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。
inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为
True
,则虚拟环境将继承父进程的环境变量 (os.environ
)。如果设置为False
,则虚拟环境将在干净的环境中执行。use_airflow_context (bool) – 是否将
get_current_context()
提供给 python_callable。尚未实现 - 等待 AIP-72 上下文序列化。
- class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]¶
基类:
PythonVirtualenvOperator
,airflow.operators.branch.BranchMixIn
工作流可以在虚拟环境中执行此任务后“分支”或遵循路径。
它派生自 PythonVirtualenvOperator,并期望一个返回单个 task_id、单个 task_group_id 或要遵循的 task_id 和/或 task_group_id 列表的 Python 函数。返回的 task_id 和/或 task_group_id 应指向 {self} 的直接下游的任务或任务组。所有其他“分支”或直接下游任务都标记为
skipped
状态,以便这些路径无法前进。skipped
状态向下游传播,以允许 DAG 状态填充,并推断 DAG 运行的状态。另请参阅
有关如何使用此操作符的更多信息,请查看指南:BranchPythonVirtualenvOperator
- class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]¶
基类:
_BasePythonVirtualenvOperator
在不会重新创建的 virtualenv 中运行函数。
在不创建虚拟环境的情况下按原样重用(有一定的注意事项)。
该函数必须使用 def 定义,而不是类的一部分。所有导入都必须在函数内部进行,并且不能引用范围外的变量。一个名为 virtualenv_string_args 的全局作用域变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的虚拟环境运行的 Python 主要版本与 Airflow 不同,则无法使用返回值、op_args、op_kwargs,或使用通过插件提供给 Airflow 的任何宏。但是可以使用 string_args。
如果 Airflow 安装在外部环境中,其版本与操作符使用的版本不同,则操作符将失败。
另请参阅
有关如何使用此操作符的更多信息,请查看以下指南: ExternalPythonOperator
- 参数
python (str) – 指向虚拟环境内部 Python 二进制文件的完整路径字符串(特定于文件系统)。应该使用绝对路径(因此通常以“/”或“X:/”开头,具体取决于使用的文件系统/操作系统)。(位于
VENV/bin
文件夹中)。python_callable (Callable) – 一个没有外部变量引用的 python 函数,使用 def 定义,它将在虚拟环境中运行。
serializer (_SerializerTypeDef | None) –
用于序列化参数和结果的序列化器。它可以是以下之一
"pickle"
:(默认)使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle"
:使用 cloudpickle 序列化更复杂的类型,这需要将 cloudpickle 包含在您的需求中。"dill"
:使用 dill 序列化更复杂的类型,这需要将 dill 包含在您的需求中。
op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。
op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。
string_args (collections.abc.Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,可在运行时作为 list[str] 提供给 python_callable。请注意,参数按换行符分割。
templates_dict (dict | None) – 一个字典,其中的值是将在
__init__
和execute
之间某个时间由 Airflow 引擎模板化的模板,并在应用模板后在可调用对象的上下文中可用templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']
expect_airflow (bool) – 期望目标环境中安装了 Airflow。如果为 true,则当未安装 Airflow 时,操作符会发出警告,并且在启动时会尝试加载 Airflow 宏。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保持在
skipped
状态(默认值:None)。如果设置为None
,则任何非零退出代码都将被视为失败。env_vars (dict[str, str] | None) – 一个字典,其中包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。
inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为
True
,则虚拟环境将继承父进程的环境变量 (os.environ
)。如果设置为False
,则虚拟环境将在干净的环境中执行。use_airflow_context (bool) – 是否将
get_current_context()
提供给 python_callable。尚未实现 - 等待 AIP-72 上下文序列化。
- template_fields: collections.abc.Sequence[str][source]¶
- class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]¶
基类:
ExternalPythonOperator
,airflow.operators.branch.BranchMixIn
工作流可以在此任务执行后“分支”或遵循路径。
扩展了 ExternalPythonOperator,因此期望获得 Python:应该使用的虚拟环境(位于
VENV/bin
文件夹中)。应该是绝对路径,因此它可以在单独的虚拟环境中运行,类似于 ExternalPythonOperator。另请参阅
有关如何使用此操作符的更多信息,请查看以下指南: BranchExternalPythonOperator
- airflow.providers.standard.operators.python.get_current_context()[source]¶
检索执行上下文字典,而不更改用户方法的签名。
这是检索执行上下文字典的最简单方法。
旧样式
def my_task(**context): ti = context["ti"]
新样式
from airflow.providers.standard.operators.python import get_current_context def my_task(): context = get_current_context() ti = context["ti"]
只有在操作符开始执行后调用此方法时,当前上下文才会有值。