Apache Airflow 是一个以编程方式作者、调度和监控工作流的平台。工作流是处理一组数据的任务序列。你可以将工作流视为描述任务从未完成到完成的路径。另一方面,调度是计划、控制和优化何时执行特定任务的过程。
在 Apache Airflow 中编写工作流。
Airflow 使使用 Python 脚本轻松编写工作流。有向无环图(Directed Acyclic Graph,简称 DAG)在 Airflow 中表示工作流。它是一系列任务的集合,以一种方式展示了每个任务的关系和依赖。你可以拥有任意数量的 DAG,Airflow 将根据任务的关系和依赖来执行它们。如果任务 B 依赖于任务 A 的成功执行,这意味着 Airflow 将运行任务 A,并且只在任务 A 之后运行任务 B。这种依赖关系在 Airflow 中非常容易表达。例如,上述场景表达为
task_A >> task_B
也等同于
task_A.set_downstream(task_B)
这有助于 Airflow 知道它需要在任务 B 之前执行任务 A。任务之间的关系可以比上面表达的复杂得多,Airflow 会根据它们的关系和依赖来确定如何以及何时执行任务。
在我们讨论 Airflow 使工作流的调度、执行和监控变得容易的架构之前,让我们先讨论一下 Breeze 环境。
Breeze 环境
Breeze 环境是 Airflow 的开发环境,您可以在其中运行测试、构建镜像、构建文档以及许多其他事情。有关于 Breeze 环境的出色文档和视频。请查阅。您可以通过运行 ./breeze
脚本进入 Breeze 环境。您可以在 Breeze 环境中运行此处提到的所有命令。
调度器
调度器是监控 DAG 并触发那些依赖关系已满足的任务的组件。它监视 DAG 文件夹,检查每个 DAG 中的任务,并在它们准备就绪时触发它们。它通过生成一个定期运行(大约每分钟一次)的进程来实现这一点,该进程读取元数据数据库以检查每个任务的状态并决定需要做什么。元数据数据库记录了所有任务的状态。状态可以是运行中、成功、失败等。
当任务的依赖关系得到满足时,该任务就被认为是准备就绪了。依赖关系包括执行任务所需的所有数据。需要注意的是,调度器不会触发您的任务,直到它涵盖的周期结束。如果任务的 schedule_interval
是 @daily
,调度器会在一天结束时触发任务,而不是在开始时。这是为了确保任务所需的数据已经准备好。也可以在 UI 上手动触发任务。
在 Breeze 环境中,通过运行命令 airflow scheduler
启动调度器。它使用配置的生产环境。配置可以在 airflow.cfg
中指定。
执行器
执行器负责运行任务。它们与调度器协作,在任务排队时获取运行任务所需的资源信息。
默认情况下,Airflow 使用序列化执行器(SequentialExecutor)。然而,这个执行器功能有限,并且是唯一可以与 SQLite 一起使用的执行器。
还有许多其他执行器,区别在于它们拥有的资源以及如何选择使用这些资源。可用的执行器包括
- 序列化执行器(Sequential Executor)
- 调试执行器(Debug Executor)
- 本地执行器(Local Executor)
- Dask 执行器(Dask Executor)
- Celery 执行器(Celery Executor)
- Kubernetes 执行器(Kubernetes Executor)
- 使用 Mesos 进行扩展(社区贡献)
与 SequentialExecutor 相比,CeleryExecutor 是一个更好的执行器。CeleryExecutor 使用多个工作节点以分布式方式执行任务。如果一个工作节点宕机,CeleryExecutor 会将其任务分配给另一个工作节点。这确保了高可用性。
CeleryExecutor 与调度器密切协作,调度器将消息添加到队列中,然后由 Celery broker 将消息传递给 Celery worker 进行执行。您可以在文档中找到有关 CeleryExecutor 以及如何配置它的更多信息。
Web 服务器
Web 服务器是 Airflow 的 Web 界面(UI)。该 UI 功能丰富。它使监控和排除 DAG 和任务故障变得容易。
您可以在 UI 上执行许多操作。您可以触发任务、监控执行过程(包括任务的持续时间)。UI 可以树状视图和图状视图查看任务的依赖关系。您可以在 UI 中查看任务日志。
在 breeze 环境中,通过命令 airflow webserver
启动 Web UI。
后端
默认情况下,Airflow 使用 SQLite 后端存储配置信息、DAG 状态以及许多其他有用信息。这不应在生产环境中使用,因为 SQLite 可能导致数据丢失。
您可以使用 PostgreSQL 或 MySQL 作为 Airflow 的后端。切换到 PostgreSQL 或 MySQL 很简单。
在启动 breeze 环境时,命令 ./breeze --backend mysql
会选择 MySQL 作为后端。
Operator
Operator 决定任务做什么。Airflow 有许多内置的 Operator。每个 Operator 执行特定的任务。有执行 bash 命令的 BashOperator,调用 Python 函数的 PythonOperator,在 AWS Batch 上执行作业的 AwsBatchOperator,以及更多。
Sensor
Sensor 可以被描述为用于监控长时间运行任务的特殊 Operator。就像 Operator 一样,Airflow 中有许多预定义的 Sensor。这些包括
- AthenaSensor:询问查询的状态,直到其达到失败或成功状态。
- AzureCosmosDocumentSensor:检查 CosmosDB 中是否存在与给定查询匹配的文档。
- GoogleCloudStorageObjectSensor:检查 Google Cloud Storage 中是否存在文件。
大多数可用 Sensor 的列表可以在这个模块中找到。
贡献 Airflow
Airflow 是一个开源项目,欢迎所有人贡献。感谢出色的入门文档,入门非常容易。
我大约 12 周前通过 Outreachy 项目加入社区,并完成了大约 40 个拉取请求(PR)。
这是一段非凡的经历!感谢我的导师 Jarek 和 Kaxil,以及社区成员,特别是 Kamil 和 Tomek 的所有支持。我非常感激!
非常感谢 Leah E. Cole 精彩的评论。
分享