Amazon EMR¶
Amazon EMR(之前称为 Amazon Elastic MapReduce)是一个托管集群平台,可简化在 AWS 上运行大数据框架(如 Apache Hadoop 和 Apache Spark)来处理和分析大量数据。使用这些框架和相关的开源项目,您可以处理用于分析目的和商业智能工作负载的数据。Amazon EMR 还允许您将大量数据移入和移出其他 AWS 数据存储和数据库,如 Amazon Simple Storage Service (Amazon S3) 和 Amazon DynamoDB。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 的安装
设置连接.
运算符¶
注意
为了成功运行示例,您需要为 Amazon EMR 创建 IAM 服务角色(EMR_EC2_DefaultRole
和 EMR_DefaultRole
)。您可以使用 AWS CLI 创建这些角色:aws emr create-default-roles
。
创建 EMR 作业流¶
您可以使用 EmrCreateJobFlowOperator
创建新的 EMR 作业流。集群将在完成步骤后自动终止。
默认行为是在集群启动后立即将 DAG 任务节点标记为成功(wait_policy=None
)。可以通过使用不同的 wait_policy
来修改此行为。可用选项包括:
WaitPolicy.WAIT_FOR_COMPLETION
- DAG 任务节点等待集群运行WaitPolicy.WAIT_FOR_STEPS_COMPLETION
- DAG 任务节点等待集群终止
可以通过传递 deferrable=True
作为参数,以可延迟模式运行此运算符。使用 deferrable
模式将释放工作槽,并提高 Airflow 集群内资源的利用率。但是,此模式需要 Airflow 触发器在您的部署中可用。
JobFlow 配置¶
要在 EMR 上创建作业流,您需要指定 EMR 集群的配置
tests/system/amazon/aws/example_emr.py
SPARK_STEPS = [
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
},
}
]
JOB_FLOW_OVERRIDES: dict[str, Any] = {
"Name": "PiCalc",
"ReleaseLabel": "emr-7.1.0",
"Applications": [{"Name": "Spark"}],
"Instances": {
"InstanceGroups": [
{
"Name": "Primary node",
"Market": "ON_DEMAND",
"InstanceRole": "MASTER",
"InstanceType": "m5.xlarge",
"InstanceCount": 1,
},
],
# If the EMR steps complete too quickly the cluster will be torn down before the other system test
# tasks have a chance to run (such as the modify cluster step, the addition of more EMR steps, etc).
# Set KeepJobFlowAliveWhenNoSteps to False to avoid the cluster from being torn down prematurely.
"KeepJobFlowAliveWhenNoSteps": True,
"TerminationProtected": False,
},
"Steps": SPARK_STEPS,
"JobFlowRole": "EMR_EC2_DefaultRole",
"ServiceRole": "EMR_DefaultRole",
}
这里我们创建了一个 EMR 单节点集群 *PiCalc*。它只有一个步骤 *calculate_pi*,它使用 Spark 计算 Pi
的值。配置 'KeepJobFlowAliveWhenNoSteps': False
告诉集群在步骤完成后关闭。或者,可以使用没有 Steps
值的配置,并且可以使用 EmrAddStepsOperator
在以后添加步骤。请参阅下面的详细信息。
注意
使用像这样的 EMR API 启动的 EMR 集群默认情况下对所有用户不可见,因此您可能在 EMR 管理控制台中看不到该集群 - 您可以通过在 JOB_FLOW_OVERRIDES
字典的末尾添加 'VisibleToAllUsers': True
来更改此设置。
有关更多配置信息,请参阅 Boto3 EMR 客户端。
创建作业流¶
在以下代码中,我们将使用上述配置创建一个新的作业流。
tests/system/amazon/aws/example_emr.py
create_job_flow = EmrCreateJobFlowOperator(
task_id="create_job_flow",
job_flow_overrides=JOB_FLOW_OVERRIDES,
)
向 EMR 作业流添加步骤¶
要向现有的 EMR 作业流添加步骤,您可以使用 EmrAddStepsOperator
。可以通过传递 deferrable=True
作为参数,以可延迟模式运行此运算符。使用 deferrable
模式将释放工作槽,并提高 Airflow 集群内资源的利用率。但是,此模式需要 Airflow 触发器在您的部署中可用。
tests/system/amazon/aws/example_emr.py
add_steps = EmrAddStepsOperator(
task_id="add_steps",
job_flow_id=create_job_flow.output,
steps=SPARK_STEPS,
execution_role_arn=execution_role_arn,
)
终止 EMR 作业流¶
要终止 EMR 作业流,您可以使用 EmrTerminateJobFlowOperator
。可以通过传递 deferrable=True
作为参数,以可延迟模式运行此运算符。使用 deferrable
模式将释放工作槽,并提高 Airflow 集群内资源的利用率。但是,此模式需要 Airflow 触发器在您的部署中可用。
tests/system/amazon/aws/example_emr.py
remove_cluster = EmrTerminateJobFlowOperator(
task_id="remove_cluster",
job_flow_id=create_job_flow.output,
)
修改 Amazon EMR 容器¶
要修改现有的 EMR 容器,您可以使用 EmrContainerSensor
。
tests/system/amazon/aws/example_emr.py
modify_cluster = EmrModifyClusterOperator(
task_id="modify_cluster", cluster_id=create_job_flow.output, step_concurrency_level=1
)
启动 EMR Notebook 执行¶
您可以使用 EmrStartNotebookExecutionOperator
在附加到正在运行的集群的现有 Notebook 上启动 Notebook 执行。
tests/system/amazon/aws/example_emr_notebook_execution.py
start_execution = EmrStartNotebookExecutionOperator(
task_id="start_execution",
editor_id=editor_id,
cluster_id=cluster_id,
relative_path="EMR-System-Test.ipynb",
service_role="EMR_Notebooks_DefaultRole",
)
停止 EMR Notebook 执行¶
您可以使用 EmrStopNotebookExecutionOperator
来停止正在运行的 Notebook 执行。
tests/system/amazon/aws/example_emr_notebook_execution.py
stop_execution = EmrStopNotebookExecutionOperator(
task_id="stop_execution",
notebook_execution_id=notebook_execution_id_1,
)
传感器¶
等待 EMR Notebook 执行状态¶
要监视 EMR Notebook 执行的状态,您可以使用 EmrNotebookExecutionSensor
。
tests/system/amazon/aws/example_emr_notebook_execution.py
wait_for_execution_start = EmrNotebookExecutionSensor(
task_id="wait_for_execution_start",
notebook_execution_id=notebook_execution_id_1,
target_states={"RUNNING"},
poke_interval=5,
)
等待 Amazon EMR 作业流状态¶
要监视 EMR 作业流的状态,您可以使用 EmrJobFlowSensor
。
tests/system/amazon/aws/example_emr.py
check_job_flow = EmrJobFlowSensor(task_id="check_job_flow", job_flow_id=create_job_flow.output)
等待 Amazon EMR 步骤状态¶
要监视 EMR 作业步骤的状态,您可以使用 EmrStepSensor
。
tests/system/amazon/aws/example_emr.py
wait_for_step = EmrStepSensor(
task_id="wait_for_step",
job_flow_id=create_job_flow.output,
step_id=get_step_id(add_steps.output),
)
限制¶
Amazon EMR 的服务配额相对较低,可以在此处查看详细信息。因此,当使用此页面中列出的任何运算符和传感器时,您可能会遇到限制问题。为了规避此限制,请考虑自定义 AWS 连接配置以修改默认的 Boto3 重试策略。请参阅 AWS 连接配置文档。