airflow.providers.amazon.aws.sensors.emr

模块内容

EmrBaseSensor

包含 EMR 的通用传感器行为。

EmrServerlessJobSensor

轮询作业运行的状态,直到它达到终端状态;如果作业运行失败则失败。

EmrServerlessApplicationSensor

轮询应用程序的状态,直到它达到终端状态;如果应用程序失败则失败。

EmrContainerSensor

轮询作业运行的状态,直到它达到终端状态;如果作业运行失败则失败。

EmrNotebookExecutionSensor

轮询 EMR 笔记本,直到它达到任何目标状态;失败时引发 AirflowException。

EmrJobFlowSensor

轮询 EMR JobFlow 集群,直到它达到任何目标状态;失败时引发 AirflowException。

EmrStepSensor

轮询步骤的状态,直到它达到任何目标状态;失败时引发 AirflowException。

class airflow.providers.amazon.aws.sensors.emr.EmrBaseSensor(*, aws_conn_id='aws_default', **kwargs)[源代码]

基类: airflow.sensors.base.BaseSensorOperator

包含 EMR 的通用传感器行为。

子类应实现以下方法
  • get_emr_response()

  • state_from_response()

  • failure_message_from_response()

子类应设置 target_statesfailed_states 字段。

参数

aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此项为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

ui_color = '#66c3ff'[源代码]
hook()[源代码]
poke(context)[源代码]

在派生此类时覆盖。

abstract get_emr_response(context)[源代码]

使用 boto3 发出 API 调用并获取响应。

返回

响应

返回类型

dict[str, Any]

abstract static state_from_response(response)[源代码]

从 boto3 响应中获取状态。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

状态

返回类型

str

abstract static failure_message_from_response(response)[源代码]

从 boto3 响应中获取状态。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

失败消息

