Amazon EMR

Amazon EMR(之前称为 Amazon Elastic MapReduce)是一个托管集群平台,可简化在 AWS 上运行大数据框架(如 Apache Hadoop 和 Apache Spark)来处理和分析大量数据。使用这些框架和相关的开源项目,您可以处理用于分析目的和商业智能工作负载的数据。Amazon EMR 还允许您将大量数据移入和移出其他 AWS 数据存储和数据库,如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB。

先决条件任务

要使用这些运算符,您必须执行以下几项操作

运算符

注意

为了成功运行示例,您需要为 Amazon EMR 创建 IAM 服务角色(EMR_EC2_DefaultRoleEMR_DefaultRole)。您可以使用 AWS CLI 创建这些角色:aws emr create-default-roles

创建 EMR 作业流

您可以使用 EmrCreateJobFlowOperator 创建新的 EMR 作业流。集群将在完成步骤后自动终止。

默认行为是在集群启动后立即将 DAG 任务节点标记为成功(wait_policy=None)。可以通过使用不同的 wait_policy 来修改此行为。可用选项包括:

  • WaitPolicy.WAIT_FOR_COMPLETION - DAG 任务节点等待集群运行

  • WaitPolicy.WAIT_FOR_STEPS_COMPLETION - DAG 任务节点等待集群终止

可以通过传递 deferrable=True 作为参数,以可延迟模式运行此运算符。使用 deferrable 模式将释放工作槽,并提高 Airflow 集群内资源的利用率。但是,此模式需要 Airflow 触发器在您的部署中可用。

JobFlow 配置

要在 EMR 上创建作业流,您需要指定 EMR 集群的配置

tests/system/amazon/aws/example_emr.py

SPARK_STEPS = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

JOB_FLOW_OVERRIDES: dict[str, Any] = {
    "Name": "PiCalc",
    "ReleaseLabel": "emr-7.1.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
        ],
        # If the EMR steps complete too quickly the cluster will be torn down before the other system test
        # tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
        # Set KeepJobFlowAliveWhenNoSteps to False to avoid the cluster from being torn down prematurely.
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "Steps": SPARK_STEPS,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

这里我们创建了一个 EMR 单节点集群 *PiCalc*。它只有一个步骤 *calculate_pi*,它使用 Spark 计算 Pi 的值。配置 'KeepJobFlowAliveWhenNoSteps': False 告诉集群在步骤完成后关闭。或者,可以使用没有 Steps 值的配置,并且可以使用 EmrAddStepsOperator 在以后添加步骤。请参阅下面的详细信息。

注意

使用像这样的 EMR API 启动的 EMR 集群默认情况下对所有用户不可见,因此您可能在 EMR 管理控制台中看不到该集群 - 您可以通过在 JOB_FLOW_OVERRIDES 字典的末尾添加 'VisibleToAllUsers': True 来更改此设置。

有关更多配置信息,请参阅 Boto3 EMR 客户端

创建作业流

在以下代码中,我们将使用上述配置创建一个新的作业流。

tests/system/amazon/aws/example_emr.py

create_job_flow = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)

向 EMR 作业流添加步骤

要向现有的 EMR 作业流添加步骤,您可以使用 EmrAddStepsOperator。可以通过传递 deferrable=True 作为参数,以可延迟模式运行此运算符。使用 deferrable 模式将释放工作槽,并提高 Airflow 集群内资源的利用率。但是,此模式需要 Airflow 触发器在您的部署中可用。

tests/system/amazon/aws/example_emr.py

add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id=create_job_flow.output,
    steps=SPARK_STEPS,
    execution_role_arn=execution_role_arn,
)

终止 EMR 作业流

要终止 EMR 作业流,您可以使用 EmrTerminateJobFlowOperator。可以通过传递 deferrable=True 作为参数,以可延迟模式运行此运算符。使用 deferrable 模式将释放工作槽,并提高 Airflow 集群内资源的利用率。但是,此模式需要 Airflow 触发器在您的部署中可用。

tests/system/amazon/aws/example_emr.py

remove_cluster = EmrTerminateJobFlowOperator(
    task_id="remove_cluster",
    job_flow_id=create_job_flow.output,
)

修改 Amazon EMR 容器

要修改现有的 EMR 容器,您可以使用 EmrContainerSensor

tests/system/amazon/aws/example_emr.py

modify_cluster = EmrModifyClusterOperator(
    task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)

启动 EMR Notebook 执行

您可以使用 EmrStartNotebookExecutionOperator 在附加到正在运行的集群的现有 Notebook 上启动 Notebook 执行。

tests/system/amazon/aws/example_emr_notebook_execution.py

start_execution = EmrStartNotebookExecutionOperator(
    task_id="start_execution",
    editor_id=editor_id,
    cluster_id=cluster_id,
    relative_path="EMR-System-Test.ipynb",
    service_role="EMR_Notebooks_DefaultRole",
)

停止 EMR Notebook 执行

您可以使用 EmrStopNotebookExecutionOperator 来停止正在运行的 Notebook 执行。

tests/system/amazon/aws/example_emr_notebook_execution.py

stop_execution = EmrStopNotebookExecutionOperator(
    task_id="stop_execution",
    notebook_execution_id=notebook_execution_id_1,
)

传感器

等待 EMR Notebook 执行状态

要监视 EMR Notebook 执行的状态,您可以使用 EmrNotebookExecutionSensor

tests/system/amazon/aws/example_emr_notebook_execution.py

wait_for_execution_start = EmrNotebookExecutionSensor(
    task_id="wait_for_execution_start",
    notebook_execution_id=notebook_execution_id_1,
    target_states={"RUNNING"},
    poke_interval=5,
)

等待 Amazon EMR 作业流状态

要监视 EMR 作业流的状态,您可以使用 EmrJobFlowSensor

tests/system/amazon/aws/example_emr.py

check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)

等待 Amazon EMR 步骤状态

要监视 EMR 作业步骤的状态,您可以使用 EmrStepSensor

tests/system/amazon/aws/example_emr.py

wait_for_step = EmrStepSensor(
    task_id="wait_for_step",
    job_flow_id=create_job_flow.output,
    step_id=get_step_id(add_steps.output),
)

限制

Amazon EMR 的服务配额相对较低,可以在此处查看详细信息。因此,当使用此页面中列出的任何运算符和传感器时,您可能会遇到限制问题。为了规避此限制,请考虑自定义 AWS 连接配置以修改默认的 Boto3 重试策略。请参阅 AWS 连接配置文档

此条目是否有帮助?