从 1.10 升级到 2

Apache Airflow 2 是一个主要版本,本文档的目的是帮助用户从 Airflow 1.10.x 迁移到 Airflow 2

步骤 1:切换到 Python 3

Airflow 1.10 是最后一个支持 Python 2 的版本系列。Airflow 2.0.0 需要 Python 3.6+,并且已通过 Python 版本 3.6、3.7 和 3.8 进行了测试。从 Airflow 2.1.2 开始添加了对 Python 3.9 的支持。

Airflow 2.3.0 不再支持 Python 3.6。它已通过 Python 3.7、3.8、3.9、3.10 进行了测试。

如果您有仍然需要 Python 2 的特定任务,那么您可以为此使用 @task.virtualenv@task.docker@task.kubernetes 装饰器。

有关 Python 2 和 Python 3 之间破坏性更改的列表,请参阅 CouchBaseDB 团队的这篇 实用博客

步骤 2:升级到 1.10.15

为了最大程度地减少从 Airflow 1.10 升级到 Airflow 2.0 及更高版本的用户的摩擦,已创建了 Airflow 1.10.15,又称“桥接版本”。这是最后一个 1.10 功能版本。Airflow 1.10.15 包括对从 Airflow 2.0 反向移植的各种功能的支持,以便用户在升级到 Airflow 2.0 之前轻松测试其 Airflow 环境。

我们强烈建议所有升级到 Airflow 2.0 的用户,首先升级到 Airflow 1.10.15 并测试其 Airflow 部署,然后才能升级到 Airflow 2.0。Airflow 1.10.x 已于 2021 年 6 月 17 日达到使用寿命终点。不会发布任何新的 Airflow 1.x 版本。

1.10.15 中的功能包括

1. Airflow 2.0 的大多数破坏性 DAG 和架构更改已反向移植到 Airflow 1.10.15。这种向后兼容性并不意味着 1.10.15 将以与 Airflow 2.0 相同的方式处理这些 DAG。相反,这意味着大多数与 Airflow 2.0 兼容的 DAG 将在 Airflow 1.10.15 中运行。此反向移植将为用户提供时间来随着时间的推移修改其 DAG,而不会造成任何服务中断。

2. 我们还将更新的 Airflow 2.0 CLI 命令反向移植到 Airflow 1.10.15,以便用户可以在升级之前修改其脚本以与 Airflow 2.0 兼容。

3. 对于 KubernetesExecutor 的用户,我们已经反向移植了 KubernetesExecutor 的 pod_template_file 功能,以及一个基于 airflow.cfg 设置生成 pod_template_file 的脚本。要生成此文件,只需运行以下命令

airflow generate_pod_template -o <output file path>

执行此步骤后,只需在 airflow.cfgkubernetes_executor 部分的 pod_template_file 配置中写出此文件的路径即可

注意

在 Airflow 2.4.2 版本之前,kubernetes_executor 部分称为 kubernetes

步骤 3:运行升级检查脚本

升级到 Airflow 1.10.15 后,我们建议您安装“升级检查”脚本。这些脚本将通读您的 airflow.cfg 和所有 DAG,并详细报告升级前所需的所有更改。我们正在勤奋地测试此脚本,我们的目标是任何能通过这些测试的 Airflow 设置都能够毫无问题地升级到 2.0。

pip install apache-airflow-upgrade-check

安装完成后,请运行升级检查脚本。

airflow upgrade_check

有关此过程的更多详细信息,请参阅此处 升级检查脚本

步骤 4:切换到反向移植提供程序

现在您已在 Python 3.6+ 环境中设置了 Airflow 1.10.15,您已准备好开始将您的 DAG 移植到 Airflow 2.0 合规性!

此转换中最重要的步骤也是最容易分步执行的步骤。所有 Airflow 2.0 运算符都通过反向移植提供程序包向后兼容 Airflow 1.10。在您自己的时间内,您可以通过 PyPI 安装提供程序并更改导入路径来过渡到使用这些反向移植提供程序。

例如:虽然过去您可能以这种方式导入 DockerOperator

from airflow.operators.docker_operator import DockerOperator

您现在将运行此命令来安装提供程序

pip install apache-airflow-backport-providers-docker

然后使用此路径导入运算符

from airflow.providers.docker.operators.docker import DockerOperator