返回类型

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor(*, application_id, job_run_id, target_states=frozenset(EmrServerlessHook.JOB_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[源代码]

基类: airflow.sensors.base.BaseSensorOperator

轮询作业运行的状态,直到它达到终端状态;如果作业运行失败则失败。

另请参阅

有关如何使用此传感器的更多信息,请查看指南: 等待 EMR Serverless 作业状态

参数
  • application_id (str) – 要检查其状态的 application_id

  • job_run_id (str) – 要检查其状态的 job_run_id

  • target_states (set | frozenset) – 要等待的一组状态,默认为 ‘SUCCESS’

  • aws_conn_id (str | None) – 要使用的 aws 连接,默认为 ‘aws_default’。如果此项为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

template_fields: collections.abc.Sequence[str] = ('application_id', 'job_run_id')[源代码]
poke(context)[源代码]

在派生此类时覆盖。

hook()[source]

创建并返回一个 EmrServerlessHook。

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

失败消息

返回类型

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessApplicationSensor(*, application_id, target_states=frozenset(EmrServerlessHook.APPLICATION_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

轮询应用程序的状态,直到它达到终端状态;如果应用程序失败则失败。

另请参阅

有关如何使用此传感器的更多信息,请查看指南:等待 EMR Serverless 应用程序状态

参数
  • application_id (str) – 要检查其状态的 application_id

  • target_states (set | frozenset) – 要等待的状态集,默认为 {‘CREATED’, ‘STARTED’}

  • aws_conn_id (str | None) – 要使用的 aws 连接,默认为 ‘aws_default’。如果此项为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

template_fields: collections.abc.Sequence[str] = ('application_id',)[source]
poke(context)[source]

在派生此类时覆盖。

hook()[source]

创建并返回一个 EmrServerlessHook。

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

失败消息

返回类型

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor(*, virtual_cluster_id, job_id, max_retries=None, aws_conn_id='aws_default', poll_interval=10, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

轮询作业运行的状态,直到它达到终端状态;如果作业运行失败则失败。

另请参阅

有关如何使用此传感器的更多信息,请查看指南:等待 Amazon EMR 虚拟集群作业

参数
  • job_id (str) – 要检查状态的 job_id

  • max_retries (int | None) – 返回当前状态之前轮询查询状态的次数,默认为 None

  • aws_conn_id (str | None) – 要使用的 aws 连接,默认为 ‘aws_default’。如果此项为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。

  • poll_interval (int) – 两次连续调用以检查 athena 上的查询状态之间等待的时间(以秒为单位),默认为 10

  • deferrable (bool) – 在可延迟模式下运行传感器。

INTERMEDIATE_STATES = ('PENDING', 'SUBMITTED', 'RUNNING')[source]
FAILURE_STATES = ('FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
SUCCESS_STATES = ('COMPLETED',)[source]
template_fields: collections.abc.Sequence[str] = ('virtual_cluster_id', 'job_id')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
ui_color = '#66c3ff'[source]
hook()[source]
poke(context)[source]

在派生此类时覆盖。

execute(context)[source]

在创建运算符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event=None)[source]
class airflow.providers.amazon.aws.sensors.emr.EmrNotebookExecutionSensor(notebook_execution_id, target_states=None, failed_states=None, **kwargs)[source]

基类: EmrBaseSensor

轮询 EMR 笔记本,直到它达到任何目标状态;失败时引发 AirflowException。

另请参阅

有关如何使用此传感器的更多信息,请查看指南: 等待 EMR notebook 执行状态

参数

notebook_execution_id (str) – 要轮询的 notebook 执行的唯一 ID。

Target_states

传感器将等待执行达到的状态。默认的 target_states 为 FINISHED

Failed_states

如果执行达到任何 failed_states,传感器将失败。默认的 failed_states 为 FAILED

template_fields: collections.abc.Sequence[str] = ('notebook_execution_id',)[source]
FAILURE_STATES[source]
COMPLETED_STATES[source]
get_emr_response(context)[source]

使用 boto3 发出 API 调用并获取响应。

返回

响应

返回类型

dict[str, Any]

static state_from_response(response)[source]

使用 boto3 进行 API 调用并获取集群级详细信息。

返回

响应

返回类型

str

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

失败消息

返回类型

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor(*, job_flow_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: EmrBaseSensor

轮询 EMR JobFlow 集群,直到它达到任何目标状态;失败时引发 AirflowException。

使用默认的目标状态,传感器会等待集群终止。当 target_states 设置为 ['RUNNING', 'WAITING'] 时,传感器会等待作业流就绪(在 'STARTING' 和 'BOOTSTRAPPING' 状态之后)

另请参阅

有关如何使用此传感器的更多信息,请查看指南: 等待 Amazon EMR 作业流状态

参数
  • job_flow_id (str) – 要检查状态的 job_flow_id

  • target_states (collections.abc.Iterable[str] | None) – 目标状态,传感器会等待作业流达到这些状态中的任何一个。在可延迟模式下,它将运行直到达到终端状态。

  • failed_states (collections.abc.Iterable[str] | None) – 失败状态,当作业流达到这些状态中的任何一个时,传感器将失败

  • max_attempts (int) – 失败前的最大尝试次数

  • deferrable (bool) – 在可延迟模式下运行传感器。

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
get_emr_response(context)[source]

使用 boto3 进行 API 调用并获取集群级详细信息。

返回

响应

返回类型

dict[str, Any]

static state_from_response(response)[source]

从响应字典中获取状态。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

集群的当前状态

返回类型

str

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

失败消息

返回类型

str | None

execute(context)[source]

在创建运算符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event=None)[source]
class airflow.providers.amazon.aws.sensors.emr.EmrStepSensor(*, job_flow_id, step_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: EmrBaseSensor

轮询步骤的状态,直到它达到任何目标状态;失败时引发 AirflowException。

使用默认目标状态,传感器会等待步骤完成。

另请参阅

有关如何使用此传感器的更多信息,请查看指南:等待 Amazon EMR 步骤状态

参数
  • job_flow_id (str) – 包含要检查状态的步骤的 job_flow_id

  • step_id (str) – 要检查状态的步骤

  • target_states (collections.abc.Iterable[str] | None) – 目标状态,传感器会等待直到步骤达到这些状态中的任何一个。如果是可延迟的传感器,它将等待达到最终状态

  • failed_states (collections.abc.Iterable[str] | None) – 失败状态,当步骤达到这些状态中的任何一个时,传感器将失败

  • max_attempts (int) – 失败前的最大尝试次数

  • deferrable (bool) – 在可延迟模式下运行传感器。

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'step_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
get_emr_response(context)[source]

使用 boto3 进行 API 调用并获取有关集群步骤的详细信息。

返回

响应

返回类型

dict[str, Any]

static state_from_response(response)[source]

从响应字典中获取状态。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

集群步骤的执行状态

返回类型

str

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数

response (dict[str, Any]) – 来自 AWS API 的响应

返回

失败消息

返回类型

str | None

execute(context)[source]

在创建运算符时派生。

上下文与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event=None)[source]

此条目是否有帮助?