airflow.providers.apache.livy.operators.livy

LivyOperator

封装 Apache Livy 批处理 REST API,允许将 Spark 应用提交到底层集群。

模块内容

class airflow.providers.apache.livy.operators.livy.LivyOperator(*, file, class_name=None, args=None, conf=None, jars=None, py_files=None, files=None, driver_memory=None, driver_cores=None, executor_memory=None, executor_cores=None, num_executors=None, archives=None, queue=None, name=None, proxy_user=None, livy_conn_id='livy_default', livy_conn_auth_type=None, livy_endpoint_prefix=None, polling_interval=0, extra_options=None, extra_headers=None, retry_args=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), openlineage_inject_parent_job_info=conf.getboolean('openlineage', 'spark_inject_parent_job_info', fallback=False), openlineage_inject_transport_info=conf.getboolean('openlineage', 'spark_inject_transport_info', fallback=False), **kwargs)[source]

Bases: airflow.providers.common.compat.sdk.BaseOperator

封装 Apache Livy 批处理 REST API,允许将 Spark 应用提交到底层集群。

参数:
  • file (str) – 要执行的应用文件路径(必需)。(支持模板)

  • class_name (str | None) – 应用的 Java/Spark 主类名。(支持模板)

  • args (collections.abc.Sequence[str | int | float] | None) – 应用的命令行参数。(支持模板)

  • jars (collections.abc.Sequence[str] | None) – 本会话使用的 jar 包。(支持模板)

  • py_files (collections.abc.Sequence[str] | None) – 本会话使用的 Python 文件。(支持模板)

  • files (collections.abc.Sequence[str] | None) – 本会话使用的其他文件。(支持模板)

  • driver_memory (str | None) – driver 进程使用的内存量。(支持模板)

  • driver_cores (int | str | None) – driver 进程使用的 CPU 核数。(支持模板)

  • executor_memory (str | None) – 每个 executor 进程使用的内存量。(支持模板)

  • executor_cores (int | str | None) – 每个 executor 使用的 CPU 核数。(支持模板)

  • num_executors (int | str | None) – 为该会话启动的 executor 数量。(支持模板)

  • archives (collections.abc.Sequence[str] | None) – 本会话使用的归档文件。(支持模板)

  • queue (str | None) – 提交应用的 YARN 队列名称。(支持模板)

  • name (str | None) – 本会话的名称。(支持模板)

  • conf (dict[Any, Any] | None) – Spark 配置属性。(支持模板)

  • proxy_user (str | None) – 运行作业时要冒充的用户。(支持模板)

  • livy_conn_id (str) – 对预定义 Livy 连接的引用。

  • livy_conn_auth_type (Any | None) – Livy 连接的身份验证类型。

  • polling_interval (int) – 轮询作业完成状态的时间间隔(秒)。不要使用 <= 0 的值。

  • extra_options (dict[str, Any] | None) – 选项字典,键为字符串,值取决于具体的选项。

  • extra_headers (dict[str, Any] | None) – 传递给 Livy HTTP 请求的头部字典。

  • retry_args (dict[str, Any] | None) – 定义重试行为的参数。参见 Tenacity 文档:https://github.com/jd/tenacity

  • deferrable (bool) – 在可延迟模式下运行算子

template_fields: collections.abc.Sequence[str] = ('spark_params',)[source]
template_fields_renderers[source]
spark_params[source]
retry_args = None[source]
deferrable[source]
openlineage_inject_parent_job_info[source]
openlineage_inject_transport_info[source]
property hook: airflow.providers.apache.livy.hooks.livy.LivyHook[source]

获取有效的 hook。

返回:

LivyHook

返回类型:

airflow.providers.apache.livy.hooks.livy.LivyHook

execute(context)[source]

在创建算子时派生。

执行任务的主要方法。Context 是与渲染 jinja 模板时使用的相同字典。

有关更多上下文,请参考 get_template_context。

poll_for_termination(batch_id)[source]

为批处理终止进行池化。

参数:

batch_id (int | str) – 要监控的批处理会话 ID。

on_kill()[source]

当任务实例被终止时,重写此方法以清理子进程。

在算子内部使用 threading、subprocess 或 multiprocessing 模块时,需要进行清理,否则会留下僵尸进程。

kill()[source]

删除当前批处理会话。

execute_complete(context, event)[source]

当触发器触发时执行 —— 立即返回。

依赖触发器抛出异常,否则默认认为执行成功。

此条目是否有帮助?