Kubernetes 执行器¶
注意
从 Airflow 2.7.0 开始,您需要安装 cncf.kubernetes
提供程序包才能使用此执行器。这可以通过安装 apache-airflow-providers-cncf-kubernetes>=7.4.0
或使用 cncf.kubernetes
扩展安装 Airflow 来完成:pip install 'apache-airflow[cncf.kubernetes]'
。
Kubernetes 执行器在 Kubernetes 集群上的独立 Pod 中运行每个任务实例。
KubernetesExecutor 在 Airflow 调度器中作为进程运行。调度器本身不一定需要在 Kubernetes 上运行,但需要访问 Kubernetes 集群。
KubernetesExecutor 需要后端使用非 sqlite 数据库。
当 DAG 提交任务时,KubernetesExecutor 会从 Kubernetes API 请求一个工作器 Pod。然后,工作器 Pod 运行任务,报告结果并终止。
下面显示了一个在 Kubernetes 集群的五个节点的分布式集上运行的 Airflow 部署示例。
与常规 Airflow 架构一致,工作器需要访问 DAG 文件以执行这些 DAG 中的任务并与元数据存储库交互。此外,还需要在 Airflow 配置文件中指定特定于 Kubernetes 执行器的配置信息,例如工作器命名空间和镜像信息。
此外,Kubernetes 执行器可以使用执行器配置在每个任务的基础上指定其他功能。
配置¶
pod_template_file¶
要自定义用于 k8s 执行器工作进程的 Pod,您可以创建一个 Pod 模板文件。您必须在 airflow.cfg
的 kubernetes_executor
部分的 pod_template_file
选项中提供模板文件的路径。
Airflow 对 Pod 模板文件有两个严格的要求:基础镜像和 Pod 名称。
基础镜像¶
pod_template_file
必须在 spec.containers[0]
位置有一个名为 base
的容器,并且必须指定其 image
。
您可以在此必需容器之后自由创建边车容器,但 Airflow 假设 airflow 工作器容器存在于容器数组的开头,并假设该容器名为 base
。
注意
Airflow 可能会覆盖基础容器 image
,例如通过 pod_override 配置;但它必须存在于模板文件中,并且不能为空。
Pod 名称¶
必须在模板文件中设置 Pod 的 metadata.name
。此字段将在 Pod 启动时动态设置,以保证所有 Pod 的唯一性。但同样,它必须包含在模板中,并且不能为空。
示例 Pod 模板¶
考虑到这些要求,以下是一些基本 pod_template_file
YAML 文件的示例。
注意
当使用默认的 Airflow 配置值时,以下示例应该可以正常工作。但是,许多自定义配置值也需要通过此模板显式传递给 Pod。这包括但不限于 sql 配置、所需的 Airflow 连接、DAGs 文件夹路径和日志记录设置。有关详细信息,请参阅 配置参考。
在镜像中存储 DAG
---
apiVersion: v1
kind: Pod
metadata:
name: placeholder-name
spec:
containers:
- env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: RELEASE-NAME-fernet-key
key: fernet-key
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
image: dummy_image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- mountPath: "/opt/airflow/logs"
name: airflow-logs
- mountPath: /opt/airflow/airflow.cfg
name: airflow-config
readOnly: true
subPath: airflow.cfg
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 50000
serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
volumes:
- emptyDir: {}
name: airflow-logs
- configMap:
name: RELEASE-NAME-airflow-config
name: airflow-config
在 persistentVolume
中存储 DAG
---
apiVersion: v1
kind: Pod
metadata:
name: placeholder-name
spec:
containers:
- env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: RELEASE-NAME-fernet-key
key: fernet-key
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
image: dummy_image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- mountPath: "/opt/airflow/logs"
name: airflow-logs
- mountPath: /opt/airflow/dags
name: airflow-dags
readOnly: true
- mountPath: /opt/airflow/airflow.cfg
name: airflow-config
readOnly: true
subPath: airflow.cfg
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 50000
serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
volumes:
- name: airflow-dags
persistentVolumeClaim:
claimName: RELEASE-NAME-dags
- emptyDir: {}
name: airflow-logs
- configMap:
name: RELEASE-NAME-airflow-config
name: airflow-config
从 git
拉取 DAG
---
apiVersion: v1
kind: Pod
metadata:
name: dummy-name
spec:
initContainers:
- name: git-sync
image: "registry.k8s.io/git-sync/git-sync:v3.6.3"
env:
- name: GIT_SYNC_BRANCH
value: "v2-2-stable"
- name: GIT_SYNC_REPO
value: "https://github.com/apache/airflow.git"
- name: GIT_SYNC_DEPTH
value: "1"
- name: GIT_SYNC_ROOT
value: "/git"
- name: GIT_SYNC_DEST
value: "repo"
- name: GIT_SYNC_ADD_USER
value: "true"
- name: GIT_SYNC_ONE_TIME
value: "true"
volumeMounts:
- name: airflow-dags
mountPath: /git
containers:
- env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: RELEASE-NAME-fernet-key
key: fernet-key
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
image: dummy_image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- mountPath: "/opt/airflow/logs"
name: airflow-logs
- mountPath: /opt/airflow/dags
name: airflow-dags
subPath: repo/airflow/example_dags
readOnly: false
- mountPath: /opt/airflow/airflow.cfg
name: airflow-config
readOnly: true
subPath: airflow.cfg
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 50000
serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
volumes:
- name: airflow-dags
emptyDir: {}
- name: airflow-logs
emptyDir: {}
- configMap:
name: RELEASE-NAME-airflow-config
name: airflow-config
pod_override¶
使用 KubernetesExecutor 时,Airflow 提供了在每个任务的基础上覆盖系统默认值的功能。要利用此功能,请创建一个 Kubernetes V1pod 对象并填写您想要的覆盖。
要覆盖 KubernetesExecutor 启动的 Pod 的基础容器,请创建一个只有一个容器的 V1pod,并覆盖以下字段
executor_config_volume_mount = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
),
}
@task(executor_config=executor_config_volume_mount)
def test_volume_mount():
"""
Tests whether the volume has been mounted.
"""
with open("/foo/volume_mount_test.txt", "w") as foo:
foo.write("Hello")
return_code = os.system("cat /foo/volume_mount_test.txt")
if return_code != 0:
raise ValueError(f"Error when checking volume mount. Return code {return_code}")
volume_task = test_volume_mount()
请注意,以下字段将被**扩展**而不是覆盖。来自 *spec*:volumes 和 init_containers。来自 *container*:卷挂载、环境变量、端口和设备。
要将边车容器添加到启动的 Pod 中,请创建一个 V1pod,其第一个容器为空,名为 base
,第二个容器包含您想要的边车。
executor_config_sidecar = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
),
k8s.V1Container(
name="sidecar",
image="ubuntu",
args=['echo "retrieved from mount" > /shared/test.txt'],
command=["bash", "-cx"],
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
),
],
volumes=[
k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
],
)
),
}
@task(executor_config=executor_config_sidecar)
def test_sharedvolume_mount():
"""
Tests whether the volume has been mounted.
"""
for i in range(5):
try:
return_code = os.system("cat /shared/test.txt")
if return_code != 0:
raise ValueError(f"Error when checking volume mount. Return code {return_code}")
except ValueError as e:
if i > 4:
raise e
sidecar_task = test_sharedvolume_mount()
您还可以在每个任务的基础上创建自定义 pod_template_file
,以便您可以在多个任务之间重复使用相同的基本值。这将替换 airflow.cfg 中命名的默认 pod_template_file
,然后使用 pod_override
覆盖该模板。
以下是一个同时具有这两个功能的任务示例
import os
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME
from kubernetes.client import models as k8s
with DAG(
dag_id="example_pod_template_file",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example3"],
) as dag:
executor_config_template = {
"pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
}
@task(executor_config=executor_config_template)
def task_with_template():
print_stuff()
管理 DAG 和日志¶
是否使用持久卷是可选的,具体取决于您的配置。
DAG:
要将 DAG 放入工作器中,您可以
在镜像中包含 DAG。
使用
git-sync
,它将在启动工作器容器之前运行 DAG 存储库的git pull
。将 DAG 存储在持久卷上,该卷可以挂载在所有工作器上。
日志:
要从工作器中获取任务日志,您可以
使用同时挂载在 Web 服务器和工作器上的持久卷。
启用远程日志记录。
注意
如果您没有启用日志持久化,并且您没有启用远程日志记录,则在工作器 Pod 关闭后,日志将丢失。
与 CeleryExecutor 的比较¶
与 CeleryExecutor 相比,KubernetesExecutor 不需要 Redis 等额外组件,但需要访问 Kubernetes 集群。
还可以使用内置的 Kubernetes 监控来监控 Pod。
使用 KubernetesExecutor,每个任务都在自己的 Pod 中运行。Pod 在任务排队时创建,并在任务完成时终止。过去,在诸如突发性工作负载之类的场景中,这比 CeleryExecutor 具有资源利用率优势,在 CeleryExecutor 中,您需要固定数量的长时间运行的 Celery 工作器 Pod,而不管是否有要运行的任务。
但是,官方 Apache Airflow Helm 图表 可以根据队列中的任务数量自动将 Celery 工作器缩减为零,因此在使用官方图表时,这不再是一个优势。
使用 Celery 工作器,您往往会减少任务延迟,因为在任务排队时,工作器 Pod 已经启动并运行。另一方面,由于多个任务在同一个 Pod 中运行,因此使用 Celery 时,您可能需要更加注意任务设计中的资源利用率,尤其是内存消耗。
KubernetesExecutor 有用的一种情况是,如果您有长时间运行的任务,因为如果您在任务运行时进行部署,则该任务将继续运行,直到完成(或超时等)。但是,对于 CeleryExecutor,如果您设置了宽限期,则该任务将仅继续运行,直到宽限期结束,届时该任务将终止。KubernetesExecutor 可以很好地工作的另一种情况是,当您的任务在资源需求或镜像方面不是很统一时。
最后,请注意,它不必是二选一;使用 CeleryKubernetesExecutor,可以在同一个集群上同时使用 CeleryExecutor 和 KubernetesExecutor。CeleryKubernetesExecutor 将查看任务的 queue
以确定是在 Celery 还是 Kubernetes 上运行。默认情况下,任务被发送到 Celery 工作器,但如果您希望使用 KubernetesExecutor 运行任务,则将其发送到 kubernetes
队列,它将在自己的 Pod 中运行。KubernetesPodOperator 可以产生类似的效果,无论您使用什么执行器。
容错¶
提示
要解决 KubernetesExecutor 的问题,您可以使用 airflow kubernetes generate-dag-yaml
命令。此命令会在 Kubernetes 中启动 Pod 时生成 Pod,并将它们转储到 yaml 文件中供您检查。
处理工作器 Pod 崩溃¶
在处理分布式系统时,我们需要一个系统,该系统假定任何组件都可能随时崩溃,原因从 OOM 错误到节点升级不等。
如果工作器在能够向后端数据库报告其状态之前死亡,则执行器可以使用 Kubernetes 观察器线程来发现故障 Pod。
Kubernetes 观察器是一个线程,可以订阅 Kubernetes 数据库中发生的每个更改。当 Pod 启动、运行、结束和失败时,它会收到警报。通过监控此流,KubernetesExecutor 可以发现工作器崩溃并正确地将任务报告为失败。
但是,如果调度器 Pod 崩溃怎么办?¶
如果调度器崩溃,调度器将使用观察器的 resourceVersion
恢复其状态。
监控 Kubernetes 集群的观察器线程时,每个事件都有一个单调递增的数字,称为 resourceVersion
。每次执行器读取 resourceVersion
时,执行器都会将最新值存储在后端数据库中。由于存储了 resourceVersion,因此调度器可以重新启动并从其停止的位置继续读取观察器流。由于任务独立于执行器运行并将结果直接报告给数据库,因此调度器故障不会导致任务失败或重新运行。