请注意,反向移植提供程序包只是与 Airflow 2.0 兼容的提供程序包的反向移植。例如

pip install 'apache-airflow[docker]'

自动安装 apache-airflow-providers-docker 包。但是,你可以单独管理/升级/删除提供程序包,而无需涉及 Airflow 核心。

升级到 Apache Airflow 2.0 后,当你使用附加组件安装 Airflow 时,这些提供程序包会自动安装。即使不使用附加组件,几个提供程序(http、ftp、sqlite、imap)也会在安装 Airflow 时自动安装。你可以在 提供程序包 中阅读有关提供程序的更多信息。

步骤 5:升级 Airflow DAG

更改模板中未定义变量的处理方式

在 Airflow 2.0 之前,Jinja 模板允许使用未定义变量。它们将呈现为空字符串,不会向用户指示使用了未定义变量。在此版本中,任何涉及未定义变量的模板呈现都将导致任务失败,并在呈现时在 UI 中显示错误。

实例化 DAG 时可以恢复行为。

import jinja2

dag = DAG("simple_dag", template_undefined=jinja2.Undefined)

或者,还可以使用 | default Jinja 过滤器逐个覆盖每个 Jinja 模板变量,如下所示。

{{a | default(1)}}

KubernetesPodOperator 的更改

KubernetesExecutor 非常相似,KubernetesPodOperator 将不再采用 Airflow 自定义类,而是需要 pod_template yaml 文件或 kubernetes.client.models 对象。

一个值得注意的例外是,我们将继续支持 airflow.providers.cncf.kubernetes.secret.Secret 类。

虽然以前用户会导入每个单独的类来构建 pod,如下所示

from airflow.kubernetes.pod import Port
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume_mount import VolumeMount


volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)

port = Port("http", 80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    secrets=[secret_file, secret_env],
    ports=[port],
    volumes=[volume],
    volume_mounts=[volume_mount],
    name="airflow-test-pod",
    task_id="task",
    affinity=affinity,
    is_delete_operator_pod=True,
    hostnetwork=False,
    tolerations=tolerations,
    configmaps=configmaps,
    init_containers=[init_container],
    priority_class_name="medium",
)

现在,用户可以使用 kubernetes.client.models 类作为创建所有 k8s 对象的单一入口点。

from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.secret import Secret


configmaps = ["test-configmap-1", "test-configmap-2"]

volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)

port = k8s.V1ContainerPort(name="http", container_port=80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo", "10"],
    labels={"foo": "bar"},
    secrets=[secret_file, secret_env],
    ports=[port],
    volumes=[volume],
    volume_mounts=[volume_mount],
    name="airflow-test-pod",
    task_id="task",
    is_delete_operator_pod=True,
    hostnetwork=False,
)

我们决定保留 Secret 类,因为用户似乎非常喜欢它简化了将 Kubernetes 密钥挂载到工作进程的复杂性。

有关 KubernetesPodOperator API 更改的更详细列表,请阅读附录中标题为“KubernetesPodOperator 的已更改参数”的部分

更改 dag_run_conf_overrides_params 的默认值

DagRun 配置字典现在将默认覆盖 params 字典。如果您通过 airflow dags backfill -cairflow dags trigger -c 传递一些键值对,则这些键值对将覆盖 params 中的现有键值对。您可以在 airflow.cfg 中将 dag_run_conf_overrides_params 设置为 False 来恢复此行为。

DAG 发现安全模式现在不区分大小写

DAG_DISCOVERY_SAFE_MODE 处于活动状态时,Airflow 现在将以不区分大小写的方式过滤所有包含字符串 airflowdag 的文件。此项更改是为了更好地支持新的 @dag 装饰器。

更改权限

DAG 级别的权限操作 can_dag_readcan_dag_edit 已作为 Airflow 2.0 的一部分弃用。它们将被 can_readcan_edit 替换。当角色获得 DAG 级别访问权限时,资源名称(或在 Flask App-Builder 术语中称为“视图菜单”)现在将以 DAG: 为前缀。因此,对 example_dag_id 执行操作 can_dag_read,现在表示为对 DAG:example_dag_id 执行操作 can_read。有一个名为 DAGs 的特殊视图(在 1.10.x 版本中称为 all_dags),它允许角色访问所有 DAG。默认的 AdminViewerUserOp 角色都可以访问 DAGs 视图。

