AWS ECS 执行器

这是一个由 Amazon Elastic Container Service (ECS) 提供支持的 Airflow 执行器。Airflow 调度执行的每个任务都在其自己的 ECS 容器中运行。这种执行器的一些优点包括

  1. 任务隔离:一个任务不会成为另一个任务的“吵闹邻居”。CPU、内存和磁盘等资源会隔离到每个单独的任务。任何影响网络或导致整个容器失败的操作或故障只会影响其中运行的单个任务。任何单个用户都无法通过触发过多任务来使环境过载,因为没有共享工作程序。

  2. 自定义环境:您可以构建不同的容器镜像,其中包含运行任务所需的特定依赖项(如系统级依赖项)、二进制文件或数据。

  3. 成本效益高:计算资源仅在 Airflow 任务本身的生命周期内存在。这节省了成本,因为不需要随时准备好的持久/长期工作程序,这些工作程序也需要维护和修补。

有关快速入门指南,请参阅此处,它将帮助您使用基本配置启动并运行。

以下部分提供了有关配置、提供的 Dockerfile 示例和日志记录的更多通用详细信息。

配置选项

有许多可用的配置选项,可以直接在 airflow.cfg 文件的“aws_ecs_executor”部分下设置,或通过环境变量使用 AIRFLOW__AWS_ECS_EXECUTOR__<OPTION_NAME> 格式设置,例如 AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME = "myEcsContainer"。有关如何设置这些选项的更多信息,请参阅设置配置选项

注意

配置选项必须在运行 Airflow 组件(调度程序、Webserver、ECS 任务容器等)的所有主机/环境中保持一致。有关设置配置的更多详细信息,请参阅此处

如果发生冲突,优先级从低到高依次是

  1. 加载具有默认值的选项的默认值。

  2. 加载通过 airflow.cfg 或环境变量显式提供的任何值。这些值会按照 Airflow 的配置优先级进行检查。

  3. 如果提供了 RUN_TASK_KWARGS 选项,则加载其中提供的任何值。

注意

exec_config 是可以提供给运算符的可选参数。它是一个字典类型,在 ECS Executor 的上下文中,它表示一个 run_task_kwargs 配置,然后在 Airflow 配置中指定的 run_task_kwargs(如果存在)之上进行更新。这是一个递归更新,实质上是将 Python 的 update 应用于配置中的每个嵌套字典。大致等同于:run_task_kwargs.update(exec_config)

必需的配置选项:

  • CLUSTER - Amazon ECS 集群的名称。必需。

  • CONTAINER_NAME - 将用于通过 ECS 执行器执行 Airflow 任务的容器名称。该容器应在 ECS 任务定义中指定。必需。

  • REGION_NAME - 配置 Amazon ECS 的 AWS 区域名称。必需。

可选的配置选项:

  • ASSIGN_PUBLIC_IP - 是否为 ECS 执行器启动的容器分配公共 IP 地址。默认为“False”。

  • AWS_CONN_ID - ECS 执行器用于对 AWS ECS 进行 API 调用所使用的 Airflow 连接(即凭据)。默认为“aws_default”。

  • LAUNCH_TYPE - 启动类型可以是“FARGATE”或“EC2”。默认为“FARGATE”。

  • PLATFORM_VERSION - 如果使用 FARGATE 启动类型,ECS 任务使用的平台版本。默认为“LATEST”。

  • RUN_TASK_KWARGS - 一个 JSON 字符串,包含提供给 ECS run_task API 的参数。

  • SECURITY_GROUPS - 与 ECS 任务关联的最多 5 个用逗号分隔的安全组 ID。默认为 VPC 默认值。

  • SUBNETS - 与 ECS 任务或服务关联的最多 16 个用逗号分隔的子网 ID。默认为 VPC 默认值。

  • TASK_DEFINITION - 要运行的 ECS 任务定义的族和修订版本(族:修订版本)或完整 ARN。默认为最新的 ACTIVE 修订版本。

  • MAX_RUN_TASK_ATTEMPTS - ECS 执行器应尝试运行任务的最大次数。这指的是任务启动失败(即 ECS API 失败、容器失败等)的情况。

  • CHECK_HEALTH_ON_STARTUP - 是否在启动时检查 ECS 执行器健康状况

有关可用选项(包括类型提示和示例)的更详细描述,请参阅 Amazon 提供程序包中的 config_templates 文件夹。

