警告

Batch 执行器目前处于 alpha/实验阶段,可能会在没有警告的情况下进行更改。

AWS Batch 执行器

这是一个由 Amazon Batch 驱动的 Airflow 执行器。由 Airflow 调度的每个任务都在一个单独的容器内运行,由 Batch 调度。这种执行器的一些优点包括:

  1. 可扩展性和更低的成本:AWS Batch 允许动态配置执行任务所需的资源。根据分配的资源,AWS Batch 可以根据工作负载自动向上或向下扩展,确保高效的资源利用率并降低成本。

  2. 作业队列和优先级:AWS Batch 提供了作业队列的概念,允许对任务的执行进行优先级排序和管理。这确保了在同时调度多个任务时,它们会按照所需的优先级顺序执行。

  3. 灵活性:AWS Batch 支持 Fargate (ECS)、EC2 和 EKS 计算环境。这种范围广泛的计算环境,以及微调分配给计算环境的资源的能力,为用户在选择最适合其工作负载的执行环境方面提供了很大的灵活性。

  4. 快速任务执行:通过在 AWS Batch 中维护一个活动的 worker,提交给服务的任务可以快速执行。有了准备就绪的 worker,启动延迟最小,确保任务在提交后立即开始。此功能对于时间敏感的工作负载或需要近实时处理的应用程序尤其有利,从而提高整体工作流程效率和响应能力。

有关快速入门指南,请参阅此处,它将帮助您快速启动并运行基本配置。

以下部分提供了有关配置、提供的示例 Dockerfile 和日志记录的更多通用详细信息。

配置选项

有许多可用的配置选项,这些选项可以直接在 airflow.cfg 文件中的 “aws_batch_executor” 部分下设置,也可以通过使用 AIRFLOW__AWS_BATCH_EXECUTOR__<OPTION_NAME> 格式的环境变量设置,例如 AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE = "myJobQueue"。有关如何设置这些选项的更多信息,请参阅设置配置选项

注意

配置选项必须在运行 Airflow 组件(调度器、Web 服务器、执行器管理的资源等)的所有主机/环境之间保持一致。有关设置配置的更多详细信息,请参阅此处

如果发生冲突,优先级顺序从低到高是

  1. 加载具有默认值的选项的默认值。

  2. 加载通过 airflow.cfg 或环境变量显式提供的任何值。这些值会通过 Airflow 的配置优先级进行检查。

  3. 如果提供了,则加载 SUBMIT_JOB_KWARGS 选项中提供的任何值。

注意

exec_config 是一个可选参数,可以提供给操作符。它是一个字典类型,在 Batch 执行器的上下文中,它表示 submit_job_kwargs 配置,然后在上面更新 Airflow 配置中指定的 submit_job_kwargs(如果存在)。它是一个递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。大致近似为:submit_job_kwargs.update(exec_config)

必需的配置选项:

  • JOB_QUEUE - 提交作业的作业队列。必需。

  • JOB_DEFINITION - 此作业使用的作业定义。必需。

  • JOB_NAME - AWS Batch 作业的名称。必需。

  • REGION_NAME - 配置 Amazon Batch 的 AWS 区域的名称。必需。

可选的配置选项:

  • AWS_CONN_ID - Batch 执行器用于对 AWS Batch 进行 API 调用的 Airflow 连接(即凭据)。默认为 “aws_default”。

  • SUBMIT_JOB_KWARGS - 包含要提供给 Batch submit_job API 的参数的 JSON 字符串。

  • MAX_SUBMIT_JOB_ATTEMPTS - Batch 执行器应尝试提交作业的最大次数。这指的是作业启动失败的情况(即 API 失败、容器失败等)。

  • CHECK_HEALTH_ON_STARTUP - 是否在启动时检查 Batch 执行器的运行状况

有关可用选项的更详细说明,包括类型提示和示例,请参阅 Amazon 提供程序包中的 config_templates 文件夹。

注意

