警告

Batch 执行器目前处于 alpha/实验阶段,可能会在没有警告的情况下进行更改。

AWS Batch 执行器

这是一个由 Amazon Batch 驱动的 Airflow 执行器。 由 Airflow 调度的每个任务都在由 Batch 调度的单独容器中运行。 这种执行器的一些优点包括

  1. 可扩展性和更低的成本:AWS Batch 允许动态配置执行任务所需的资源。 根据分配的资源,AWS Batch 可以根据工作负载自动向上或向下扩展,从而确保高效的资源利用率并降低成本。

  2. 作业队列和优先级:AWS Batch 提供了作业队列的概念,允许对任务的执行进行优先级排序和管理。 这确保了当同时调度多个任务时,它们会按照所需的优先级顺序执行。

  3. 灵活性:AWS Batch 支持 Fargate (ECS)、EC2 和 EKS 计算环境。 这些计算环境范围以及精细定义分配给计算环境的资源的能力为用户提供了很大的灵活性,可以选择最适合其工作负载的执行环境。

  4. 快速的任务执行:通过在 AWS Batch 中维护一个活动的 worker,提交给该服务的任务可以迅速执行。 凭借一个准备就绪的 worker,启动延迟非常小,从而确保任务在提交后立即开始。 此功能对于时间敏感型工作负载或需要近乎实时处理的应用程序尤其有利,从而提高了整体工作流程效率和响应能力。

有关快速入门指南,请参阅 此处,它将帮助您启动并运行基本配置。

以下部分提供了有关配置,提供的示例 Dockerfile 和日志记录的更多通用详细信息。

配置选项

有许多配置选项可用,这些选项可以直接在 airflow.cfg 文件中的“aws_batch_executor”部分下设置,也可以通过使用 AIRFLOW__AWS_BATCH_EXECUTOR__<OPTION_NAME> 格式的环境变量设置,例如 AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE = "myJobQueue"。 有关如何设置这些选项的更多信息,请参见 设置配置选项

注意

配置选项必须在运行 Airflow 组件的所有主机/环境(调度程序,Web服务器,执行器管理资源等)中保持一致。 有关设置配置的更多详细信息,请参见此处

如果发生冲突,优先级顺序从最低到最高为

  1. 加载具有默认值的选项的默认值。

  2. 加载通过 airflow.cfg 或环境变量显式提供的任何值。 这些值会根据 Airflow 的配置优先级进行检查。

  3. 如果提供了 SUBMIT_JOB_KWARGS 选项,则加载其中提供的任何值。

注意

exec_config 是一个可选参数,可以提供给操作符。 它是一种字典类型,在 Batch 执行器的上下文中,它表示 submit_job_kwargs 配置,然后该配置会覆盖 Airflow 配置中指定的 submit_job_kwargs(如果存在)。 它是一个递归更新,本质上将 Python 更新应用于配置中的每个嵌套字典。 粗略地近似为:submit_job_kwargs.update(exec_config)

必需的配置选项:

  • JOB_QUEUE - 提交作业的作业队列。必需。

  • JOB_DEFINITION - 此作业使用的作业定义。必需。

  • JOB_NAME - AWS Batch 作业的名称。必需。

  • REGION_NAME - 配置 Amazon Batch 的 AWS 区域的名称。必需。

可选的配置选项:

  • AWS_CONN_ID - Batch 执行器用于向 AWS Batch 发出 API 调用的 Airflow 连接(即凭据)。 默认为“aws_default”。

  • SUBMIT_JOB_KWARGS - 一个 JSON 字符串,包含提供给 Batch submit_job API 的参数。

  • MAX_SUBMIT_JOB_ATTEMPTS - Batch 执行器应尝试提交作业的最大次数。 这指的是作业启动失败的实例(即 API 故障,容器故障等)

  • CHECK_HEALTH_ON_STARTUP - 是否在启动时检查 Batch 执行器的运行状况

有关可用选项的更详细说明,包括类型提示和示例,请参见 Amazon 提供程序包中的 config_templates 文件夹。

注意

