Amazon EMR

Amazon EMR(以前称为 Amazon Elastic MapReduce)是一个托管集群平台,可简化在 AWS 上运行大数据框架(如 Apache Hadoop 和 Apache Spark)以处理和分析海量数据。使用这些框架和相关的开源项目,您可以处理数据以进行分析目的和商业智能工作负载。Amazon EMR 还允许您将大量数据转换并移入和移出其他 AWS 数据存储和数据库,例如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB。

先决任务

要使用这些操作符,您必须执行以下操作

操作符

注意

为了成功运行示例,您需要为 Amazon EMR 创建 IAM 服务角色(EMR_EC2_DefaultRoleEMR_DefaultRole)。您可以使用 AWS CLI 创建这些角色: aws emr create-default-roles

创建 EMR 作业流

您可以使用 EmrCreateJobFlowOperator 创建新的 EMR 作业流。完成步骤后,集群将自动终止。此操作符可以通过传递 deferrable=True 作为参数以可延迟模式运行。使用 deferrable 模式将释放工作程序插槽,从而有效利用 Airflow 集群内的资源。但是,此模式需要您的部署中提供 Airflow 触发器。

作业流配置

要在 EMR 上创建作业流,您需要指定 EMR 集群的配置

tests/system/providers/amazon/aws/example_emr.py[源代码]

SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

JOB_FLOW_OVERRIDES: dict[str, Any] = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-6.7.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
    },
    "Steps": SPARK_STEPS,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

在此,我们创建一个 EMR 单节点集群PiCalc。它只有一个步骤calculate_pi,使用 Spark 计算Pi 的值。配置 'KeepJobFlowAliveWhenNoSteps': False 告诉集群在步骤完成后关闭。或者,可以使用没有 Steps 值的配置,并且可以在以后使用 EmrAddStepsOperator 添加步骤。请参阅以下详细信息。

注意

使用 EMR API 启动的 EMR 集群(如本集群)默认情况下对所有用户不可见,因此您可能在 EMR 管理控制台中看不到该集群 - 您可以通过在 JOB_FLOW_OVERRIDES 字典的末尾添加 'VisibleToAllUsers': True 来更改此设置。

有关更多配置信息,请参阅 Boto3 EMR 客户端

创建作业流

在以下代码中,我们使用上面解释的配置创建新的作业流。

tests/system/providers/amazon/aws/example_emr.py[源代码]

create_job_flow = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

向 EMR 作业流添加步骤

要向现有的 EMR 作业流添加步骤,可以使用 EmrAddStepsOperator。此操作员可以通过传递 deferrable=True 作为参数来以可延迟模式运行。使用 deferrable 模式将释放工作程序槽,从而有效利用 Airflow 集群中的资源。但是,此模式需要在部署中提供 Airflow 触发器。

tests/system/providers/amazon/aws/example_emr.py[源代码]

add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id=create_job_flow.output,
    steps=SPARK_STEPS,
    execution_role_arn=execution_role_arn,
)

终止 EMR 作业流

要终止 EMR 作业流,可以使用 EmrTerminateJobFlowOperator。此操作员可以通过传递 deferrable=True 作为参数来以可延迟模式运行。使用 deferrable 模式将释放工作程序槽,从而有效利用 Airflow 集群中的资源。但是,此模式需要在部署中提供 Airflow 触发器。

tests/system/providers/amazon/aws/example_emr.py[源代码]

remove_cluster = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id=create_job_flow.output,
)

修改 Amazon EMR 容器

要修改现有的 EMR 容器,可以使用 EmrContainerSensor

tests/system/providers/amazon/aws/example_emr.py[源代码]

modify_cluster = EmrModifyClusterOperator(
    task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)

启动 EMR 笔记本执行

您可以使用 EmrStartNotebookExecutionOperator 在连接到正在运行的集群的现有笔记本上启动笔记本执行。

tests/system/providers/amazon/aws/example_emr_notebook_execution.py[源代码]

start_execution = EmrStartNotebookExecutionOperator(
    task_id="start_execution",
    editor_id=editor_id,
    cluster_id=cluster_id,
    relative_path="EMR-System-Test.ipynb",
    service_role="EMR_Notebooks_DefaultRole",
)

停止 EMR 笔记本执行

你可以使用 EmrStopNotebookExecutionOperator 停止正在运行的笔记本执行。

tests/system/providers/amazon/aws/example_emr_notebook_execution.py[源代码]

stop_execution = EmrStopNotebookExecutionOperator(
    task_id="stop_execution",
    notebook_execution_id=notebook_execution_id_1,
)

传感器

等待 EMR 笔记本执行状态

要监视 EMR 笔记本执行的状态,可以使用 EmrNotebookExecutionSensor

tests/system/providers/amazon/aws/example_emr_notebook_execution.py[源代码]

wait_for_execution_start = EmrNotebookExecutionSensor(
    task_id="wait_for_execution_start",
    notebook_execution_id=notebook_execution_id_1,
    target_states={"RUNNING"},
    poke_interval=5,
)

等待 Amazon EMR 作业流状态

要监视 EMR 作业流的状态,可以使用 EmrJobFlowSensor

tests/system/providers/amazon/aws/example_emr.py[源代码]

check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)

等待 Amazon EMR 步骤状态

要监视 EMR 作业步骤的状态,可以使用 EmrStepSensor

tests/system/providers/amazon/aws/example_emr.py[源代码]

wait_for_step = EmrStepSensor(
    task_id="wait_for_step",
    job_flow_id=create_job_flow.output,
    step_id=get_step_id(add_steps.output),
)

限制

Amazon EMR 的服务配额相对较低,可以在 此处 详细查看。因此,在使用此页面中列出的任何运算符和传感器时,你可能会遇到限制问题。为了规避此限制,请考虑自定义 AWS 连接配置以修改默认的 Boto3 重试策略。 请参阅 AWS 连接配置文档

此条目是否有用?