exec_config 是一个可选参数,可以提供给操作符。它是一个字典类型,在 Batch 执行器的上下文中,它表示 submit_job_kwargs 配置,然后在上面更新 Airflow 配置中指定的 submit_job_kwargs(如果存在)。它是一个递归更新,本质上是将 Python 更新应用于配置中的每个嵌套字典。大致近似为:submit_job_kwargs.update(exec_config)

AWS Batch 执行器的 Dockerfile

可以在此处找到示例 Dockerfile,它创建一个可以被 AWS Batch 用来使用 Apache Airflow 中的 AWS Batch 执行器运行 Airflow 任务的映像。该映像支持 AWS CLI/API 集成,允许您在 Airflow 环境中与 AWS 服务进行交互。它还包括从 S3 存储桶或本地文件夹加载 DAG(有向无环图)的选项。

先决条件

您的系统上必须安装 Docker。有关安装 Docker 的说明,请参阅此处

构建映像

AWS CLI 将安装在映像中,并且有多种方法可以将 AWS 身份验证信息传递到容器,因此有多种方法来构建映像。本指南将介绍两种方法。

最安全的方法是使用 IAM 角色。创建 AWS Batch 作业定义时,您可以选择作业角色和执行角色。执行角色是容器代理用于代表您进行 AWS API 请求的角色。根据 Batch 执行器使用的计算资源,需要将适当的策略附加到执行角色。此外,该角色还需要至少具有 CloudWatchLogsFullAccess(或 CloudWatchLogsFullAccessV2)策略。作业角色是容器用于进行 AWS API 请求的角色。此角色需要具有基于正在运行的 DAG 中描述的任务的权限。如果您通过 S3 存储桶加载 DAG,则此角色需要具有读取 S3 存储桶的权限。

要创建新的作业角色或执行角色,请按照以下步骤操作

  1. 导航到 AWS 控制台上的 IAM 页面,然后在左侧的选项卡中,在访问管理下,选择角色。

  2. 在角色页面上,单击右上角的创建角色。

  3. 在可信实体类型下,选择 AWS 服务。

  4. 选择适用的用例。

  5. 在权限页面中,选择角色将需要的权限,具体取决于角色是作业角色还是执行角色。选择所有必需的权限后,单击下一步。

  6. 输入新角色的名称和一个可选的描述。查看角色的可信实体和权限。根据需要添加任何标签,然后单击创建角色。

为 Batch 创建作业定义时(有关更多详细信息,请参阅设置指南),请为作业定义选择适当的新创建的作业角色和执行角色。

然后,您可以通过 cd 进入包含 Dockerfile 的目录并运行来构建映像

docker build -t my-airflow-image \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION .

注意:重要的是,映像是在相同的体系结构下构建和运行的。例如,对于 Apple Silicon 上的用户,您可能希望使用 docker buildx 指定架构

docker buildx build --platform=linux/amd64 -t my-airflow-image \
  --build-arg aws_default_region=YOUR_DEFAULT_REGION .

有关使用 docker buildx 的更多信息,请参阅此处

第二种方法是使用构建时参数(aws_access_key_idaws_secret_access_keyaws_default_regionaws_session_token)。

注意:不建议在生产环境中使用此方法,因为用户凭据存储在容器中,这可能是一个安全漏洞。

要使用这些参数传递 AWS 身份验证信息,请在 Docker 构建过程中使用 --build-arg 选项。例如

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN .

YOUR_ACCESS_KEYYOUR_SECRET_KEYYOUR_SESSION_TOKENYOUR_DEFAULT_REGION 替换为有效的 AWS 凭据。

基础映像

Docker 映像基于 apache/airflow:latest 映像构建。有关该映像的更多信息,请参阅此处

重要提示:此映像中的 Airflow 和 Python 版本必须与运行 Airflow 调度器进程(进而运行执行器)的主机/容器上的 Airflow 和 Python 版本一致。可以通过使用以下命令在本地运行容器来验证映像的 Airflow 版本

docker run <image_name> version

同样,可以通过以下命令验证映像的 Python 版本

docker run <image_name> python --version