注意

exec_config 是可以提供给运算符的可选参数。它是一个字典类型,在 ECS Executor 的上下文中,它表示一个 run_task_kwargs 配置,然后在 Airflow 配置中指定的 run_task_kwargs(如果存在)之上进行更新。这是一个递归更新,实质上是将 Python 的 update 应用于配置中的每个嵌套字典。大致等同于:run_task_kwargs.update(exec_config)

适用于 AWS ECS 执行器的 Dockerfile

可以在此处找到 Dockerfile 示例,它创建一个镜像,AWS ECS 可以使用该镜像在 Apache Airflow 中使用 AWS ECS 执行器运行 Airflow 任务。该镜像支持 AWS CLI/API 集成,允许您在 Airflow 环境中与 AWS 服务交互。它还包含从 S3 存储桶或本地文件夹加载 DAG(有向无环图)的选项。

先决条件

Docker 必须安装在您的系统上。可以在此处找到安装 Docker 的说明。

构建镜像

AWS CLI 将安装在镜像中,并且有多种方法可以将 AWS 身份验证信息传递给容器,因此也有多种方法来构建镜像。本指南将介绍 2 种方法。

最安全的方法是使用 IAM 角色。创建 ECS 任务定义时,您可以选择任务角色和任务执行角色。任务执行角色是容器代理代表您发出 AWS API 请求时使用的角色。对于 ECS 执行器而言,此角色至少需要拥有 AmazonECSTaskExecutionRolePolicy 以及 CloudWatchLogsFullAccess(或 CloudWatchLogsFullAccessV2)策略。任务角色是容器用于发出 AWS API 请求的角色。此角色需要根据正在运行的 DAG 中描述的任务具有权限。如果您通过 S3 存储桶加载 DAG,则此角色需要具有读取 S3 存储桶的权限。

要创建新的任务角色或任务执行角色,请按照以下步骤操作

  1. 登录您的 AWS 管理控制台并导航到 IAM 服务,然后在左侧导航栏中的“访问管理”下,选择“角色”。

  2. 在“角色”页面上,点击右上角的“创建角色”。

  3. 在“受信任的实体类型”下,选择“AWS 服务”。

  4. 从“使用案例”下的下拉列表中选择“Elastic Container Service”,并将特定使用案例选择为“Elastic Container Service Task”。点击“下一步”。

  5. 在“权限”页面上,根据角色是任务角色还是任务执行角色,选择所需的权限。选择所有必需的权限后,点击“下一步”。

  6. 输入新角色的名称,以及可选的描述。检查“受信任的实体”和角色的权限。根据需要添加任何标签,然后点击“创建角色”。

为 ECS 集群创建任务定义时(有关详细信息,请参阅设置指南),为任务定义选择适当的新创建的任务角色和任务执行角色。

然后,您可以 cd 进入包含 Dockerfile 的目录并运行以下命令来构建镜像

docker build -t my-airflow-image \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION .

注意:在相同架构下构建和运行镜像非常重要。例如,对于 Apple Silicon 用户,您可能希望使用 docker buildx 指定架构

docker buildx build --platform=linux/amd64 -t my-airflow-image \
  --build-arg aws_default_region=YOUR_DEFAULT_REGION .

有关使用 docker buildx 的更多信息,请参阅此处

第二种方法是使用构建时参数(aws_access_key_idaws_secret_access_keyaws_default_regionaws_session_token)。

注意:不建议在生产环境中使用此方法,因为用户凭据存储在容器中,这可能存在安全漏洞。

要使用这些参数传递 AWS 身份验证信息,请在 Docker 构建过程中使用 --build-arg 选项。例如

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN .

YOUR_ACCESS_KEYYOUR_SECRET_KEYYOUR_SESSION_TOKENYOUR_DEFAULT_REGION 替换为有效的 AWS 凭据。

基础镜像

此 Docker 镜像构建在 apache/airflow:latest 镜像之上。有关该镜像的更多信息,请参阅此处

重要说明:此镜像中的 Airflow 和 Python 版本必须与运行 Airflow 调度程序进程(进而运行执行器)的主机/容器上的 Airflow 和 Python 版本一致。可以通过在本地运行容器并使用以下命令验证镜像的 Airflow 版本

docker run <image_name> version

类似地,可以通过以下命令验证镜像的 Python 版本

docker run <image_name> python --version

