Amazon EKS 上的 Amazon EMR¶
Amazon EKS 上的 Amazon EMR提供了一个 Amazon EMR 部署选项,允许你在 Amazon EKS 上运行开源大数据框架。
先决任务¶
要使用这些操作符,你必须做一些事情
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细的信息可在 Airflow™ 安装 中找到
设置连接.
操作符¶
创建 Amazon EMR EKS 虚拟集群¶
EmrEksCreateClusterOperator
将创建一个 Amazon EKS 上的 Amazon EMR 虚拟集群。下面的示例 DAG 展示了如何创建一个 EMR on EKS 虚拟集群。
要在 Amazon EKS 上创建一个 Amazon EMR 集群,你需要指定一个虚拟集群名称、你想要使用的 eks 集群和一个 eks 命名空间。
有关更多详情,请参阅EMR on EKS 开发指南。
create_emr_eks_cluster = EmrEksCreateClusterOperator(
task_id="create_emr_eks_cluster",
virtual_cluster_name=virtual_cluster_name,
eks_cluster_name=eks_cluster_name,
eks_namespace=eks_namespace,
)
提交作业给 Amazon EMR 虚拟集群¶
注意
此示例假设您已配置好 EMR on EKS 虚拟集群。有关更多信息,请参阅EMR on EKS 入门指南。
EmrContainerOperator
将提交新作业给 Amazon EMR on Amazon EKS 虚拟集群。下面的示例作业计算数学常数 Pi
。在生产作业中,您通常会参考 Amazon Simple Storage Service (S3) 上的 Spark 脚本。
要为 Amazon EMR on Amazon EKS 创建作业,您需要指定您的虚拟集群 ID、您希望使用的 Amazon EMR 版本、您的 IAM 执行角色和 Spark 提交参数。
您还可以选择提供配置覆盖,例如 Spark、Hive 或 Log4j 属性,以及将 Spark 日志发送到 Amazon S3 或 Amazon Cloudwatch 的监控配置。
在示例中,我们展示了如何添加 applicationConfiguration
以使用 AWS Glue 数据目录和 monitoringConfiguration
以将日志发送到 Amazon CloudWatch 中的 /aws/emr-eks-spark
日志组。有关作业配置的更多详情,请参阅EMR on EKS 指南。
job_driver_arg = {
"sparkSubmitJobDriver": {
"entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
"sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
"--conf spark.executor.cores=2 --conf spark.driver.cores=1",
}
}
configuration_overrides_arg = {
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "/emr-eks-jobs",
"logStreamNamePrefix": "airflow",
}
},
}
我们传递 virtual_cluster_id
和 execution_role_arn
值作为操作符参数,但你可以在连接中存储它们或在 DAG 中提供它们。你的 AWS 区域应在 aws_default
连接中定义为 {"region_name": "us-east-1"}
或使用 aws_conn_id
参数传递给操作符的自定义连接名称。操作符返回作业运行的作业 ID。
job_starter = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=str(create_emr_eks_cluster.output),
execution_role_arn=job_role_arn,
release_label="emr-7.0.0-latest",
job_driver=job_driver_arg,
configuration_overrides=configuration_overrides_arg,
name="pi.py",
)
传感器¶
等待 Amazon EMR 虚拟集群作业¶
要等待 Amazon EMR 虚拟集群作业的状态达到终止状态,可以使用 EmrContainerSensor
job_waiter = EmrContainerSensor(
task_id="job_waiter",
virtual_cluster_id=str(create_emr_eks_cluster.output),
job_id=str(job_starter.output),
)