确保这些版本与运行 Airflow 调度器进程(以及 Batch 执行器)的主机/容器上的版本匹配。可以从 Dockerhub 注册表下载具有特定 Python 版本的 Apache Airflow 映像,并按Python 版本过滤标签。例如,标签 latest-python3.9 指定映像将安装 Python 3.9。

加载 DAG

有很多方法可以在由 Batch 管理的容器上加载 DAG。此 Dockerfile 预先配置了两种可能的方法:从本地文件夹复制或从 S3 存储桶下载。也可以使用其他加载 DAG 的方法。

从 S3 存储桶

要从 S3 存储桶加载 DAG,请取消 Dockerfile 中入口点行的注释,以便将 DAG 从指定的 S3 存储桶同步到容器内的 /opt/airflow/dags 目录。如果希望将 DAG 存储在 /opt/airflow/dags 之外的目录中,可以选择提供 container_dag_path 作为构建参数。

在 docker build 命令中添加 --build-arg s3_uri=YOUR_S3_URI。将 YOUR_S3_URI 替换为您的 S3 存储桶的 URI。确保您具有从存储桶读取的适当权限。

请注意,以下命令也将 AWS 凭证作为构建参数传递。

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN \
 --build-arg s3_uri=YOUR_S3_URI .

从本地文件夹

要从本地文件夹加载 DAG,请将您的 DAG 文件放置在主机上 Docker 构建上下文中的文件夹中,并使用 host_dag_path 构建参数提供该文件夹的位置。默认情况下,DAG 将被复制到 /opt/airflow/dags,但这可以通过在 Docker 构建过程中传递 container_dag_path 构建时参数来更改。

docker build -t my-airflow-image --build-arg host_dag_path=./dags_on_host --build-arg container_dag_path=/path/on/container .

如果选择将 DAG 加载到 /opt/airflow/dags 之外的其他路径,则需要在 Airflow 配置中更新新路径。

安装 Python 依赖项

此 Dockerfile 支持通过 piprequirements.txt 文件安装 Python 依赖项。将您的 requirements.txt 文件放置在与 Dockerfile 相同的目录中。如果它位于不同的位置,可以使用 requirements_path 构建参数指定它。在复制 requirements.txt 文件时,请记住 Docker 上下文。取消注释 Dockerfile 中将 requirements.txt 文件复制到容器并运行 pip install 以在容器上安装依赖项的两行适当的行。

为 AWS Batch 执行器构建镜像

有关如何将您通过此自述文件创建的 Docker 镜像与 Batch 执行器一起使用的详细说明,请参阅此处

日志记录

通过此执行器执行的 Airflow 任务在配置的 VPC 内的容器中运行。这意味着 Airflow Webserver 无法直接访问日志,并且当容器在任务完成后停止时,日志将永久丢失。

使用 Batch 执行器时,应采用远程日志记录来持久化您的 Airflow 任务日志,并使其可从 Airflow Webserver 查看。

配置远程日志记录

有多种方法可以配置远程日志记录以及多个受支持的目标。有关 Airflow 任务日志记录的常规概述,请参阅此处。有关配置 S3 远程日志记录的说明,请参阅此处,有关 Cloudwatch 远程日志记录的说明,请参阅此处。在 Batch 执行器的上下文中,需要指出远程日志记录的一些重要事项

  • Airflow 远程日志记录的配置选项应在运行 Airflow 的所有主机和容器上进行配置。例如,Webserver 需要此配置才能从远程位置获取日志,而 Batch 执行器运行的容器需要此配置才能将日志上传到远程位置。请参阅此处,了解有关如何通过配置文件或环境变量导出设置 Airflow 配置的更多信息。

  • 可以使用多种方法将 Airflow 远程日志记录配置添加到容器。一些示例包括但不限于

    • 在 Dockerfile 中直接导出为环境变量(请参阅上面的 Dockerfile 部分)

    • 更新 airflow.cfg 文件,或在 Dockerfile 中复制/挂载/下载自定义的 airflow.cfg

    • 在作业定义中添加为环境变量

  • 您必须在容器中配置凭证,以便能够与日志的远程服务(例如,S3、CloudWatch Logs 等)进行交互。这可以通过多种方式完成。一些示例包括但不限于

    • 直接将凭证导出到 Dockerfile 中(请参阅上面的 Dockerfile 部分)

    • 配置 Airflow 连接并将其作为 远程日志连接 ID 提供(通过上面列出的任何方式或您首选的方法导出到容器中)。然后,Airflow 将专门使用这些凭证与您选择的远程日志记录目标进行交互。