作为运行 ``airflow db migrate`` 的一部分,现有权限将为您迁移。

当使用 access_control 变量初始化 DAG 时,旧权限名称的任何用法都将自动在数据库中更新,因此这不会造成重大更改。将引发弃用警告。

放弃旧版 UI,转而使用 FAB RBAC UI

警告

重大更改

以前我们使用两个版本的 UI
  • 非 RBAC UI

  • Flask App Builder RBAC UI

这很难维护,因为这意味着我们必须在两个地方实现/更新功能。通过此版本,我们已放弃旧版 UI,转而使用 Flask App Builder RBAC UI,从而减少了巨大的维护负担。不再需要在配置中明确设置 RBAC UI,因为它是唯一的默认 UI。

如果您以前使用非 RBAC UI,则必须切换到新的 RBAC UI 并创建用户才能访问 Airflow 的 Web 服务器。有关创建用户的 CLI 的更多详细信息,请参阅 命令行界面和环境变量参考

请注意,自定义认证后端需要重新编写以针对基于 FAB 的新 UI。

作为此更改的一部分,[webserver] 部分中的一些配置项已被移除,不再适用,包括 authenticatefilter_by_ownerowner_moderbac

在升级到此版本之前,我们建议激活新的 FAB RBAC UI。为此,你应将 airflow.cfg 文件中 [webserver] 中的 rbac 选项设置为 True

[webserver]
rbac = True

为了登录界面,你需要创建一个管理员帐户。

假设你已安装 Airflow 1.10.15,则可以使用 Airflow 2.0 CLI 命令语法 airflow users create 创建用户。由于 FAB RBAC UI 是唯一受支持的 UI,因此无需更改配置文件。

airflow users create \
    --role Admin \
    --username admin \
    --firstname FIRST_NAME \
    --lastname LAST_NAME \
    --email [email protected]

OAuth 中的重大变更

注意

当多个 airflow webserver 副本正在运行时,它们需要共享相同的 secret_key 才能访问相同的用户会话。通过任何配置机制注入此信息。1.10.15 桥接版本修改此功能以使用随机生成的密钥,而不是不安全的默认值,并且可能会破坏依赖于默认值的现有部署。当检索日志时,webserver 密钥也用于授权对 Celery 工作进程的请求。但是,使用密钥生成的令牌具有较短的到期时间 - 确保运行 airflow 组件的所有计算机上的时间都已同步(例如使用 ntpd),否则在访问日志时可能会出现“禁止”错误。

flask-oauthlib 已被 authlib 替换,因为 flask-oauthlib 已弃用,转而支持 authlib。已更改的旧和新提供程序配置密钥如下

旧密钥

新密钥

consumer_key

client_id

consumer_secret

client_secret

base_url

api_base_url

request_token_params

client_kwargs

有关更多信息,请访问 https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth

Pendulum 支持中的重大变更

Airflow 已从 Pendulum 1.x 升级到 Pendulum 2.x。这带来了一些重大变更,因为 Pendulum 2.x 中的某些方法及其定义已更改或已被移除。

例如,以下代码段现在将引发错误

execution_date.format("YYYY-MM-DD HH:mm:ss", formatter="alternative")

由于 Pendulum 2.x 不支持 formatter 选项,因此默认使用 alternative

有关详细信息,请访问 https://pendulum.eustace.io/blog/pendulum-2.0.0-is-out.html

步骤 6:升级配置设置

Airflow 2.0 对配置数据期望更加严格,并且在更多情况下需要明确指定配置值,而不是采用通用值。

其中一些内容在升级检查指南中进行了详细说明,但一个重要的更改领域与 Kubernetes Executor 有关。对于 Kubernetes Executor 的用户,下面会对此进行说明。

升级 KubernetesExecutor 设置

KubernetesExecutor 将不再从 airflow.cfg 中读取基本 Pod 配置。

在 Airflow 2.0 中,KubernetesExecutor 需要一个用 yaml 编写的基本 pod 模板。此文件可以在主机上的任何位置,并且将使用 airflow.cfg 文件中的 pod_template_file 配置进行链接。您可以通过运行以下命令来创建 pod_template_fileairflow generate_pod_template

对于 worker_container_repositoryworker_container_tag 和默认命名空间,airflow.cfg 仍将接受值。

