AWS ECS 执行器¶
这是一个由 Amazon Elastic Container Service (ECS) 驱动的 Airflow 执行器。Airflow 调度的每个执行任务都在其自己的 ECS 容器中运行。这种执行器的一些优点包括:
任务隔离:没有任务可以成为另一个任务的干扰邻居。CPU、内存和磁盘等资源被隔离到每个单独的任务。任何影响网络或导致整个容器失败的操作或故障只会影响其中运行的单个任务。没有单个用户可以通过触发太多任务来使环境过载,因为没有共享的工作线程。
自定义环境:您可以构建不同的容器镜像,其中包含运行任务所需的特定依赖项(如系统级依赖项)、二进制文件或数据。
成本效益:计算资源仅在 Airflow 任务本身的生命周期内存在。这节省了成本,因为它不需要始终准备就绪的持久/长期工作线程,这些工作线程还需要维护和修补。
有关快速入门指南,请参见此处,它将帮助您启动并运行基本配置。
以下各节提供了有关配置、提供的示例 Dockerfile 和日志记录的更多通用详细信息。
配置选项¶
有许多配置选项可用,可以直接在 airflow.cfg 文件中的“aws_ecs_executor”部分下设置,也可以通过环境变量以 AIRFLOW__AWS_ECS_EXECUTOR__<OPTION_NAME>
格式设置,例如 AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME = "myEcsContainer"
。有关如何设置这些选项的更多信息,请参见设置配置选项。
注意
配置选项必须在运行 Airflow 组件(调度程序、Web 服务器、ECS 任务容器等)的所有主机/环境之间保持一致。有关设置配置的更多详细信息,请参见此处。
如果发生冲突,优先级顺序从低到高为:
加载具有默认值的选项的默认值。
加载通过 airflow.cfg 或环境变量显式提供的任何值。这些值会根据 Airflow 的配置优先级进行检查。
如果提供了 RUN_TASK_KWARGS 选项,则加载其中提供的任何值。
注意
exec_config
是可以提供给操作符的可选参数。它是一种字典类型,在 ECS 执行器的上下文中,它表示一个 run_task_kwargs
配置,然后该配置会更新覆盖上面 Airflow 配置中指定的 run_task_kwargs
(如果存在)。它是递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。大致近似为:run_task_kwargs.update(exec_config)
必需的配置选项:¶
CLUSTER - Amazon ECS 集群的名称。必需。
CONTAINER_NAME - 将用于通过 ECS 执行器执行 Airflow 任务的容器的名称。容器应在 ECS 任务定义中指定。必需。
REGION_NAME - 配置 Amazon ECS 的 AWS 区域的名称。必需。
可选配置选项:¶
ASSIGN_PUBLIC_IP - 是否为 ECS 执行器启动的容器分配公共 IP 地址。默认为“False”。
AWS_CONN_ID - ECS 执行器用于向 AWS ECS 发出 API 调用的 Airflow 连接(即凭据)。默认为“aws_default”。
LAUNCH_TYPE - 启动类型可以是“FARGATE”或“EC2”。默认为“FARGATE”。
PLATFORM_VERSION - 如果使用 FARGATE 启动类型,则 ECS 任务使用的平台版本。默认为“LATEST”。
RUN_TASK_KWARGS - 包含向 ECS
run_task
API 提供的参数的 JSON 字符串。SECURITY_GROUPS - 与 ECS 任务关联的最多 5 个逗号分隔的安全组 ID。默认为 VPC 默认值。
SUBNETS - 与 ECS 任务或服务关联的最多 16 个逗号分隔的子网 ID。默认为 VPC 默认值。
TASK_DEFINITION - 要运行的 ECS 任务定义的系列和修订版(family:revision)或完整 ARN。默认为最新的 ACTIVE 修订版。
MAX_RUN_TASK_ATTEMPTS - Ecs 执行器应尝试运行任务的最大次数。这指的是任务启动失败的情况(即 ECS API 失败、容器失败等)。
CHECK_HEALTH_ON_STARTUP - 是否在启动时检查 ECS 执行器的运行状况
有关可用选项的更详细描述,包括类型提示和示例,请参见 Amazon 提供程序包中的 config_templates
文件夹。
注意
exec_config
是可以提供给操作符的可选参数。它是一种字典类型,在 ECS 执行器的上下文中,它表示一个 run_task_kwargs
配置,然后该配置会更新覆盖上面 Airflow 配置中指定的 run_task_kwargs
(如果存在)。它是递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。大致近似为:run_task_kwargs.update(exec_config)
AWS ECS 执行器的 Dockerfile¶
可以在此处找到示例 Dockerfile,它创建了一个可以由 AWS ECS 用于在 Apache Airflow 中使用 AWS ECS 执行器运行 Airflow 任务的镜像。该镜像支持 AWS CLI/API 集成,允许您在 Airflow 环境中与 AWS 服务交互。它还包括从 S3 存储桶或本地文件夹加载 DAG(有向无环图)的选项。
构建镜像¶
AWS CLI 将安装在镜像中,并且有多种方法可以将 AWS 身份验证信息传递给容器,因此有多种方法可以构建镜像。本指南将介绍 2 种方法。
最安全的方法是使用 IAM 角色。创建 ECS 任务定义时,您可以选择任务角色和任务执行角色。任务执行角色是容器代理用来代表您发出 AWS API 请求的角色。对于 ECS 执行器,此角色至少需要具有 AmazonECSTaskExecutionRolePolicy
以及 CloudWatchLogsFullAccess
(或 CloudWatchLogsFullAccessV2
)策略。任务角色是容器用来发出 AWS API 请求的角色。此角色需要具有基于正在运行的 DAG 中描述的任务的权限。如果您是通过 S3 存储桶加载 DAG,则此角色需要具有读取 S3 存储桶的权限。
要创建新的任务角色或任务执行角色,请按照以下步骤操作
导航到 AWS 控制台上的 IAM 页面,然后从左侧的选项卡中的“访问管理”下,选择“角色”。
在“角色”页面上,单击右上角的“创建角色”。
在“可信实体类型”下,选择“AWS 服务”。
从“用例”下的下拉列表中选择“Elastic Container Service”,然后选择“Elastic Container Service Task”作为特定用例。单击“下一步”。
在“权限”页面中,选择角色将需要的权限,具体取决于角色是任务角色还是任务执行角色。选择所有必需的权限后,单击“下一步”。
输入新角色的名称和可选的描述。检查角色的可信实体和权限。根据需要添加任何标签,然后单击“创建角色”。
在为 ECS 集群创建任务定义时(有关更多详细信息,请参见设置指南),为任务定义选择新创建的适当任务角色和任务执行角色。
然后,您可以通过 cd
到包含 Dockerfile 的目录并运行来构建镜像
docker build -t my-airflow-image \
--build-arg aws_default_region=YOUR_DEFAULT_REGION .
注意:重要的是,镜像必须在相同的体系结构下构建和运行。例如,对于 Apple Silicon 上的用户,您可能希望使用 docker buildx
指定 arch
docker buildx build --platform=linux/amd64 -t my-airflow-image \
--build-arg aws_default_region=YOUR_DEFAULT_REGION .
有关使用 docker buildx
的更多信息,请参见此处。
第二种方法是使用构建时参数(aws_access_key_id
、aws_secret_access_key
、aws_default_region
和 aws_session_token
)。
注意:不建议在生产环境中使用此方法,因为用户凭据存储在容器中,这可能是一个安全漏洞。
要使用这些参数传递 AWS 身份验证信息,请在 Docker 构建过程中使用 --build-arg
选项。例如
docker build -t my-airflow-image \
--build-arg aws_access_key_id=YOUR_ACCESS_KEY \
--build-arg aws_secret_access_key=YOUR_SECRET_KEY \
--build-arg aws_default_region=YOUR_DEFAULT_REGION \
--build-arg aws_session_token=YOUR_SESSION_TOKEN .
将 YOUR_ACCESS_KEY
、YOUR_SECRET_KEY
、YOUR_SESSION_TOKEN
和 YOUR_DEFAULT_REGION
替换为有效的 AWS 凭据。
基础镜像¶
Docker 镜像基于 apache/airflow:latest
镜像构建。有关镜像的更多信息,请参见此处。
重要提示:此镜像中的 Airflow 和 Python 版本必须与运行 Airflow 调度程序进程(进而运行执行器)的主机/容器上的 Airflow 和 Python 版本保持一致。可以通过使用以下命令在本地运行容器来验证镜像的 Airflow 版本
docker run <image_name> version
同样,可以使用以下命令验证镜像的 Python 版本
docker run <image_name> python --version
请确保这些版本与运行 Airflow 调度器进程(以及 ECS 执行器)的主机/容器上的版本相匹配。具有特定 Python 版本的 Apache Airflow 镜像可以从 Dockerhub 注册表中下载,并通过 Python 版本 过滤标签。例如,标签 latest-python3.9
指定该镜像将安装 Python 3.9。
加载 DAG¶
在 ECS 管理的容器上加载 DAG 有多种方法。此 Dockerfile 预配置了两种可能的方法:从本地文件夹复制或从 S3 存储桶下载。也可以使用其他加载 DAG 的方法。
从 S3 存储桶¶
要从 S3 存储桶加载 DAG,请取消注释 Dockerfile 中的入口点行,以将 DAG 从指定的 S3 存储桶同步到容器内的 /opt/airflow/dags
目录。如果您想将 DAG 存储在 /opt/airflow/dags
以外的目录中,可以选择提供 container_dag_path
作为构建参数。
在 docker build 命令中添加 --build-arg s3_uri=YOUR_S3_URI
。将 YOUR_S3_URI
替换为您的 S3 存储桶的 URI。请确保您具有从该存储桶读取的适当权限。
请注意,以下命令还会将 AWS 凭证作为构建参数传递。
docker build -t my-airflow-image \
--build-arg aws_access_key_id=YOUR_ACCESS_KEY \
--build-arg aws_secret_access_key=YOUR_SECRET_KEY \
--build-arg aws_default_region=YOUR_DEFAULT_REGION \
--build-arg aws_session_token=YOUR_SESSION_TOKEN \
--build-arg s3_uri=YOUR_S3_URI .
从本地文件夹¶
要从本地文件夹加载 DAG,请将您的 DAG 文件放在主机上 Docker 构建上下文中的文件夹中,并使用 host_dag_path
构建参数提供该文件夹的位置。默认情况下,DAG 将被复制到 /opt/airflow/dags
,但这可以通过在 Docker 构建过程中传递 container_dag_path
构建时参数来更改。
docker build -t my-airflow-image --build-arg host_dag_path=./dags_on_host --build-arg container_dag_path=/path/on/container .
如果选择将 DAG 加载到 /opt/airflow/dags
以外的路径,则需要在 Airflow 配置中更新新路径。
安装 Python 依赖项¶
此 Dockerfile 支持通过 pip
从 requirements.txt
文件安装 Python 依赖项。将您的 requirements.txt
文件放在与 Dockerfile 相同的目录中。如果它位于不同的位置,则可以使用 requirements_path
构建参数指定它。在复制 requirements.txt
文件时,请记住 Docker 上下文。取消注释 Dockerfile 中将 requirements.txt
文件复制到容器的两行,并运行 pip install
以在容器上安装依赖项。
日志记录¶
通过此执行器执行的 Airflow 任务在配置的 VPC 中的容器内运行。这意味着 Airflow Web 服务器无法直接访问日志,并且在任务完成后停止容器时,日志将永久丢失。
使用 ECS 执行器时,应采用远程日志记录来持久化您的 Airflow 任务日志,并使其可从 Airflow Web 服务器查看。
配置远程日志记录¶
配置远程日志记录的方法有很多,并且有多个受支持的目标。有关 Airflow 任务日志记录的概述,请参阅此处。有关配置 S3 远程日志记录的说明,请参阅此处,有关 Cloudwatch 远程日志记录的说明,请参阅此处。以下是在 ECS 执行器的上下文中进行远程日志记录时需要注意的一些重要事项
Airflow 远程日志记录的配置选项应在运行 Airflow 的所有主机和容器上进行配置。例如,Web 服务器需要此配置,以便它可以从远程位置获取日志,并且 ECS 容器需要此配置,以便它可以将日志上传到远程位置。请参阅此处,了解有关如何通过配置文件或环境变量导出设置 Airflow 配置的更多信息。
将 Airflow 远程日志记录配置添加到容器可以通过多种方式完成。一些示例包括但不限于
直接在 Dockerfile 中导出为环境变量(请参阅 Dockerfile 部分上方)
更新
airflow.cfg
文件或在 Dockerfile 中复制/挂载/下载自定义airflow.cfg
。在 ECS 任务定义中以纯文本形式添加,或通过Secrets/System Manager添加
或者,使用ECS 任务环境变量文件
您必须在容器内配置凭证,才能与您的日志的远程服务(例如 S3、CloudWatch Logs 等)进行交互。这可以通过多种方式完成。一些示例包括但不限于
注意
配置选项必须在运行 Airflow 组件(调度程序、Web 服务器、ECS 任务容器等)的所有主机/环境之间保持一致。有关设置配置的更多详细信息,请参见此处。
ECS 任务日志记录¶
可以将 ECS 配置为使用 awslogs 日志驱动程序将日志信息发送到 CloudWatch Logs,以用于 ECS 任务本身。这些日志将包括 Airflow 任务操作符日志记录以及在容器中运行的进程的整个生命周期中发生的任何其他日志记录(在本例中为 Airflow CLI 命令 airflow tasks run ...
)。这有助于调试远程日志记录问题或测试远程日志记录配置。有关启用此日志记录的信息,请参阅此处。
注意:这些日志将无法从 Airflow Web 服务器 UI 查看。
为 Apache Airflow 设置 ECS 执行器¶
在 Apache Airflow 中使 ECS 执行器工作涉及 3 个步骤
创建 Airflow 和在 ECS 中运行的任务可以连接到的数据库。
创建和配置可以运行 Airflow 任务的 ECS 集群。
配置 Airflow 以使用 ECS 执行器和数据库。
选择数据库后端有不同的选项。有关 Airflow 支持的不同选项的更多信息,请参阅此处。以下指南将说明如何在 AWS 上设置 PostgreSQL RDS 实例。该指南还将介绍如何设置 ECS 集群。ECS 执行器支持各种启动类型,但本指南将说明如何设置 ECS Fargate 集群。
为 AWS ECS 执行器设置 RDS DB 实例¶
创建 RDS DB 实例¶
登录到您的 AWS 管理控制台并导航到 RDS 服务。
单击“创建数据库”以开始创建新的 RDS 实例。
选择“标准创建”选项,然后选择 PostreSQL。
选择适当的模板、可用性和持久性。
注意:在撰写本文时,“多可用区 DB 集群”选项不支持设置数据库名称,这是下面的必要步骤。
设置 DB 实例名称、用户名和密码。
选择实例配置和存储参数。
在“连接”部分中,选择“不连接到 EC2 计算资源”
选择或创建 VPC 和子网,并允许对 DB 的公共访问。选择或创建安全组并选择可用区。
打开“其他配置”选项卡,并将数据库名称设置为
airflow_db
。根据需要选择其他设置,然后单击“创建数据库”来创建数据库。
测试连接¶
为了能够连接到新的 RDS 实例,您需要允许来自您的 IP 地址的入站流量进入数据库。
在 RDS 实例的“连接和安全”选项卡中的“安全”标题下,找到指向您的新 RDS DB 实例的 VPC 安全组的链接。
创建一个入站规则,该规则允许来自您的 IP 地址(在 TCP 端口 5432(PostgreSQL)上)的流量。
修改安全组后,请确认可以连接到数据库。这需要安装
psql
。有关安装psql
的说明,请访问此处。
注意:在测试连接之前,请确保您的数据库状态为“可用”。
psql -h <endpoint> -p 5432 -U <username> <db_name>
端点可以在“连接和安全”选项卡中找到,用户名(和密码)是创建数据库时使用的凭据。
数据库名称应为 airflow_db
(除非创建数据库时使用了不同的名称)。
如果连接成功,系统会提示您输入密码。
使用 Fargate 创建 ECS 集群和任务定义¶
为了为与 Apache Airflow 协同工作的 ECS 集群创建任务定义,您需要一个配置正确的 Docker 镜像。 有关如何执行此操作的说明,请参见 Dockerfile 部分。
构建镜像后,需要将其放入 ECS 可以拉取的存储库中。 有多种方法可以实现这一点。 本指南将介绍如何使用 Amazon Elastic Container Registry (ECR) 执行此操作。
创建 ECR 存储库¶
登录到您的 AWS 管理控制台并导航到 ECR 服务。
单击“创建存储库”。
命名存储库并根据需要填写其他信息。
单击“创建存储库”。
创建存储库后,单击存储库。单击右上角的“查看推送命令”按钮。
按照说明推送 Docker 镜像,并根据需要替换镜像名称。 确保在推送镜像后刷新页面以确保镜像已上传。
创建 ECS 集群¶
登录到您的 AWS 管理控制台并导航到 Amazon Elastic Container Service。
单击“集群”,然后单击“创建集群”。
确保在“基础设施”下选择了“AWS Fargate(无服务器)”。
根据需要选择其他选项,然后单击“创建”以创建集群。
创建任务定义¶
单击左侧栏上的“任务定义”,然后单击“创建新的任务定义”。
选择“任务定义系列”名称。为“启动类型”选择“AWS Fargate”。
选择或创建“任务角色”和“任务执行角色”,并确保这些角色具有完成各自任务所需的权限。 您可以选择创建一个新的“任务执行角色”,该角色将具有任务运行所需的基本最低权限。
为容器选择一个名称,并使用上一节中推送的镜像的镜像 URI。 确保使用的角色具有拉取镜像所需的权限。
将以下环境变量添加到容器
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
,其值为 PostgreSQL 连接字符串,格式如下,使用上述数据库部分设置的值。
postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
AIRFLOW__ECS_EXECUTOR__SECURITY_GROUPS
,其值为与 RDS 实例使用的 VPC 关联的安全组 ID 的逗号分隔列表。AIRFLOW__ECS_EXECUTOR__SUBNETS
,其值为与 RDS 实例关联的子网 ID 的逗号分隔列表。
允许 ECS 容器访问 RDS 数据库¶
作为最后一步,必须为 ECS 容器配置对数据库的访问。 可以使用许多不同的网络配置,但一种可能的方法是
登录到您的 AWS 管理控制台并导航到 VPC 控制面板。
在左侧的“安全”标题下,单击“安全组”。
选择与您的 RDS 实例关联的安全组,然后单击“编辑入站规则”。
添加一条新规则,允许 PostgreSQL 类型流量到与 Ecs 集群关联的子网的 CIDR。
配置 Airflow¶
要配置 Airflow 以利用 ECS 执行器并利用我们设置的资源,请创建一个脚本(例如,ecs_executor_config.sh
),其内容如下
export AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor'
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>
export AIRFLOW__AWS_ECS_EXECUTOR__REGION_NAME=<executor-region>
export AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER=<ecs-cluster-name>
export AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME=<ecs-container-name>
export AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION=<task-definition-name>
export AIRFLOW__AWS_ECS_EXECUTOR__LAUNCH_TYPE='FARGATE'
export AIRFLOW__AWS_ECS_EXECUTOR__PLATFORM_VERSION='LATEST'
export AIRFLOW__AWS_ECS_EXECUTOR__ASSIGN_PUBLIC_IP='True'
export AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS=<security-group-id-for-rds>
export AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS=<subnet-id-for-rds>
此脚本应在启动 Airflow 调度程序和 Web 服务器之前,在运行这些进程的主机上运行。
该脚本设置环境变量,这些环境变量配置 Airflow 以使用 ECS 执行器,并提供任务执行所需的信息。 应将所做的任何其他配置更改(例如用于远程日志记录)添加到此示例脚本中,以使整个 Airflow 环境的配置保持一致。
初始化 Airflow 数据库¶
需要先初始化 Airflow 数据库,才能使用它,并且需要添加用户才能登录。 以下命令添加一个管理员用户(如果尚未初始化,该命令也会初始化数据库)
airflow users create --username admin --password admin --firstname <your first name> --lastname <your last name> --email <your email> --role Admin