Amazon EMR on Amazon EKS

Amazon EMR on EKS 为 Amazon EMR 提供了一种部署选项,允许你在 Amazon EKS 上运行开源大数据框架。

先决条件任务

要使用这些操作符,你必须执行以下几项操作

操作符

创建 Amazon EMR EKS 虚拟集群

EmrEksCreateClusterOperator 将创建一个 Amazon EMR on EKS 虚拟集群。下面的示例 DAG 展示了如何创建 EMR on EKS 虚拟集群。

要在 Amazon EKS 上创建 Amazon EMR 集群,你需要指定一个虚拟集群名称、你要使用的 eks 集群以及一个 eks 命名空间。

有关更多详细信息,请参阅 EMR on EKS 开发指南

tests/system/amazon/aws/example_emr_eks.py

    create_emr_eks_cluster = EmrEksCreateClusterOperator(
        task_id="create_emr_eks_cluster",
        virtual_cluster_name=virtual_cluster_name,
        eks_cluster_name=eks_cluster_name,
        eks_namespace=eks_namespace,
    )

向 Amazon EMR 虚拟集群提交作业

注意

此示例假设你已经配置了 EMR on EKS 虚拟集群。有关更多信息,请参阅 EMR on EKS 入门指南

EmrContainerOperator 将向 Amazon EMR on Amazon EKS 虚拟集群提交新作业。下面的示例作业计算数学常数 Pi。在生产作业中,你通常会引用 Amazon Simple Storage Service (S3) 上的 Spark 脚本。

要为 Amazon EMR on Amazon EKS 创建作业,你需要指定你的虚拟集群 ID、你要使用的 Amazon EMR 版本、你的 IAM 执行角色以及 Spark 提交参数。

你还可以选择提供配置覆盖,例如 Spark、Hive 或 Log4j 属性,以及将 Spark 日志发送到 Amazon S3 或 Amazon Cloudwatch 的监控配置。

在示例中,我们展示了如何添加 applicationConfiguration 以使用 AWS Glue 数据目录,以及 monitoringConfiguration 以将日志发送到 Amazon CloudWatch 中的 /aws/emr-eks-spark 日志组。有关作业配置的更多详细信息,请参阅 EMR on EKS 指南

tests/system/amazon/aws/example_emr_eks.py

job_driver_arg = {
    "sparkSubmitJobDriver": {
        "entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
        "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
        "--conf spark.executor.cores=2 --conf spark.driver.cores=1",
    }
}

configuration_overrides_arg = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-eks-jobs",
            "logStreamNamePrefix": "airflow",
        }
    },
}

我们将 virtual_cluster_idexecution_role_arn 值作为操作符参数传递,但你可以将它们存储在连接中,或在 DAG 中提供。你的 AWS 区域应在 aws_default 连接中定义为 {"region_name": "us-east-1"},或者在传递给操作符的自定义连接名称中通过 aws_conn_id 参数定义。该操作符会返回作业运行的作业 ID。

tests/system/amazon/aws/example_emr_eks.py

job_starter = EmrContainerOperator(
    task_id="start_job",
    virtual_cluster_id=str(create_emr_eks_cluster.output),
    execution_role_arn=job_role_arn,
    release_label="emr-7.0.0-latest",
    job_driver=job_driver_arg,
    configuration_overrides=configuration_overrides_arg,
    name="pi.py",
)

传感器

等待 Amazon EMR 虚拟集群作业

要等待 Amazon EMR 虚拟集群作业的状态达到终端状态,你可以使用 EmrContainerSensor

tests/system/amazon/aws/example_emr_eks.py

job_waiter = EmrContainerSensor(
    task_id="job_waiter",
    virtual_cluster_id=str(create_emr_eks_cluster.output),
    job_id=str(job_starter.output),
)

此条目是否有帮助?