exec_config 是一个可选参数,可以提供给操作符。 它是一种字典类型,在 Batch 执行器的上下文中,它表示 submit_job_kwargs 配置,然后该配置会覆盖 Airflow 配置中指定的 submit_job_kwargs(如果存在)。 它是一个递归更新,本质上将 Python 更新应用于配置中的每个嵌套字典。 粗略地近似为:submit_job_kwargs.update(exec_config)

AWS Batch 执行器的 Dockerfile

可以在此处找到示例 Dockerfile,它创建了一个可供 AWS Batch 使用的镜像,以使用 Apache Airflow 中的 AWS Batch 执行器运行 Airflow 任务。 该镜像支持 AWS CLI / API 集成,允许您在 Airflow 环境中与 AWS 服务进行交互。 它还包括从 S3 存储桶或本地文件夹加载 DAG(有向无环图)的选项。

先决条件

您的系统上必须安装 Docker。 有关安装 Docker 的说明,请参见此处

构建镜像

AWS CLI 将安装在镜像中,并且有多种方法可以将 AWS 身份验证信息传递到容器,因此有多种方法可以构建镜像。 本指南将介绍两种方法。

最安全的方法是使用 IAM 角色。 创建 AWS Batch 作业定义时,可以选择作业角色和执行角色。 执行角色是容器代理用来代表您发出 AWS API 请求的角色。 根据 Batch 执行器使用的计算类型,需要将适当的策略附加到执行角色。 此外,该角色还需要至少具有 CloudWatchLogsFullAccess(或 CloudWatchLogsFullAccessV2)策略。 作业角色是容器用来发出 AWS API 请求的角色。 此角色需要具有基于正在运行的 DAG 中描述的任务的权限。 如果您通过 S3 存储桶加载 DAG,则此角色需要具有读取 S3 存储桶的权限。

要创建新的作业角色或执行角色,请按照以下步骤操作

  1. 导航到 AWS 控制台上的 IAM 页面,然后从左侧选项卡上的“访问管理”下,选择“角色”。

  2. 在“角色”页面上,单击右上角的“创建角色”。

  3. 在“受信任实体类型”下,选择“AWS 服务”。

  4. 选择适用的用例。

  5. 在“权限”页面中,根据角色是作业角色还是执行角色,选择角色将需要的权限。 选择所有必需的权限后,单击“下一步”。

  6. 输入新角色的名称,以及可选的描述。 查看角色的“受信任实体”和“权限”。 根据需要添加任何标签,然后单击“创建角色”。

为 Batch 创建作业定义时(有关更多详细信息,请参见设置指南),请为作业定义选择适当的新创建的作业角色和执行角色。

然后,您可以通过 cd 到包含 Dockerfile 的目录并运行来构建镜像

docker build -t my-airflow-image \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION .

注意:重要的是镜像以相同的架构构建和运行。 例如,对于 Apple Silicon 上的用户,您可能希望使用 docker buildx 指定 arch

docker buildx build --platform=linux/amd64 -t my-airflow-image \
  --build-arg aws_default_region=YOUR_DEFAULT_REGION .

有关使用 docker buildx 的更多信息,请参见此处

第二种方法是使用构建时参数(aws_access_key_idaws_secret_access_keyaws_default_regionaws_session_token)。

注意:不建议在生产环境中使用此方法,因为用户凭据存储在容器中,这可能是一个安全漏洞。

要使用这些参数传递 AWS 身份验证信息,请在 Docker 构建过程中使用 --build-arg 选项。例如:

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN .

YOUR_ACCESS_KEYYOUR_SECRET_KEYYOUR_SESSION_TOKENYOUR_DEFAULT_REGION 替换为有效的 AWS 凭证。

基础镜像

Docker 镜像构建于 apache/airflow:latest 镜像之上。有关该镜像的更多信息,请参见此处

重要提示:此镜像中的 Airflow 和 Python 版本必须与运行 Airflow 调度程序进程(进而运行执行器)的主机/容器上的 Airflow 和 Python 版本一致。可以使用以下命令在本地运行容器来验证镜像的 Airflow 版本:

docker run <image_name> version

类似地,可以使用以下命令验证镜像的 Python 版本:

docker run <image_name> python --version