以下 airflow.cfg 值将被弃用

worker_container_image_pull_policy
airflow_configmap
airflow_local_settings_configmap
dags_in_image
dags_volume_subpath
dags_volume_mount_point
dags_volume_claim
logs_volume_subpath
logs_volume_claim
dags_volume_host
logs_volume_host
env_from_configmap_ref
env_from_secret_ref
git_repo
git_branch
git_sync_depth
git_subpath
git_sync_rev
git_user
git_password
git_sync_root
git_sync_dest
git_dags_folder_mount_point
git_ssh_key_secret_name
git_ssh_known_hosts_configmap_name
git_sync_credentials_secret
git_sync_container_repository
git_sync_container_tag
git_sync_init_container_name
git_sync_run_as_user
worker_service_account_name
image_pull_secrets
gcp_service_account_keys
affinity
tolerations
run_as_user
fs_group
[kubernetes_node_selectors]
[kubernetes_annotations]
[kubernetes_environment_variables]
[kubernetes_secrets]
[kubernetes_labels]

``executor_config`` 现在在启动任务时将期待一个 ``kubernetes.client.models.V1Pod`` 类

在 Airflow 1.10.x 中,用户可以通过将字典传递给 executor_config 变量来在运行时修改任务 pod。现在,用户可以通过 kubernetes.client.models.V1Pod 完全访问 Kubernetes API。

虽然在弃用版本中,用户会使用以下字典挂载卷

second_task = PythonOperator(
    task_id="four_task",
    python_callable=test_volume_mount,
    executor_config={
        "KubernetesExecutor": {
            "volumes": [
                {
                    "name": "example-kubernetes-test-volume",
                    "hostPath": {"path": "/tmp/"},
                },
            ],
            "volume_mounts": [
                {
                    "mountPath": "/foo/",
                    "name": "example-kubernetes-test-volume",
                },
            ],
        }
    },
)

在新模型中,用户可以使用 pod_override 键下的以下代码完成相同操作

from kubernetes.client import models as k8s


@task(
    task_id="four_task",
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        volume_mounts=[
                            k8s.V1VolumeMount(
                                mount_path="/foo/",
                                name="example-kubernetes-test-volume",
                            )
                        ],
                    )
                ],
                volumes=[
                    k8s.V1Volume(
                        name="example-kubernetes-test-volume",
                        host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                    )
                ],
            )
        )
    },
)
def test_volume_mount():
    pass


second_task = test_volume_mount()

对于 Airflow 2.0,传统的 executor_config 将继续操作,并带有弃用警告,但将在未来版本中删除。

步骤 7:升级到 Airflow 2

在按照上述说明运行升级检查、安装后向移植的提供程序、修改 DAG 以使其兼容并更新配置设置之后,您应该准备好升级到 Airflow 2.0。

始终建议最后运行一次升级检查,以确保您没有遗漏任何内容。在此阶段,检测到的问题应为零或最少,您计划在升级 Airflow 版本后修复这些问题。

此时,只需按照标准的 Airflow 版本升级流程进行操作

  • 确保备份您的 Airflow 元数据库

  • 暂停所有 DAG,并确保没有内容正在主动运行

    • 暂停 DAG 的原因是确保在稍后的步骤中进行数据库升级时,没有内容正在主动写入数据库。

    • 为了更加小心,最好在暂停 DAG 后备份数据库。

  • 将 Airflow 版本安装/升级到您选择的 2.0 版本

  • 确保安装正确的提供程序

    • 这可以通过将“extras”选项用作 Airflow 安装的一部分,或通过单独安装提供程序来完成。

    • 请注意,如果您使用 pip 安装,则可能必须在安装新提供程序之前卸载后向移植提供程序。如果您使用具有指定要求集的 Airflow Docker 映像进行安装,则不适用此情况,因为更改会自动获取一组新的模块。

    • 您可以在 提供程序包 中阅读有关提供程序的更多信息。

  • 使用 airflow db migrate 迁移 Airflow 元数据库。

    • 上述命令可能不熟悉,因为它使用 Airflow 2.0 CLI 语法显示。

    • 数据库升级可能会根据需要修改数据库架构,还可能会映射现有数据以使其符合更新的数据库架构。

    注意

    数据库升级可能需要一段时间,具体取决于数据库中的 DAG 数量以及数据库中存储的任务历史记录、xcom 变量等历史记录的容量。在我们的测试中,我们发现将 Airflow 数据库从 Airflow 1.10.15 升级到 Airflow 2.0 在 PostgreSQL 上的 Airflow 数据库上大约花费了两到三分钟,该数据库大约有 35,000 个任务实例和 500 个 DAG。为了更快的数据库升级和更好的整体性能,建议定期存档不再有价值的旧历史元素。

  • 重启 Airflow 调度程序、Web 服务器和工作程序

