airflow.providers.amazon.aws.hooks.emr

模块内容

EmrHook

与 Amazon Elastic MapReduce 服务 (EMR) 交互。

EmrServerlessHook

与 Amazon EMR Serverless 交互。

EmrContainerHook

与 Amazon EMR Containers (Amazon EMR on EKS) 交互。

class airflow.providers.amazon.aws.hooks.emr.EmrHook(emr_conn_id=default_conn_name, *args, **kwargs)[源代码]

基类:airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 Amazon Elastic MapReduce 服务 (EMR) 交互。

提供 boto3.client("emr") 的厚封装。

参数

emr_conn_id (str | None) – Amazon Elastic MapReduce 连接。 此属性仅在使用 airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow() 时才需要。

可以指定其他参数(如 aws_conn_id),并将其传递给底层 AwsBaseHook。

另请参阅

AwsBaseHook

conn_name_attr = 'emr_conn_id'[源代码]
default_conn_name = 'emr_default'[源代码]
conn_type = 'emr'[源代码]
hook_name = 'Amazon Elastic MapReduce'[源代码]
get_cluster_id_by_name(emr_cluster_name, cluster_states)[源代码]

获取具有给定名称和(可选)状态的 EMR 集群的 ID;仅当找到单个 ID 时才返回。

参数
  • emr_cluster_name (str) – 要查找的集群的名称

  • cluster_states (list[str]) – 要查找的集群的状态

返回

EMR 集群的 ID

返回类型

str | None

create_job_flow(job_flow_overrides)[源代码]

创建并开始运行新的集群(作业流)。

此方法使用 EmrHook.emr_conn_id 来接收初始的 Amazon EMR 集群配置。如果 EmrHook.emr_conn_id 为空或连接不存在,则使用空的初始配置。

参数

job_flow_overrides (dict[str, Any]) – 用于覆盖初始 Amazon EMR 配置集群中的参数。生成的配置将用于 EMR.Client.run_job_flow()

add_job_flow_steps(job_flow_id, steps=None, wait_for_completion=False, waiter_delay=None, waiter_max_attempts=None, execution_role_arn=None)[源代码]

向正在运行的集群添加新的步骤。

参数
  • job_flow_id (str) – 要添加步骤的作业流的 ID

  • steps (list[dict] | str | None) – 作业流要执行的步骤列表

  • wait_for_completion (bool) – 如果为 True,则等待步骤完成。 默认为 False

  • waiter_delay (int | None) – 两次尝试之间等待的时间(以秒为单位)。默认为 5

  • waiter_max_attempts (int | None) – 要进行的最大尝试次数。默认为 100

  • execution_role_arn (str | None) – 集群上步骤的运行时角色的 ARN。

test_connection()[源代码]

返回用于测试 Amazon Elastic MapReduce 连接(无法测试)的失败状态。

我们需要覆盖此方法,因为此 hook 基于 AwsGenericHook,否则它会尝试使用默认 boto3 凭据策略来测试与 AWS STS 的连接。

classmethod get_ui_field_behaviour()[源代码]

为 Amazon Elastic MapReduce 连接返回自定义 UI 字段行为。

class airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook(*args, **kwargs)[source]

基类:airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 Amazon EMR Serverless 交互。

提供围绕 boto3.client("emr-serverless") 的轻量级包装。

可以指定其他参数(如 aws_conn_id),并将其传递给底层 AwsBaseHook。

JOB_INTERMEDIATE_STATES[source]
JOB_FAILURE_STATES[source]
JOB_SUCCESS_STATES[source]
JOB_TERMINAL_STATES[source]
APPLICATION_INTERMEDIATE_STATES[source]
APPLICATION_FAILURE_STATES[source]
APPLICATION_SUCCESS_STATES[source]
cancel_running_jobs(application_id, waiter_config=None, wait_for_completion=True)[source]

取消处于中间状态的作业,并返回已取消的作业数。

如果 wait_for_completion 为 True,则该方法将等待所有作业都取消后再返回。

注意:如果在此操作正在进行时触发了新作业,则会超时并返回错误。

class airflow.providers.amazon.aws.hooks.emr.EmrContainerHook(*args, virtual_cluster_id=None, **kwargs)[source]

基类:airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 Amazon EMR Containers (Amazon EMR on EKS) 交互。

提供围绕 boto3.client("emr-containers") 的厚包装。

参数

virtual_cluster_id (str | None) – EMR on EKS 虚拟集群的集群 ID

可以指定其他参数(如 aws_conn_id),并将其传递给底层 AwsBaseHook。

INTERMEDIATE_STATES = ('PENDING', 'SUBMITTED', 'RUNNING')[source]
FAILURE_STATES = ('FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
SUCCESS_STATES = ('COMPLETED',)[source]
TERMINAL_STATES = ('COMPLETED', 'FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
create_emr_on_eks_cluster(virtual_cluster_name, eks_cluster_name, eks_namespace, tags=None)[source]
submit_job(name, execution_role_arn, release_label, job_driver, configuration_overrides=None, client_request_token=None, tags=None, retry_max_attempts=None)[source]

向 EMR Containers API 提交作业并返回作业 ID。

作业运行是一个工作单元,例如 Spark jar、PySpark 脚本或提交到 Amazon EMR on EKS 的 SparkSQL 查询。

参数
  • name (str) – 作业运行的名称。

  • execution_role_arn (str) – 与作业运行关联的 IAM 角色 ARN。

  • release_label (str) – 用于作业运行的 Amazon EMR 发布版本。

  • job_driver (dict) – 作业配置详细信息,例如 Spark 作业参数。

  • configuration_overrides (dict | None) – 作业运行的配置覆盖,特别是应用程序配置或监控配置。

  • client_request_token (str | None) – 作业运行请求的客户端幂等令牌。如果要指定唯一 ID 以防止启动两个作业,请使用此令牌。

  • tags (dict | None) – 分配给作业运行的标签。

  • retry_max_attempts (int | None) – 作业驱动程序的最大尝试次数。

返回

作业运行请求的 ID。

返回类型

str

get_job_failure_reason(job_id)[source]

获取作业失败的原因(例如,错误消息)。返回 None 或原因字符串。

参数

job_id (str) – 作业运行请求的 ID。

check_query_status(job_id)[source]

获取已提交的作业运行的状态。返回 None 或有效的查询状态之一。

参数

job_id (str) – 作业运行请求的 ID。

poll_query_status(job_id, poll_interval=30, max_polling_attempts=None)[source]

轮询已提交的作业运行状态,直到查询状态达到最终状态;返回最终状态。

参数
  • job_id (str) – 作业运行请求的 ID。

  • poll_interval ( int) – 在调用以检查 EMR 上的查询状态之间等待的时间(以秒为单位)

  • max_polling_attempts ( int | None) – 在函数退出之前轮询查询状态的次数

stop_query(job_id)[source]

取消已提交的 job_run。

参数

job_id ( str) – 要取消的作业运行的 ID。

此条目是否有帮助?