确保这些版本与运行 Airflow 调度程序进程(以及 ECS 执行器)的主机/容器上的版本匹配。可以从 Dockerhub 注册表下载特定 Python 版本的 Apache Airflow 镜像,并按Python 版本筛选标签。例如,标签 latest-python3.9 指定该镜像将安装 Python 3.9。

加载 DAG

有多种方法可以在由 ECS 管理的容器上加载 DAG。此 Dockerfile 已预配置了两种可能的方法:从本地文件夹复制,或从 S3 存储桶下载。也可以使用其他方法加载 DAG。

从 S3 存储桶

要从 S3 存储桶加载 DAG,请取消注释 Dockerfile 中的 entrypoint 行,将指定的 S3 存储桶中的 DAG 同步到容器内的 /opt/airflow/dags 目录。如果想将 DAG 存储在 /opt/airflow/dags 以外的目录中,可以选择提供 container_dag_path 作为构建参数。

在 docker build 命令中添加 --build-arg s3_uri=YOUR_S3_URI。将 YOUR_S3_URI 替换为您的 S3 存储桶的 URI。确保您具有从该存储桶读取的适当权限。

请注意,以下命令也正在传递 AWS 凭据作为构建参数。

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN \
 --build-arg s3_uri=YOUR_S3_URI .

从本地文件夹

要从本地文件夹加载 DAG,请将您的 DAG 文件放在 Docker 构建上下文中主机上的文件夹中,并使用 host_dag_path 构建参数提供该文件夹的位置。默认情况下,DAG 将复制到 /opt/airflow/dags,但这可以通过在 Docker 构建过程中传递 container_dag_path 构建时参数来更改

docker build -t my-airflow-image --build-arg host_dag_path=./dags_on_host --build-arg container_dag_path=/path/on/container .

如果选择将 DAG 加载到 /opt/airflow/dags 以外的路径,则需要在 Airflow 配置中更新新路径。

安装 Python 依赖项

此 Dockerfile 支持通过 piprequirements.txt 文件安装 Python 依赖项。将您的 requirements.txt 文件放在与 Dockerfile 相同的目录中。如果在不同位置,可以使用 requirements_path 构建参数指定。复制 requirements.txt 文件时,请记住 Docker 上下文。取消注释 Dockerfile 中用于将 requirements.txt 文件复制到容器并运行 pip install 以在容器上安装依赖项的两行。

构建用于 AWS ECS 执行器的镜像

有关如何将通过此自述文件创建的 Docker 镜像与 ECS 执行器一起使用的详细说明,请参见此处

日志记录

通过此执行器执行的 Airflow 任务在配置的 VPC 内的容器中运行。这意味着日志无法直接访问 Airflow Webserver,并且容器在任务完成后停止时,日志将永久丢失。

使用 ECS 执行器时应采用远程日志记录,以持久化您的 Airflow 任务日志并使其可从 Airflow Webserver 查看。

配置远程日志记录

有多种方法可以配置远程日志记录和几种支持的目标。可以在此处找到 Airflow 任务日志记录的概述。此处可以找到配置 S3 远程日志记录的说明,此处可以找到 Cloudwatch 远程日志记录的说明。对于 ECS 执行器上下文中的远程日志记录,需要指出一些重要事项

  • Airflow 远程日志记录的配置选项应在运行 Airflow 的所有主机和容器上进行配置。例如,Webserver 需要此配置才能从远程位置获取日志,而 ECS 容器需要此配置才能将日志上传到远程位置。在此处阅读有关如何通过配置文件或环境变量导出设置 Airflow 配置的更多信息。

  • 将 Airflow 远程日志记录配置添加到容器的方法有很多种。一些示例包括但不限于

    • 直接在 Dockerfile 中导出为环境变量(参见上面的 Dockerfile 部分)

    • 更新 airflow.cfg 文件或在 Dockerfile 中复制/挂载/下载自定义 airflow.cfg

    • 在 ECS 任务定义中以纯文本形式添加或通过Secrets/System Manager 添加

    • 或者,使用ECS 任务环境文件

  • 您必须在容器内配置凭据才能与远程服务进行交互以处理您的日志(例如 S3、CloudWatch Logs 等)。这可以通过多种方式完成。一些示例包括但不限于

    • 直接将凭据导出到 Dockerfile 中(参见上面的 Dockerfile 部分)

    • 配置 Airflow 连接并将其提供为远程日志记录连接 ID(通过上述任何方式或您偏好的方式导出到容器中)。然后 Airflow 将使用这些凭据专门用于与您选择的远程日志记录目标进行交互。