附录

KubernetesPodOperator 的更改参数

端口已从 list[Port] 迁移到 list[V1ContainerPort]

之前

from airflow.kubernetes.pod import Port

port = Port("http", 80)
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    ports=[port],
    task_id="task",
)

之后

from kubernetes.client import models as k8s

port = k8s.V1ContainerPort(name="http", container_port=80)
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    ports=[port],
    task_id="task",
)

卷装载已从 list[VolumeMount] 迁移到 list[V1VolumeMount]

之前

from airflow.kubernetes.volume_mount import VolumeMount

volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    volume_mounts=[volume_mount],
    task_id="task",
)

之后

from kubernetes.client import models as k8s

volume_mount = k8s.V1VolumeMount(
    name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    volume_mounts=[volume_mount],
    task_id="task",
)

卷已从 list[Volume] 迁移到 list[V1Volume]

之前

from airflow.kubernetes.volume import Volume

volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    volumes=[volume],
    task_id="task",
)

之后

from kubernetes.client import models as k8s

volume = k8s.V1Volume(
    name="test-volume",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    volumes=[volume],
    task_id="task",
)

env_vars 已从 dict 迁移到 list[V1EnvVar]

之前

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    env_vars={"ENV1": "val1", "ENV2": "val2"},
    task_id="task",
)

之后

from kubernetes.client import models as k8s

env_vars = [
    k8s.V1EnvVar(name="ENV1", value="val1"),
    k8s.V1EnvVar(name="ENV2", value="val2"),
]

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    env_vars=env_vars,
    task_id="task",
)

PodRuntimeInfoEnv 已被移除

PodRuntimeInfoEnv 现在可以作为 V1EnvVarSource 添加到 env_vars 变量中

之前

from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
    task_id="task",
)

之后

from kubernetes.client import models as k8s

env_vars = [
    k8s.V1EnvVar(
        name="ENV3",
        value_from=k8s.V1EnvVarSource(field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP")),
    )
]

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    env_vars=env_vars,
    task_id="task",
)

configmaps 已被移除

Configmaps 现在可以作为 V1EnvVarSource 添加到 env_from 变量中

之前

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    configmaps=["test-configmap"],
    task_id="task",
)

之后

from kubernetes.client import models as k8s

configmap = "test-configmap"
env_from = [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap))]

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    env_from=env_from,
    task_id="task",
)

资源已从 Dict 迁移到 V1ResourceRequirements

之前

resources = {
    "limit_cpu": 0.25,
    "limit_memory": "64Mi",
    "limit_ephemeral_storage": "2Gi",
    "request_cpu": "250m",
    "request_memory": "64Mi",
    "request_ephemeral_storage": "1Gi",
}
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    labels={"foo": "bar"},
    name="test",
    task_id="task" + self.get_current_task_name(),
    in_cluster=False,
    do_xcom_push=False,
    resources=resources,
)

之后

from kubernetes.client import models as k8s

resources = k8s.V1ResourceRequirements(
    requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
    limits={
        "memory": "64Mi",
        "cpu": 0.25,
        "nvidia.com/gpu": None,
        "ephemeral-storage": "2Gi",
    },
)
k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    labels={"foo": "bar"},
    name="test-" + str(random.randint(0, 1000000)),
    task_id="task" + self.get_current_task_name(),
    in_cluster=False,
    do_xcom_push=False,
    resources=resources,
)

image_pull_secrets 已从 String 迁移到 list[k8s.V1LocalObjectReference]

之前

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:16.04",
    cmds=["bash", "-cx"],
    arguments=["echo 10"],
    name="test",
    task_id="task",
    image_pull_secrets="fake-secret",
    cluster_context="default",
)

之后

