Amazon EMR on Amazon EKS¶
Amazon EMR on EKS 为 Amazon EMR 提供了一种部署选项,允许你在 Amazon EKS 上运行开源大数据框架。
先决条件任务¶
要使用这些操作符,你必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参考Airflow® 的安装
设置连接.
操作符¶
创建 Amazon EMR EKS 虚拟集群¶
EmrEksCreateClusterOperator
将创建一个 Amazon EMR on EKS 虚拟集群。下面的示例 DAG 展示了如何创建 EMR on EKS 虚拟集群。
要在 Amazon EKS 上创建 Amazon EMR 集群,你需要指定一个虚拟集群名称、你要使用的 eks 集群以及一个 eks 命名空间。
有关更多详细信息,请参阅 EMR on EKS 开发指南。
tests/system/amazon/aws/example_emr_eks.py
create_emr_eks_cluster = EmrEksCreateClusterOperator(
task_id="create_emr_eks_cluster",
virtual_cluster_name=virtual_cluster_name,
eks_cluster_name=eks_cluster_name,
eks_namespace=eks_namespace,
)
向 Amazon EMR 虚拟集群提交作业¶
注意
此示例假设你已经配置了 EMR on EKS 虚拟集群。有关更多信息,请参阅 EMR on EKS 入门指南。
EmrContainerOperator
将向 Amazon EMR on Amazon EKS 虚拟集群提交新作业。下面的示例作业计算数学常数 Pi
。在生产作业中,你通常会引用 Amazon Simple Storage Service (S3) 上的 Spark 脚本。
要为 Amazon EMR on Amazon EKS 创建作业,你需要指定你的虚拟集群 ID、你要使用的 Amazon EMR 版本、你的 IAM 执行角色以及 Spark 提交参数。
你还可以选择提供配置覆盖,例如 Spark、Hive 或 Log4j 属性,以及将 Spark 日志发送到 Amazon S3 或 Amazon Cloudwatch 的监控配置。
在示例中,我们展示了如何添加 applicationConfiguration
以使用 AWS Glue 数据目录,以及 monitoringConfiguration
以将日志发送到 Amazon CloudWatch 中的 /aws/emr-eks-spark
日志组。有关作业配置的更多详细信息,请参阅 EMR on EKS 指南。
tests/system/amazon/aws/example_emr_eks.py
job_driver_arg = {
"sparkSubmitJobDriver": {
"entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
"sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
"--conf spark.executor.cores=2 --conf spark.driver.cores=1",
}
}
configuration_overrides_arg = {
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "/emr-eks-jobs",
"logStreamNamePrefix": "airflow",
}
},
}
我们将 virtual_cluster_id
和 execution_role_arn
值作为操作符参数传递,但你可以将它们存储在连接中,或在 DAG 中提供。你的 AWS 区域应在 aws_default
连接中定义为 {"region_name": "us-east-1"}
,或者在传递给操作符的自定义连接名称中通过 aws_conn_id
参数定义。该操作符会返回作业运行的作业 ID。
tests/system/amazon/aws/example_emr_eks.py
job_starter = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=str(create_emr_eks_cluster.output),
execution_role_arn=job_role_arn,
release_label="emr-7.0.0-latest",
job_driver=job_driver_arg,
configuration_overrides=configuration_overrides_arg,
name="pi.py",
)
传感器¶
等待 Amazon EMR 虚拟集群作业¶
要等待 Amazon EMR 虚拟集群作业的状态达到终端状态,你可以使用 EmrContainerSensor
tests/system/amazon/aws/example_emr_eks.py
job_waiter = EmrContainerSensor(
task_id="job_waiter",
virtual_cluster_id=str(create_emr_eks_cluster.output),
job_id=str(job_starter.output),
)