注意

配置选项必须在运行 Airflow 组件(调度器、Web 服务器、执行器管理的资源等)的所有主机/环境之间保持一致。有关设置配置的更多详细信息,请参阅此处

为 Apache Airflow 设置 Batch 执行器

在 Apache Airflow 中使 Batch 执行器工作涉及 3 个步骤

  1. 创建一个 Airflow 和 Batch 执行的任务可以连接的数据库。

  2. 创建和配置可以运行来自 Airflow 的任务的 Batch 资源。

  3. 配置 Airflow 以使用 Batch 执行器和数据库。

有不同的选项可用于选择数据库后端。有关 Airflow 支持的不同选项的更多信息,请参阅此处。以下指南将说明如何在 AWS 上设置 PostgreSQL RDS 实例。

为 AWS Batch 执行器设置 RDS 数据库实例

创建 RDS 数据库实例

  1. 登录到您的 AWS 管理控制台并导航到 RDS 服务。

  2. 单击“创建数据库”开始创建新的 RDS 实例。

  3. 选择“标准创建”选项,然后选择 PostreSQL。

  4. 选择适当的模板、可用性和持久性。

    • 注意:在撰写本文时,“多可用区数据库集群”选项不支持设置数据库名称,这是下面所需的步骤。

  5. 设置数据库实例名称、用户名和密码。

  6. 选择实例配置和存储参数。

  7. 在“连接”部分中,选择“不连接到 EC2 计算资源”

  8. 选择或创建 VPC 和子网,并允许对数据库进行公共访问。选择或创建安全组,然后选择可用区。

  9. 打开“其他配置”选项卡,并将数据库名称设置为 airflow_db

  10. 根据需要选择其他设置,然后单击“创建数据库”以创建数据库。

测试连接性

为了能够连接到新的 RDS 实例,您需要允许从您的 IP 地址到数据库的入站流量。

  1. 在 RDS 实例的“连接和安全”选项卡中的“安全”标题下,找到指向新 RDS 数据库实例的 VPC 安全组的链接。

  2. 创建一个入站规则,允许来自您的 IP 地址的 TCP 端口 5432 (PostgreSQL) 上的流量。

  3. 修改安全组后,确认您可以连接到数据库。这将需要安装 psql。有关安装 psql 的说明,请参阅此处

注意:在测试连接性之前,请确保您的数据库状态为“可用”

psql -h <endpoint> -p 5432 -U <username> <db_name>

可以在“连接和安全”选项卡上找到端点,用户名(和密码)是创建数据库时使用的凭证。

db_name 应为 airflow_db(除非创建数据库时使用了其他名称)。

如果连接成功,系统将提示您输入密码。

设置 AWS Batch

AWS Batch 可以通过多种方式进行配置,具体取决于使用案例,具有不同的编排类型。为简单起见,本指南将介绍如何使用 EC2 设置 Batch。

为了设置 AWS Batch 以便它可以与 Apache Airflow 一起使用,您将需要一个正确配置的 Docker 镜像。有关如何执行此操作的说明,请参阅 Dockerfile 部分。

构建镜像后,需要将其放入存储库中,以便容器可以拉取它。有多种方法可以实现此目的。本指南将介绍如何使用 Amazon Elastic Container Registry (ECR) 来完成此操作。