quay_k8s = KubernetesPodOperator(
    namespace="default",
    image="quay.io/apache/bash",
    image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
    cmds=["bash", "-cx"],
    name="airflow-private-image-pod",
    task_id="task-two",
)

从实验性 API 到稳定 API v1 的迁移指南

在 Airflow 2.0 中,我们添加了新的 REST API。实验性 API 仍然有效,但将来可能会取消支持。

但是,实验性 API 不需要身份验证,因此默认情况下处于禁用状态。如果您想使用实验性 API,则需要显式启用它。如果您的应用程序仍在使用实验性 API,您应该认真考虑迁移到稳定 API。

稳定 API 公开了许多可通过 Web 服务器访问的端点。以下是这两个端点之间的差异,将帮助您从实验性 REST API 迁移到稳定 REST API。

基本端点

稳定 API v1 的基本端点是 /api/v1/。您必须将实验性基本端点从 /api/experimental/ 更改为 /api/v1/。下表显示了差异

目的

实验性 REST API 端点

稳定的 REST API 端点

创建 DAGRuns(POST)

/api/experimental/dags/<DAG_ID>/dag_runs

/api/v1/dags/{dag_id}/dagRuns

列出 DAGRuns(GET)

/api/experimental/dags/<DAG_ID>/dag_runs

/api/v1/dags/{dag_id}/dagRuns

检查运行状况(GET)

/api/experimental/test

/api/v1/health

任务信息(GET)

/api/experimental/dags/<DAG_ID>/tasks/<TASK_ID>

/api/v1//dags/{dag_id}/tasks/{task_id}

TaskInstance 公共变量(GET)

/api/experimental/dags/<DAG_ID>/dag_runs/<string:execution_date>/tasks/<TASK_ID>

/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}

暂停 DAG(PATCH)

/api/experimental/dags/<DAG_ID>/paused/<string:paused>

/api/v1/dags/{dag_id}

暂停的 DAG 信息(GET)

/api/experimental/dags/<DAG_ID>/paused

/api/v1/dags/{dag_id}

最新的 DAG 运行(GET)

/api/experimental/latest_runs

/api/v1/dags/{dag_id}/dagRuns

获取所有池(GET)

/api/experimental/pools

/api/v1/pools

创建池(POST)

/api/experimental/pools

/api/v1/pools

删除池(DELETE)

/api/experimental/pools/<string:name>

/api/v1/pools/{pool_name}

DAG 血缘(GET)

/api/experimental/lineage/<DAG_ID>/<string:execution_date>/

/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries

此端点 /api/v1/dags/{dag_id}/dagRuns 还允许你使用查询字符串中的参数(如 start_dateend_dateexecution_date 等)来过滤 dag_runs。因此,此端点以前执行的操作

/api/experimental/dags/<string:dag_id>/dag_runs/<string:execution_date>

现在可以使用查询字符串中的过滤器参数来处理。有关最新运行的信息可以通过此端点(/api/v1/dags/{dag_id}/dagRuns)的查询字符串中的过滤器来获取。有关更多信息,请查看稳定的 API 参考文档

从 DAG 回调中进行异常处理的更改

来自 DAG 回调的异常会使 Airflow 调度程序崩溃。作为我们使调度程序更强大、更可靠的努力的一部分,我们已将此行为更改为记录异常。最重要的是,已添加了一个新的 dag.callback_exceptions 计数器指标,以帮助更好地监控回调异常。

迁移到 TaskFlow API

Airflow 2.0 引入了 TaskFlow API,以简化 Python 可调用任务的声明。建议用户用 TaskFlow 装饰器替代经典运算符。有关详细信息,请参阅 使用 TaskFlow

经典运算符

TaskFlow 装饰器

PythonOperator

@task@task.python 的缩写)

PythonVirtualenvOperator

@task.virtualenv

BranchPythonOperator

@task.branch

DockerOperator

@task.docker

KubernetesPodOperator

@task.kubernetes

2.0 中的 Airflow CLI 更改

Airflow CLI 已被整理,以便将相关命令分组为子命令,这意味着如果您在脚本中使用这些命令,则必须对它们进行更改。

本节介绍已进行的更改,以及您需要采取哪些措施来更新脚本。从命令行操作用户的能力已更改。 airflow create_userairflow delete_userairflow list_users 已分组为带有可选标志 createlistdelete 的单个命令 airflow usersairflow list_dags 命令现在是 airflow dags listairflow pauseairflow dags pause,依此类推。