注意

配置选项必须在运行 Airflow 组件(调度程序、Webserver、ECS 任务容器等)的所有主机/环境中保持一致。有关设置配置的更多详细信息,请参阅此处

ECS 任务日志记录

可以将 ECS 配置为使用 awslogs 日志驱动程序将日志信息发送到 CloudWatch Logs 以用于 ECS 任务本身。这些日志将包括 Airflow 任务运算符日志记录以及在容器中运行的进程生命周期内发生的任何其他日志记录(在本例中为 Airflow CLI 命令 airflow tasks run ...)。这有助于调试远程日志记录问题或测试远程日志记录配置。可以在此处找到启用此日志记录的信息。

注意:这些日志将无法从 Airflow Webserver UI 查看。

性能和调优

虽然 ECS 执行器会由于容器启动时间而为每个 Airflow 任务执行增加约 50-60 秒的延迟,但它允许更高程度的并行性和隔离。我们已测试过此执行器,调度了 1,000 多个任务并行运行,观察到最多可以同时运行 500 个任务。500 个任务的限制符合ECS 服务配额

在大规模运行此执行器和 Airflow 时,需要考虑一些配置选项。以下许多配置将限制可以同时运行的任务数量或调度程序的性能。

为 Apache Airflow 设置 ECS 执行器

让 ECS 执行器在 Apache Airflow 中工作涉及 3 个步骤

  1. 创建一个数据库,Airflow 和在 ECS 中运行的任务可以连接到该数据库。

  2. 创建和配置一个 ECS 集群,该集群可以运行来自 Airflow 的任务。

  3. 配置 Airflow 使用 ECS 执行器和数据库。

有不同的选项可用于选择数据库后端。此处可以找到有关 Airflow 支持的不同选项的更多信息。以下指南将介绍如何在 AWS 上设置 PostgreSQL RDS 实例。该指南还将涵盖设置 ECS 集群。ECS 执行器支持各种启动类型,但本指南将解释如何设置 ECS Fargate 集群。

为 AWS ECS 执行器设置 RDS 数据库实例

创建 RDS 数据库实例

  1. 登录您的 AWS 管理控制台并导航到 RDS 服务。

  2. 点击“创建数据库”开始创建新的 RDS 实例。

  3. 选择“标准创建”选项,并选择 PostreSQL。

  4. 选择适当的模板、可用性和持久性。

    • 注意:在撰写本文时,“多可用区数据库集群”选项不支持设置数据库名称,这是下面的必需步骤。

  5. 设置数据库实例名称、用户名和密码。

  6. 选择实例配置和存储参数。

  7. 在“连接性”部分,选择“不连接到 EC2 计算资源”

  8. 选择或创建一个 VPC 和子网,并允许对数据库进行公共访问。选择或创建安全组并选择可用区。

  9. 打开“附加配置”选项卡,将数据库名称设置为 airflow_db

  10. 根据需要选择其他设置,然后点击“创建数据库”来创建数据库。

测试连接性

为了能够连接到新的 RDS 实例,您需要允许从您的 IP 地址到数据库的入站流量。

  1. 在 RDS 实例的“连接性与安全性”选项卡中的“安全性”标题下,找到指向您的新 RDS 数据库实例的 VPC 安全组的链接。

  2. 创建一个入站规则,允许从您的 IP 地址(或多个地址)通过 TCP 端口 5432 (PostgreSQL) 的流量。

  3. 确认修改安全组后可以连接到数据库。这将需要安装 psql。可以在此处找到安装 psql 的说明。

注意:在测试连接性之前,请确保您的数据库状态为“可用”

psql -h <endpoint> -p 5432 -U <username> <db_name>

可以在“连接性和安全性”选项卡上找到端点,用户名(和密码)是创建数据库时使用的凭据。

数据库名称应为 airflow_db(除非创建数据库时使用了不同的名称)。

如果连接成功,将提示您输入密码。

创建使用 Fargate 的 ECS 集群和任务定义

为了为将与 Apache Airflow 配合使用的 ECS 集群创建任务定义,您需要一个正确配置的 Docker 镜像。有关如何执行此操作的说明,请参阅Dockerfile 部分。