创建 ECR 存储库

  1. 登录您的 AWS 管理控制台,并导航到 ECR 服务。

  2. 点击“创建存储库”。

  3. 命名存储库并根据需要填写其他信息。

  4. 点击“创建存储库”。

  5. 创建存储库后,点击该存储库。点击右上角的“查看推送命令”按钮。

  6. 按照说明推送 Docker 镜像,并根据需要替换镜像名称。请在推送镜像后刷新页面,确保镜像已成功上传。

配置 AWS Batch

  1. 登录您的 AWS 管理控制台,并导航到 AWS Batch 登录页面。

  2. 在左侧边栏中,点击“向导”。此向导将引导您创建运行 Batch 作业所需的所有资源。

  3. 选择编排方式为 Amazon EC2。

  4. 点击“下一步”。

创建计算环境

  1. 为计算环境选择一个名称、标签和任何适当的实例配置。在这里,您可以选择最小、最大和期望的 vCPU 数量,以及您希望使用的 EC2 实例类型。

  2. 对于实例角色,选择创建新的实例配置文件或使用具有所需 IAM 权限的现有实例配置文件。此实例配置文件允许为您的计算环境创建的 Amazon ECS 容器实例代表您调用所需的 AWS API 操作。

  3. 选择一个允许访问互联网的 VPC,以及具有必要权限的安全组。

  4. 点击“下一步”。

创建作业队列

  1. 为作业队列选择一个名称,以及优先级。计算环境将设置为上一步中创建的环境。

创建作业定义

  1. 为作业定义选择一个名称。

  2. 选择适当的平台配置。确保启用 分配公共 IP

  3. 选择执行角色,并确保该角色具有完成其任务所需的权限。

  4. 输入上一节中推送的镜像的镜像 URI。确保使用的角色具有拉取镜像所需的权限。

  5. 根据运行的任务的要求,选择适当的作业角色。

  6. 根据需要配置环境。您可以指定容器可用的 vCPU、内存或 GPU 数量。此外,将以下环境变量添加到容器中

  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN,其值为 PostgreSQL 连接字符串,格式如下,使用数据库部分中设置的值

postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
  1. 根据 Airflow 的一般需要(请参阅此处)、Batch 执行器(请参阅此处)或远程日志记录(请参阅此处)添加其他配置。 请注意,应在整个 Airflow 环境中进行任何配置更改,以保持配置的一致性。

  2. 点击“下一步”。

  3. 在“查看和创建”页面中,查看所有选择,并在一切正确后,点击“创建资源”。

允许容器访问 RDS 数据库

作为最后一步,必须为 Batch 管理的容器配置对数据库的访问。有许多不同的网络配置是可能的,但一种可能的方法是

  1. 登录您的 AWS 管理控制台,并导航到 VPC 控制面板。

  2. 在左侧的“安全”标题下,点击“安全组”。

  3. 选择与您的 RDS 实例关联的安全组,然后点击“编辑入站规则”。

  4. 添加一个新规则,允许将 PostgreSQL 类型流量发送到与 Batch 计算环境关联的子网的 CIDR。

配置 Airflow

要配置 Airflow 以使用 Batch 执行器并利用我们设置的资源,请确保定义以下环境变量

AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.batch.batch_executor.AwsBatchExecutor'

AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>

AIRFLOW__AWS_BATCH_EXECUTOR__REGION_NAME=<executor-region>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE=<batch-job-queue>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_DEFINITION=<batch-job-definition>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_NAME=<batch-job-name>

此脚本应在启动 Airflow 调度程序和 Web 服务器之前,在运行 Airflow 调度程序和 Web 服务器的主机上运行。

该脚本设置环境变量,以配置 Airflow 使用 Batch 执行器,并为任务执行提供必要的信息。 任何其他配置更改(例如远程日志记录)都应添加到此示例脚本中,以保持 Airflow 环境中配置的一致性。

初始化 Airflow DB

Airflow DB 需要先初始化才能使用,并且需要添加一个用户才能登录。以下命令添加一个管理员用户(如果尚未初始化,该命令还会初始化 DB)

airflow users create --username admin --password admin --firstname <your first name> --lastname <your last name> --email <your email> --role Admin

此条目有帮助吗?