airflow.providers.amazon.aws.hooks.emr
¶
模块内容¶
类¶
与 Amazon Elastic MapReduce 服务 (EMR) 交互。 |
|
与 Amazon EMR Serverless 交互。 |
|
与 Amazon EMR Containers (Amazon EMR on EKS) 交互。 |
- class airflow.providers.amazon.aws.hooks.emr.EmrHook(emr_conn_id=default_conn_name, *args, **kwargs)[源代码]¶
基类:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
与 Amazon Elastic MapReduce 服务 (EMR) 交互。
提供
boto3.client("emr")
的厚封装。- 参数
emr_conn_id (str | None) – Amazon Elastic MapReduce 连接。 此属性仅在使用
airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow()
时才需要。
可以指定其他参数(如
aws_conn_id
),并将其传递给底层 AwsBaseHook。另请参阅
- get_cluster_id_by_name(emr_cluster_name, cluster_states)[源代码]¶
获取具有给定名称和(可选)状态的 EMR 集群的 ID;仅当找到单个 ID 时才返回。
- create_job_flow(job_flow_overrides)[源代码]¶
创建并开始运行新的集群(作业流)。
此方法使用
EmrHook.emr_conn_id
来接收初始的 Amazon EMR 集群配置。如果EmrHook.emr_conn_id
为空或连接不存在,则使用空的初始配置。- 参数
job_flow_overrides (dict[str, Any]) – 用于覆盖初始 Amazon EMR 配置集群中的参数。生成的配置将用于
EMR.Client.run_job_flow()
。
- add_job_flow_steps(job_flow_id, steps=None, wait_for_completion=False, waiter_delay=None, waiter_max_attempts=None, execution_role_arn=None)[源代码]¶
向正在运行的集群添加新的步骤。
- test_connection()[源代码]¶
返回用于测试 Amazon Elastic MapReduce 连接(无法测试)的失败状态。
我们需要覆盖此方法,因为此 hook 基于
AwsGenericHook
,否则它会尝试使用默认 boto3 凭据策略来测试与 AWS STS 的连接。
- class airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook(*args, **kwargs)[source]¶
基类:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
与 Amazon EMR Serverless 交互。
提供围绕
boto3.client("emr-serverless")
的轻量级包装。可以指定其他参数(如
aws_conn_id
),并将其传递给底层 AwsBaseHook。
- class airflow.providers.amazon.aws.hooks.emr.EmrContainerHook(*args, virtual_cluster_id=None, **kwargs)[source]¶
基类:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
与 Amazon EMR Containers (Amazon EMR on EKS) 交互。
提供围绕
boto3.client("emr-containers")
的厚包装。- 参数
virtual_cluster_id (str | None) – EMR on EKS 虚拟集群的集群 ID
可以指定其他参数(如
aws_conn_id
),并将其传递给底层 AwsBaseHook。- create_emr_on_eks_cluster(virtual_cluster_name, eks_cluster_name, eks_namespace, tags=None)[source]¶
- submit_job(name, execution_role_arn, release_label, job_driver, configuration_overrides=None, client_request_token=None, tags=None, retry_max_attempts=None)[source]¶
向 EMR Containers API 提交作业并返回作业 ID。
作业运行是一个工作单元,例如 Spark jar、PySpark 脚本或提交到 Amazon EMR on EKS 的 SparkSQL 查询。
- 参数
name (str) – 作业运行的名称。
execution_role_arn (str) – 与作业运行关联的 IAM 角色 ARN。
release_label (str) – 用于作业运行的 Amazon EMR 发布版本。
job_driver (dict) – 作业配置详细信息,例如 Spark 作业参数。
configuration_overrides (dict | None) – 作业运行的配置覆盖,特别是应用程序配置或监控配置。
client_request_token (str | None) – 作业运行请求的客户端幂等令牌。如果要指定唯一 ID 以防止启动两个作业,请使用此令牌。
tags (dict | None) – 分配给作业运行的标签。
retry_max_attempts (int | None) – 作业驱动程序的最大尝试次数。
- 返回
作业运行请求的 ID。
- 返回类型
- get_job_failure_reason(job_id)[source]¶
获取作业失败的原因(例如,错误消息)。返回 None 或原因字符串。
- 参数
job_id (str) – 作业运行请求的 ID。