Amazon EMR¶
Amazon EMR(以前称为 Amazon Elastic MapReduce)是一个托管集群平台,可简化在 AWS 上运行大数据框架(如 Apache Hadoop 和 Apache Spark)以处理和分析海量数据。使用这些框架和相关的开源项目,您可以处理数据以进行分析目的和商业智能工作负载。Amazon EMR 还允许您将大量数据转换并移入和移出其他 AWS 数据存储和数据库,例如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB。
先决任务¶
要使用这些操作符,您必须执行以下操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 安装 Airflow™
设置连接.
操作符¶
注意
为了成功运行示例,您需要为 Amazon EMR 创建 IAM 服务角色(EMR_EC2_DefaultRole
和 EMR_DefaultRole
)。您可以使用 AWS CLI 创建这些角色: aws emr create-default-roles
。
创建 EMR 作业流¶
您可以使用 EmrCreateJobFlowOperator
创建新的 EMR 作业流。完成步骤后,集群将自动终止。此操作符可以通过传递 deferrable=True
作为参数以可延迟模式运行。使用 deferrable
模式将释放工作程序插槽,从而有效利用 Airflow 集群内的资源。但是,此模式需要您的部署中提供 Airflow 触发器。
作业流配置¶
要在 EMR 上创建作业流,您需要指定 EMR 集群的配置
SPARK_STEPS = [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
},
}
]
JOB_FLOW_OVERRIDES: dict[str, Any] = {
"Name": "PiCalc",
"ReleaseLabel": "emr-6.7.0",
"Applications": [{"Name": "Spark"}],
"Instances": {
"InstanceGroups": [
{
"Name": "Primary node",
"Market": "ON_DEMAND",
"InstanceRole": "MASTER",
"InstanceType": "m5.xlarge",
"InstanceCount": 1,
},
],
"KeepJobFlowAliveWhenNoSteps": False,
"TerminationProtected": False,
},
"Steps": SPARK_STEPS,
"JobFlowRole": "EMR_EC2_DefaultRole",
"ServiceRole": "EMR_DefaultRole",
}
在此,我们创建一个 EMR 单节点集群PiCalc。它只有一个步骤calculate_pi,使用 Spark 计算Pi
的值。配置 'KeepJobFlowAliveWhenNoSteps': False
告诉集群在步骤完成后关闭。或者,可以使用没有 Steps
值的配置,并且可以在以后使用 EmrAddStepsOperator
添加步骤。请参阅以下详细信息。
注意
使用 EMR API 启动的 EMR 集群(如本集群)默认情况下对所有用户不可见,因此您可能在 EMR 管理控制台中看不到该集群 - 您可以通过在 JOB_FLOW_OVERRIDES
字典的末尾添加 'VisibleToAllUsers': True
来更改此设置。
有关更多配置信息,请参阅 Boto3 EMR 客户端。
创建作业流¶
在以下代码中,我们使用上面解释的配置创建新的作业流。
create_job_flow = EmrCreateJobFlowOperator(
task_id="create_job_flow",
job_flow_overrides=JOB_FLOW_OVERRIDES,
)
向 EMR 作业流添加步骤¶
要向现有的 EMR 作业流添加步骤,可以使用 EmrAddStepsOperator
。此操作员可以通过传递 deferrable=True
作为参数来以可延迟模式运行。使用 deferrable
模式将释放工作程序槽,从而有效利用 Airflow 集群中的资源。但是,此模式需要在部署中提供 Airflow 触发器。
add_steps = EmrAddStepsOperator(
task_id="add_steps",
job_flow_id=create_job_flow.output,
steps=SPARK_STEPS,
execution_role_arn=execution_role_arn,
)
终止 EMR 作业流¶
要终止 EMR 作业流,可以使用 EmrTerminateJobFlowOperator
。此操作员可以通过传递 deferrable=True
作为参数来以可延迟模式运行。使用 deferrable
模式将释放工作程序槽,从而有效利用 Airflow 集群中的资源。但是,此模式需要在部署中提供 Airflow 触发器。
remove_cluster = EmrTerminateJobFlowOperator(
task_id="remove_cluster",
job_flow_id=create_job_flow.output,
)
修改 Amazon EMR 容器¶
要修改现有的 EMR 容器,可以使用 EmrContainerSensor
。
modify_cluster = EmrModifyClusterOperator(
task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)
启动 EMR 笔记本执行¶
您可以使用 EmrStartNotebookExecutionOperator
在连接到正在运行的集群的现有笔记本上启动笔记本执行。
start_execution = EmrStartNotebookExecutionOperator(
task_id="start_execution",
editor_id=editor_id,
cluster_id=cluster_id,
relative_path="EMR-System-Test.ipynb",
service_role="EMR_Notebooks_DefaultRole",
)
停止 EMR 笔记本执行¶
你可以使用 EmrStopNotebookExecutionOperator
停止正在运行的笔记本执行。
stop_execution = EmrStopNotebookExecutionOperator(
task_id="stop_execution",
notebook_execution_id=notebook_execution_id_1,
)
传感器¶
等待 EMR 笔记本执行状态¶
要监视 EMR 笔记本执行的状态,可以使用 EmrNotebookExecutionSensor
。
wait_for_execution_start = EmrNotebookExecutionSensor(
task_id="wait_for_execution_start",
notebook_execution_id=notebook_execution_id_1,
target_states={"RUNNING"},
poke_interval=5,
)
等待 Amazon EMR 作业流状态¶
要监视 EMR 作业流的状态,可以使用 EmrJobFlowSensor
。
check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)
等待 Amazon EMR 步骤状态¶
要监视 EMR 作业步骤的状态,可以使用 EmrStepSensor
。
wait_for_step = EmrStepSensor(
task_id="wait_for_step",
job_flow_id=create_job_flow.output,
step_id=get_step_id(add_steps.output),
)
限制¶
Amazon EMR 的服务配额相对较低,可以在 此处 详细查看。因此,在使用此页面中列出的任何运算符和传感器时,你可能会遇到限制问题。为了规避此限制,请考虑自定义 AWS 连接配置以修改默认的 Boto3 重试策略。 请参阅 AWS 连接配置文档。