在 Airflow 1.10 和 2.0 中,有一个 airflow config 命令,但行为不同。在 Airflow 1.10 中,它打印所有配置选项,而在 Airflow 2.0 中,它是一个命令组。 airflow config 现在是 airflow config list。您可以通过运行命令 airflow config --help 来查看其他选项

有关更新的 CLI 命令的完整列表,请参阅 https://airflow.apache.org/cli.html

你可以通过运行 airflow --help 来了解这些命令。例如,要获取有关 celery 组命令的帮助,你必须运行帮助命令:airflow celery --help

旧命令

新命令

airflow worker

airflow celery worker

celery

airflow flower

airflow celery flower

celery

airflow trigger_dag

airflow dags trigger

dags

airflow delete_dag

airflow dags delete

dags

airflow show_dag

airflow dags show

dags

airflow list_dags

airflow dags list

dags

airflow dag_status

airflow dags status

dags

airflow backfill

airflow dags backfill

dags

airflow list_dag_runs

airflow dags list-runs

dags

airflow pause

airflow dags pause

dags

airflow unpause

airflow dags unpause

dags

airflow next_execution

airflow dags next-execution

dags

airflow test

airflow tasks test

tasks

airflow clear

airflow tasks clear

tasks

airflow list_tasks

airflow tasks list

tasks

airflow task_failed_deps

airflow tasks failed-deps

tasks

airflow task_state

airflow tasks state

tasks

airflow run

airflow tasks run

tasks

airflow render

airflow tasks render

tasks

airflow initdb

airflow db init

数据库

airflow resetdb

airflow db reset

数据库

airflow upgradedb

airflow db upgrade

数据库

airflow checkdb

airflow db check

数据库

airflow shell

airflow db shell

数据库

airflow pool

airflow pools

airflow create_user

airflow users create

用户

airflow delete_user

airflow users delete

用户

airflow list_users

airflow users list

用户

airflow rotate_fernet_key

airflow rotate-fernet-key

airflow sync_perm

airflow sync-perm

``users`` 组的示例用法

创建新用户

airflow users create --username jondoe --lastname doe --firstname jon --email [email protected] --role Viewer --password test

列出用户

airflow users list

删除用户

airflow users delete --username jondoe

向角色中添加用户

airflow users add-role --username jondoe --role Public

从角色中删除用户

airflow users remove-role --username jondoe --role Public

在 CLI 中使用单个字符来更改短选项样式

对于 Airflow 短选项,请使用一个单个字符。根据下表,可以使用新命令

旧命令

新命令

airflow (dags\|tasks\|scheduler) [-sd, --subdir]

airflow (dags\|tasks\|scheduler) [-S, --subdir]

airflow test [-dr, --dry_run]

airflow tasks test [-n, --dry-run]

airflow test [-tp, --task_params]

airflow tasks test [-t, --task-params]

airflow test [-pm, --post_mortem]

airflow tasks test [-m, --post-mortem]

airflow run [-int, --interactive]

airflow 任务 运行 [-N, --交互]

airflow 回填 [-dr, --空运行]

airflow dags 回填 [-n, --空运行]

airflow 清除 [-dx, --dag_regex]

airflow 任务 清除 [-R, --dag-regex]

airflow kerberos [-kt, --密钥表]

airflow kerberos [-k, --密钥表]

airflow webserver [-hn, --主机名]

airflow webserver [-H, --主机名]

airflow worker [-cn, --celery_hostname]

airflow celery worker [-H, --celery-hostname]

airflow flower [-hn, --主机名]

airflow celery flower [-H, --主机名]

airflow flower [-fc, --flower_conf]

airflow celery flower [-c, --flower-conf]

airflow flower [-ba, --basic_auth]

airflow celery flower [-A, --basic-auth]

对于 Airflow 长选项,请使用 小写连字符形式,而不是 小写蛇形形式

旧选项

新选项

--task_regex

--task-regex

--start_date

--start-date

--end_date

--end-date

--dry_run

--dry-run

--no_backfill

--no-backfill

--mark_success

--mark-success

--donot_pickle

--donot-pickle

--ignore_dependencies

--ignore-dependencies

--ignore_first_depends_on_past

--ignore-first-depends-on-past

--delay_on_limit