确保这些版本与运行 Airflow 调度程序进程(以及 Batch 执行器)的主机/容器上的版本匹配。具有特定 Python 版本的 Apache Airflow 镜像可以从 Dockerhub 注册表下载,并通过 Python 版本 筛选标签。例如,标签 latest-python3.9 指定该镜像将安装 Python 3.9。

加载 DAGs

在 Batch 管理的容器上加载 DAGs 有很多方法。此 Dockerfile 预配置了两种可能的方法:从本地文件夹复制,或从 S3 存储桶下载。也可以使用其他加载 DAGs 的方法。

从 S3 存储桶

要从 S3 存储桶加载 DAGs,请取消注释 Dockerfile 中的 entrypoint 行,以便将 DAGs 从指定的 S3 存储桶同步到容器内的 /opt/airflow/dags 目录。如果想将 DAGs 存储在 /opt/airflow/dags 以外的目录中,您可以选择性地提供 container_dag_path 作为构建参数。

在 docker build 命令中添加 --build-arg s3_uri=YOUR_S3_URI。将 YOUR_S3_URI 替换为您的 S3 存储桶的 URI。确保您具有从该存储桶读取的适当权限。

请注意,以下命令还会将 AWS 凭证作为构建参数传递。

docker build -t my-airflow-image \
 --build-arg aws_access_key_id=YOUR_ACCESS_KEY \
 --build-arg aws_secret_access_key=YOUR_SECRET_KEY \
 --build-arg aws_default_region=YOUR_DEFAULT_REGION \
 --build-arg aws_session_token=YOUR_SESSION_TOKEN \
 --build-arg s3_uri=YOUR_S3_URI .

从本地文件夹

要从本地文件夹加载 DAGs,请将 DAG 文件放置在主机上的 Docker 构建上下文中的一个文件夹中,并使用 host_dag_path 构建参数提供该文件夹的位置。默认情况下,DAGs 将被复制到 /opt/airflow/dags,但这可以通过在 Docker 构建过程中传递 container_dag_path 构建时参数来更改。

docker build -t my-airflow-image --build-arg host_dag_path=./dags_on_host --build-arg container_dag_path=/path/on/container .

如果选择将 DAGs 加载到 /opt/airflow/dags 以外的不同路径,则需要在 Airflow 配置中更新新路径。

安装 Python 依赖项

此 Dockerfile 支持通过 piprequirements.txt 文件安装 Python 依赖项。将 requirements.txt 文件放在与 Dockerfile 相同的目录中。如果它位于不同的位置,则可以使用 requirements_path 构建参数指定它。复制 requirements.txt 文件时,请记住 Docker 上下文。取消注释 Dockerfile 中将 requirements.txt 文件复制到容器并运行 pip install 以在容器上安装依赖项的两个适当行。

为 AWS Batch 执行器构建镜像

有关如何将通过此自述文件创建的 Docker 镜像与 Batch 执行器一起使用的详细说明,请参见此处

日志记录

通过此执行器执行的 Airflow 任务在配置的 VPC 中的容器内运行。这意味着 Airflow Webserver 无法直接访问日志,并且容器在任务完成后停止时,日志将永久丢失。

使用 Batch 执行器时,应采用远程日志记录来持久化您的 Airflow 任务日志,并使其可从 Airflow Webserver 查看。

配置远程日志记录