镜像构建完成后,需要将其放置在 ECS 可以拉取的仓库中。有多种方法可以实现这一点。本指南将介绍使用 Amazon Elastic Container Registry (ECR) 的方法。

创建 ECR 仓库

  1. 登录您的 AWS 管理控制台并导航到 ECR 服务。

  2. 点击“创建仓库”。

  3. 为仓库命名并填写其他必要信息。

  4. 点击“创建仓库”。

  5. 仓库创建完成后,点击该仓库。点击右上角的“查看推送命令”按钮。

  6. 按照说明推送 Docker 镜像,并根据需要替换镜像名称。推送镜像后刷新页面,确保镜像已上传。

创建 ECS 集群

  1. 登录您的 AWS 管理控制台并导航到 Amazon Elastic Container Service。

  2. 点击“集群”,然后点击“创建集群”。

  3. 确保在“基础设施”下选择了“AWS Fargate (无服务器)”。

  4. 根据需要选择其他选项,然后点击“创建”以创建集群。

创建任务定义

  1. 点击左侧导航栏中的“任务定义”,然后点击“创建新任务定义”。

  2. 选择任务定义族名称。启动类型选择 AWS Fargate。

  3. 选择或创建任务角色和任务执行角色,并确保这些角色具有完成各自任务所需的权限。您可以选择创建一个新的任务执行角色,该角色将具有任务运行所需的最基本权限。

  4. 为容器选择一个名称,并使用上一节中推送的镜像的镜像 URI。确保使用的角色具有拉取镜像所需的权限。

  5. 向容器添加以下环境变量

  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN,其值为 PostgreSQL 连接字符串,格式如下,使用上面数据库部分设置的值

postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
  • AIRFLOW__ECS_EXECUTOR__SECURITY_GROUPS,其值为与 RDS 实例使用的 VPC 关联的安全组 ID 的逗号分隔列表。

  • AIRFLOW__ECS_EXECUTOR__SUBNETS,其值为与 RDS 实例关联的子网 ID 的逗号分隔列表。

  1. 根据 Airflow 的一般需求(请参阅此处)、ECS 执行器(请参阅此处)或远程日志记录(请参阅此处),添加其他必要配置。请注意,任何配置更改都应在整个 Airflow 环境中进行,以保持配置一致。

  2. 点击“创建”。

允许 ECS 容器访问 RDS 数据库

最后一步,必须为 ECS 容器配置数据库访问。有许多不同的网络配置方案,但一种可能的方法是

  1. 登录您的 AWS 管理控制台并导航到 VPC 仪表板。

  2. 在左侧导航栏中,在“安全性”标题下,点击“安全组”。

  3. 选择与您的 RDS 实例关联的安全组,然后点击“编辑入站规则”。

  4. 添加一条新规则,允许 PostgreSQL 类型的流量流向 ECS 集群关联的子网的 CIDR。

配置 Airflow

要配置 Airflow 以利用 ECS 执行器并利用我们设置的资源,请创建一个脚本(例如,ecs_executor_config.sh),其内容如下

export AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor'

export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>

export AIRFLOW__AWS_ECS_EXECUTOR__REGION_NAME=<executor-region>

export AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER=<ecs-cluster-name>

export AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME=<ecs-container-name>

export AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION=<task-definition-name>

export AIRFLOW__AWS_ECS_EXECUTOR__LAUNCH_TYPE='FARGATE'

export AIRFLOW__AWS_ECS_EXECUTOR__PLATFORM_VERSION='LATEST'

export AIRFLOW__AWS_ECS_EXECUTOR__ASSIGN_PUBLIC_IP='True'

export AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS=<security-group-id-for-rds>

export AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS=<subnet-id-for-rds>

此脚本应在运行 Airflow 调度程序和 Webserver 的主机上运行,然后再启动这些进程。

该脚本设置环境变量,用于配置 Airflow 使用 ECS 执行器并提供任务执行所需的必要信息。进行的任何其他配置更改(例如远程日志记录的更改)都应添加到此示例脚本中,以使整个 Airflow 环境中的配置保持一致。

初始化 Airflow 数据库

Airflow 数据库在使用前需要初始化,并且需要添加一个用户才能登录。以下命令添加一个管理员用户(如果数据库尚未初始化,该命令也会对其进行初始化)

airflow users create --username admin --password admin --firstname <your first name> --lastname <your last name> --email <your email> --role Admin

此条目有帮助吗?