--delay-on-limit

--reset_dagruns

--reset-dagruns

--rerun_failed_tasks

--rerun-failed-tasks

--run_backwards

--run-backwards

--only_failed

--only-failed

--only_running

--only-running

--exclude_subdags

--exclude-subdags

--exclude_parentdag

--exclude-parentdag

--dag_regex

--dag-regex

--run_id

--run-id

--exec_date

--exec-date

--ignore_all_dependencies

--ignore-all-dependencies

--ignore_depends_on_past

--ignore-depends-on-past

--ship_dag

--ship-dag

--job_id

--job-id

--cfg_path

--cfg-path

--ssl_cert

--ssl-cert

--ssl_key

--ssl-key

--worker_timeout

--worker-timeout

--access_logfile

--access-logfile

--error_logfile

--error-logfile

--dag_id

--dag-id

--num_runs

--num-runs

--do_pickle

--do-pickle

--celery_hostname

--celery-hostname

--broker_api

--broker-api

--flower_conf

--flower-conf

--url_prefix

--url-prefix

--basic_auth

--basic-auth

--task_params

--task-params

--post_mortem

--post-mortem

--conn_uri

--conn-uri

--conn_type

--conn-type

--conn_host

--conn-host

--conn_login

--conn-login

--conn_password

--conn-password

--conn_schema

--conn-schema

--conn_port

--conn-port

--conn_extra

--conn-extra

--use_random_password

--use-random-password

--skip_serve_logs

--skip-serve-logs

从 CLI 中移除 serve_logs 命令

serve_logs 命令已删除。此命令应仅由内部应用程序机制运行,并且无需从 CLI 界面访问它。

dag_state CLI 命令

如果 DAGRun 是使用传入的 conf 键/值触发的,则它们还将打印在 dag_state CLI 响应中,即正在运行,{“name”: “bob”},而在之前的版本中它只打印状态:即正在运行

在 backfill 命令中弃用 ignore_first_depends_on_past 并将其默认设置为 True

使用 depends_on_past dags 进行 backfill 时,用户需要传递 --ignore-first-depends-on-past。我们应该将其默认设置为 true 以避免混淆

对 Airflow 插件的更改

如果您正在使用 Airflow 插件并传递 admin_viewsmenu_links,它们用于非 RBAC UI(基于 flask-admin 的 UI),请更新它以使用 flask_appbuilder_viewsflask_appbuilder_menu_links

:

from airflow.plugins_manager import AirflowPlugin

from flask_admin import BaseView, expose
from flask_admin.base import MenuLink


class TestView(BaseView):
    @expose("/")
    def test(self):
        # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
        return self.render("test_plugin/test.html", content="Hello galaxy!")


v = TestView(category="Test Plugin", name="Test View")

ml = MenuLink(category="Test Plugin", name="Test Menu Link", url="https://airflow.apache.org/")


class AirflowTestPlugin(AirflowPlugin):
    admin_views = [v]
    menu_links = [ml]

更改为:

from airflow.plugins_manager import AirflowPlugin
from flask_appbuilder import expose, BaseView as AppBuilderBaseView


class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
    "name": "Test View",
    "category": "Test Plugin",
    "view": v_appbuilder_view,
}

# Creating a flask appbuilder Menu Item
appbuilder_mitem = {
    "name": "Google",
    "category": "Search",
    "category_icon": "fa-th",
    "href": "https://www.google.com",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    appbuilder_views = [v_appbuilder_package]
    appbuilder_menu_items = [appbuilder_mitem]

附加名称的更改

all 附加缩减为仅包含面向用户的依赖项。这意味着此附加不包含开发依赖项。如果您正在使用它并依赖开发包,则应使用 devel_all

对 Airflow 1.10.x 版本的支持

Airflow 1.10.x 于 2021 年 6 月 17 日达到生命周期终点。不会再发布任何新的 Airflow 1.x 版本。

对反向移植提供程序的支持于 2021 年 3 月 17 日结束。不会再发布任何新版本的反向移植提供程序。

我们计划对我们的版本控制和发布过程采用严格的语义版本控制方法。这意味着我们不计划在 2.* 版本中进行任何向后不兼容的更改。任何重大更改,包括删除 Airflow 2.0 中弃用的功能,都将作为 Airflow 3.0 版本的一部分进行。

此条目是否有用?