Amazon EKS 上的 Amazon EMR

Amazon EKS 上的 Amazon EMR提供了一个 Amazon EMR 部署选项,允许你在 Amazon EKS 上运行开源大数据框架。

先决任务

要使用这些操作符,你必须做一些事情

操作符

创建 Amazon EMR EKS 虚拟集群

EmrEksCreateClusterOperator 将创建一个 Amazon EKS 上的 Amazon EMR 虚拟集群。下面的示例 DAG 展示了如何创建一个 EMR on EKS 虚拟集群。

要在 Amazon EKS 上创建一个 Amazon EMR 集群,你需要指定一个虚拟集群名称、你想要使用的 eks 集群和一个 eks 命名空间。

有关更多详情,请参阅EMR on EKS 开发指南

tests/system/providers/amazon/aws/example_emr_eks.py[源代码]

    create_emr_eks_cluster = EmrEksCreateClusterOperator(
        task_id="create_emr_eks_cluster",
        virtual_cluster_name=virtual_cluster_name,
        eks_cluster_name=eks_cluster_name,
        eks_namespace=eks_namespace,
    )

提交作业给 Amazon EMR 虚拟集群

注意

此示例假设您已配置好 EMR on EKS 虚拟集群。有关更多信息,请参阅EMR on EKS 入门指南

EmrContainerOperator 将提交新作业给 Amazon EMR on Amazon EKS 虚拟集群。下面的示例作业计算数学常数 Pi。在生产作业中,您通常会参考 Amazon Simple Storage Service (S3) 上的 Spark 脚本。

要为 Amazon EMR on Amazon EKS 创建作业,您需要指定您的虚拟集群 ID、您希望使用的 Amazon EMR 版本、您的 IAM 执行角色和 Spark 提交参数。

您还可以选择提供配置覆盖,例如 Spark、Hive 或 Log4j 属性,以及将 Spark 日志发送到 Amazon S3 或 Amazon Cloudwatch 的监控配置。

在示例中,我们展示了如何添加 applicationConfiguration 以使用 AWS Glue 数据目录和 monitoringConfiguration 以将日志发送到 Amazon CloudWatch 中的 /aws/emr-eks-spark 日志组。有关作业配置的更多详情,请参阅EMR on EKS 指南

tests/system/providers/amazon/aws/example_emr_eks.py[源代码]

job_driver_arg = {
    "sparkSubmitJobDriver": {
        "entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
        "sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
        "--conf spark.executor.cores=2 --conf spark.driver.cores=1",
    }
}

configuration_overrides_arg = {
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-eks-jobs",
            "logStreamNamePrefix": "airflow",
        }
    },
}

我们传递 virtual_cluster_idexecution_role_arn 值作为操作符参数,但你可以在连接中存储它们或在 DAG 中提供它们。你的 AWS 区域应在 aws_default 连接中定义为 {"region_name": "us-east-1"} 或使用 aws_conn_id 参数传递给操作符的自定义连接名称。操作符返回作业运行的作业 ID。

tests/system/providers/amazon/aws/example_emr_eks.py[源代码]

job_starter = EmrContainerOperator(
    task_id="start_job",
    virtual_cluster_id=str(create_emr_eks_cluster.output),
    execution_role_arn=job_role_arn,
    release_label="emr-7.0.0-latest",
    job_driver=job_driver_arg,
    configuration_overrides=configuration_overrides_arg,
    name="pi.py",
)

传感器

等待 Amazon EMR 虚拟集群作业

要等待 Amazon EMR 虚拟集群作业的状态达到终止状态,可以使用 EmrContainerSensor

tests/system/providers/amazon/aws/example_emr_eks.py[源代码]

job_waiter = EmrContainerSensor(
    task_id="job_waiter",
    virtual_cluster_id=str(create_emr_eks_cluster.output),
    job_id=str(job_starter.output),
)

本条目有帮助吗?