配置远程日志记录的方法有很多,并且支持多个目标。有关 Airflow 任务日志记录的概述,请参见此处。有关配置 S3 远程日志记录的说明,请参见此处,有关 Cloudwatch 远程日志记录的说明,请参见此处。以下是一些需要在 Batch 执行器上下文中指出的重要事项:

  • Airflow 远程日志记录的配置选项应在运行 Airflow 的所有主机和容器上配置。例如,Webserver 需要此配置才能从远程位置获取日志,Batch 执行器运行的容器需要此配置才能将日志上传到远程位置。请参见此处以阅读有关如何通过配置文件或环境变量导出设置 Airflow 配置的更多信息。

  • 将 Airflow 远程日志记录配置添加到容器的方法有很多。一些示例包括但不限于:

    • 直接在 Dockerfile 中导出为环境变量(请参见 Dockerfile 部分上方

    • 更新 airflow.cfg 文件或在 Dockerfile 中复制/挂载/下载自定义 airflow.cfg

    • 在作业定义中添加为环境变量

  • 您必须在容器内配置凭证才能与日志的远程服务(例如,S3、CloudWatch Logs 等)进行交互。这可以通过多种方式完成。一些示例包括但不限于:

    • 直接将凭证导出到 Dockerfile 中(请参见 Dockerfile 部分上方

    • 配置 Airflow 连接并将其作为 远程日志记录连接 ID 提供(通过上述任何方法或您首选的方法导出到容器中)。然后,Airflow 将专门使用这些凭证与您选择的远程日志记录目标进行交互。

注意

配置选项必须在运行 Airflow 组件的所有主机/环境(调度程序,Web服务器,执行器管理资源等)中保持一致。 有关设置配置的更多详细信息,请参见此处

设置 Apache Airflow 的 Batch 执行器

使 Batch 执行器在 Apache Airflow 中工作涉及 3 个步骤:

  1. 创建一个数据库,Airflow 和 Batch 执行的任务可以连接到该数据库。

  2. 创建和配置可以运行来自 Airflow 的任务的 Batch 资源。

  3. 配置 Airflow 以使用 Batch 执行器和数据库。

选择数据库后端有不同的选项。有关 Airflow 支持的不同选项的更多信息,请参见此处。以下指南将说明如何在 AWS 上设置 PostgreSQL RDS 实例。

为 AWS Batch 执行器设置 RDS DB 实例

创建 RDS DB 实例

  1. 登录到您的 AWS 管理控制台并导航到 RDS 服务。

  2. 单击“创建数据库”以开始创建新的 RDS 实例。

  3. 选择“标准创建”选项,然后选择 PostreSQL。

  4. 选择适当的模板、可用性和持久性。

    • 注意:在撰写本文时,“多可用区 DB 集群”选项不支持设置数据库名称,这是下面所需的步骤。

  5. 设置 DB 实例名称、用户名和密码。

  6. 选择实例配置和存储参数。

  7. 在“连接”部分中,选择“不连接到 EC2 计算资源”

  8. 选择或创建 VPC 和子网,并允许对 DB 的公共访问。选择或创建安全组,然后选择可用区。

  9. 打开“其他配置”选项卡,并将数据库名称设置为 airflow_db

  10. 根据需要选择其他设置,然后单击“创建数据库”来创建数据库。

测试连接性

为了能够连接到新的 RDS 实例,您需要允许来自您的 IP 地址的入站流量访问数据库。

  1. 在 RDS 实例的“连接和安全”选项卡的“安全”标题下,找到指向您的新 RDS DB 实例的 VPC 安全组的链接。

  2. 创建一个入站规则,该规则允许来自您的 IP 地址在 TCP 端口 5432 (PostgreSQL) 上的流量。

  3. 确认在修改安全组后您可以连接到 DB。这将需要安装 psql。有关安装 psql 的说明,请参见此处

注意:在测试连接性之前,请确保您的 DB 状态为“可用”

psql -h <endpoint> -p 5432 -U <username> <db_name>

端点可以在“连接和安全”选项卡上找到,用户名(和密码)是创建数据库时使用的凭据。

db_name 应该是 airflow_db(除非在创建数据库时使用了不同的名称。)

如果连接成功,系统将提示您输入密码。

设置 AWS Batch

AWS Batch 可以通过各种方式进行配置,具体取决于用例,其编排类型也不同。为简单起见,本指南将介绍如何使用 EC2 设置 Batch。

为了设置 AWS Batch 以使其与 Apache Airflow 协同工作,您需要一个正确配置的 Docker 镜像。有关如何操作的说明,请参阅Dockerfile部分。

构建镜像后,需要将其放入可以被容器拉取的存储库中。 有多种方法可以实现这一点。 本指南将介绍如何使用 Amazon Elastic Container Registry(ECR)完成此操作。

创建 ECR 存储库

  1. 登录到您的 AWS 管理控制台并导航到 ECR 服务。

  2. 点击“创建存储库”。

  3. 命名存储库并根据需要填写其他信息。

  4. 点击“创建存储库”。

  5. 创建存储库后,单击存储库。 单击右上角的“查看推送命令”按钮。

  6. 按照说明推送 Docker 镜像,并根据需要替换镜像名称。 确保在推送镜像后刷新页面以确认镜像已上传。

配置 AWS Batch

  1. 登录到您的 AWS 管理控制台并导航到 AWS Batch 登陆页面。

  2. 在左侧边栏中,点击“向导”。 该向导将引导您创建运行 Batch 作业所需的所有资源。

  3. 选择 “Amazon EC2” 作为编排方式。

  4. 点击“下一步”。

创建计算环境

  1. 为计算环境、标签和任何适当的实例配置选择一个名称。 在这里,您可以选择最小、最大和期望的 vCPU 数量,以及您想使用的 EC2 实例的类型。

  2. 对于“实例角色”,选择创建新的实例配置文件,或者使用附加了所需 IAM 权限的现有实例配置文件。 此实例配置文件允许为您的计算环境创建的 Amazon ECS 容器实例代表您调用所需的 AWS API 操作。

  3. 选择允许访问互联网的 VPC,以及具有必要权限的安全组。

  4. 点击“下一步”。

创建作业队列

  1. 为作业队列选择一个名称,以及优先级。 计算环境将设置为在上一步中创建的环境。

创建作业定义

  1. 为作业定义选择一个名称。

  2. 选择适当的平台配置。 确保启用 分配公共 IP

  3. 选择执行角色,并确保该角色具有完成其任务所需的权限。

  4. 输入在上一步中推送的镜像的镜像 URI。 确保正在使用的角色具有拉取镜像所需的权限。

  5. 选择适当的作业角色,同时考虑到正在运行的任务的要求。

  6. 根据需要配置环境。 您可以指定容器可用的 vCPU、内存或 GPU 的数量。 此外,将以下环境变量添加到容器中

  • AIRFLOW__DATABASE__SQL_ALCHEMY_CONN,其值为 PostgreSQL 连接字符串,格式如下,使用在数据库部分中设置的值

postgresql+psycopg2://<username>:<password>@<endpoint>/<database_name>
  1. 根据需要添加 Airflow 的其他配置(请参阅此处),Batch 执行器(请参阅此处)或远程日志记录(请参阅此处)。 请注意,应在整个 Airflow 环境中进行任何配置更改,以保持配置的一致性。

  2. 点击“下一步”。

  3. 在“审核和创建”页面中,审核所有选择,一旦一切正确,请单击“创建资源”。

允许容器访问 RDS 数据库

作为最后一步,必须为 Batch 管理的容器配置对数据库的访问。 可能有许多不同的网络配置,但一种可能的方法是

  1. 登录到您的 AWS 管理控制台并导航到 VPC 控制面板。

  2. 在左侧“安全”标题下,单击“安全组”。

  3. 选择与您的 RDS 实例关联的安全组,然后单击“编辑入站规则”。

  4. 添加一条新规则,允许 PostgreSQL 类型流量访问与 Batch 计算环境关联的子网的 CIDR。

配置 Airflow

要将 Airflow 配置为利用 Batch 执行器并利用我们设置的资源,请确保定义了以下环境变量

AIRFLOW__CORE__EXECUTOR='airflow.providers.amazon.aws.executors.batch.batch_executor.AwsBatchExecutor'

AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=<postgres-connection-string>

AIRFLOW__AWS_BATCH_EXECUTOR__REGION_NAME=<executor-region>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_QUEUE=<batch-job-queue>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_DEFINITION=<batch-job-definition>

AIRFLOW__AWS_BATCH_EXECUTOR__JOB_NAME=<batch-job-name>

该脚本应在运行 Airflow 调度器和 Web 服务器的主机上运行,然后在启动这些进程之前运行。

该脚本设置环境变量,将 Airflow 配置为使用 Batch 执行器并提供任务执行的必要信息。 对所做的任何其他配置更改(例如,用于远程日志记录)都应添加到此示例脚本中,以使整个 Airflow 环境中的配置保持一致。

初始化 Airflow DB

Airflow DB 需要在使用前进行初始化,并且需要添加一个用户供您登录。 以下命令添加一个管理员用户(如果尚未初始化 DB,该命令也会初始化 DB)

airflow users create --username admin --password admin --firstname <your first name> --lastname <your last name> --email <your email> --role Admin

此条目是否有帮助?