从 1.10 升级到 2¶
Apache Airflow 2 是一个主要版本,本文档的目的是帮助用户从 Airflow 1.10.x 迁移到 Airflow 2
步骤 1:切换到 Python 3¶
Airflow 1.10 是最后一个支持 Python 2 的版本系列。Airflow 2.0.0 需要 Python 3.6+,并且已通过 Python 版本 3.6、3.7 和 3.8 进行了测试。从 Airflow 2.1.2 开始添加了对 Python 3.9 的支持。
Airflow 2.3.0 不再支持 Python 3.6。它已通过 Python 3.7、3.8、3.9、3.10 进行了测试。
如果您有仍然需要 Python 2 的特定任务,那么您可以为此使用 @task.virtualenv
、@task.docker
或 @task.kubernetes
装饰器。
有关 Python 2 和 Python 3 之间破坏性更改的列表,请参阅 CouchBaseDB 团队的这篇 实用博客。
步骤 2:升级到 1.10.15¶
为了最大程度地减少从 Airflow 1.10 升级到 Airflow 2.0 及更高版本的用户的摩擦,已创建了 Airflow 1.10.15,又称“桥接版本”。这是最后一个 1.10 功能版本。Airflow 1.10.15 包括对从 Airflow 2.0 反向移植的各种功能的支持,以便用户在升级到 Airflow 2.0 之前轻松测试其 Airflow 环境。
我们强烈建议所有升级到 Airflow 2.0 的用户,首先升级到 Airflow 1.10.15 并测试其 Airflow 部署,然后才能升级到 Airflow 2.0。Airflow 1.10.x 已于 2021 年 6 月 17 日达到使用寿命终点。不会发布任何新的 Airflow 1.x 版本。
1.10.15 中的功能包括
1. Airflow 2.0 的大多数破坏性 DAG 和架构更改已反向移植到 Airflow 1.10.15。这种向后兼容性并不意味着 1.10.15 将以与 Airflow 2.0 相同的方式处理这些 DAG。相反,这意味着大多数与 Airflow 2.0 兼容的 DAG 将在 Airflow 1.10.15 中运行。此反向移植将为用户提供时间来随着时间的推移修改其 DAG,而不会造成任何服务中断。
2. 我们还将更新的 Airflow 2.0 CLI 命令反向移植到 Airflow 1.10.15,以便用户可以在升级之前修改其脚本以与 Airflow 2.0 兼容。
3. 对于 KubernetesExecutor 的用户,我们已经反向移植了 KubernetesExecutor 的 pod_template_file
功能,以及一个基于 airflow.cfg
设置生成 pod_template_file
的脚本。要生成此文件,只需运行以下命令
airflow generate_pod_template -o <output file path>
执行此步骤后,只需在 airflow.cfg
的 kubernetes_executor
部分的 pod_template_file
配置中写出此文件的路径即可
注意
在 Airflow 2.4.2 版本之前,kubernetes_executor
部分称为 kubernetes
。
步骤 3:运行升级检查脚本¶
升级到 Airflow 1.10.15 后,我们建议您安装“升级检查”脚本。这些脚本将通读您的 airflow.cfg
和所有 DAG,并详细报告升级前所需的所有更改。我们正在勤奋地测试此脚本,我们的目标是任何能通过这些测试的 Airflow 设置都能够毫无问题地升级到 2.0。
pip install apache-airflow-upgrade-check
安装完成后,请运行升级检查脚本。
airflow upgrade_check
有关此过程的更多详细信息,请参阅此处 升级检查脚本。
步骤 4:切换到反向移植提供程序¶
现在您已在 Python 3.6+ 环境中设置了 Airflow 1.10.15,您已准备好开始将您的 DAG 移植到 Airflow 2.0 合规性!
此转换中最重要的步骤也是最容易分步执行的步骤。所有 Airflow 2.0 运算符都通过反向移植提供程序包向后兼容 Airflow 1.10。在您自己的时间内,您可以通过 PyPI 安装提供程序并更改导入路径来过渡到使用这些反向移植提供程序。
例如:虽然过去您可能以这种方式导入 DockerOperator
from airflow.operators.docker_operator import DockerOperator
您现在将运行此命令来安装提供程序
pip install apache-airflow-backport-providers-docker
然后使用此路径导入运算符
from airflow.providers.docker.operators.docker import DockerOperator
请注意,反向移植提供程序包只是与 Airflow 2.0 兼容的提供程序包的反向移植。例如
pip install 'apache-airflow[docker]'
自动安装 apache-airflow-providers-docker
包。但是,你可以单独管理/升级/删除提供程序包,而无需涉及 Airflow 核心。
升级到 Apache Airflow 2.0 后,当你使用附加组件安装 Airflow 时,这些提供程序包会自动安装。即使不使用附加组件,几个提供程序(http、ftp、sqlite、imap)也会在安装 Airflow 时自动安装。你可以在 提供程序包 中阅读有关提供程序的更多信息。
步骤 5:升级 Airflow DAG¶
更改模板中未定义变量的处理方式
在 Airflow 2.0 之前,Jinja 模板允许使用未定义变量。它们将呈现为空字符串,不会向用户指示使用了未定义变量。在此版本中,任何涉及未定义变量的模板呈现都将导致任务失败,并在呈现时在 UI 中显示错误。
实例化 DAG 时可以恢复行为。
import jinja2
dag = DAG("simple_dag", template_undefined=jinja2.Undefined)
或者,还可以使用 | default
Jinja 过滤器逐个覆盖每个 Jinja 模板变量,如下所示。
{{a | default(1)}}
KubernetesPodOperator 的更改
与 KubernetesExecutor
非常相似,KubernetesPodOperator
将不再采用 Airflow 自定义类,而是需要 pod_template yaml 文件或 kubernetes.client.models
对象。
一个值得注意的例外是,我们将继续支持 airflow.providers.cncf.kubernetes.secret.Secret
类。
虽然以前用户会导入每个单独的类来构建 pod,如下所示
from airflow.kubernetes.pod import Port
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume_mount import VolumeMount
volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)
port = Port("http", 80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env],
ports=[port],
volumes=[volume],
volume_mounts=[volume_mount],
name="airflow-test-pod",
task_id="task",
affinity=affinity,
is_delete_operator_pod=True,
hostnetwork=False,
tolerations=tolerations,
configmaps=configmaps,
init_containers=[init_container],
priority_class_name="medium",
)
现在,用户可以使用 kubernetes.client.models
类作为创建所有 k8s 对象的单一入口点。
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.secret import Secret
configmaps = ["test-configmap-1", "test-configmap-2"]
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
port = k8s.V1ContainerPort(name="http", container_port=80)
secret_file = Secret("volume", "/etc/sql_conn", "airflow-secrets", "sql_alchemy_conn")
secret_env = Secret("env", "SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
secret_all_keys = Secret("env", None, "airflow-secrets-2")
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo", "10"],
labels={"foo": "bar"},
secrets=[secret_file, secret_env],
ports=[port],
volumes=[volume],
volume_mounts=[volume_mount],
name="airflow-test-pod",
task_id="task",
is_delete_operator_pod=True,
hostnetwork=False,
)
我们决定保留 Secret 类,因为用户似乎非常喜欢它简化了将 Kubernetes 密钥挂载到工作进程的复杂性。
有关 KubernetesPodOperator API 更改的更详细列表,请阅读附录中标题为“KubernetesPodOperator 的已更改参数”的部分
更改 dag_run_conf_overrides_params 的默认值
DagRun 配置字典现在将默认覆盖 params 字典。如果您通过 airflow dags backfill -c
或 airflow dags trigger -c
传递一些键值对,则这些键值对将覆盖 params 中的现有键值对。您可以在 airflow.cfg
中将 dag_run_conf_overrides_params
设置为 False
来恢复此行为。
DAG 发现安全模式现在不区分大小写
当 DAG_DISCOVERY_SAFE_MODE
处于活动状态时,Airflow 现在将以不区分大小写的方式过滤所有包含字符串 airflow
和 dag
的文件。此项更改是为了更好地支持新的 @dag
装饰器。
更改权限
DAG 级别的权限操作 can_dag_read
和 can_dag_edit
已作为 Airflow 2.0 的一部分弃用。它们将被 can_read
和 can_edit
替换。当角色获得 DAG 级别访问权限时,资源名称(或在 Flask App-Builder 术语中称为“视图菜单”)现在将以 DAG:
为前缀。因此,对 example_dag_id
执行操作 can_dag_read
,现在表示为对 DAG:example_dag_id
执行操作 can_read
。有一个名为 DAGs
的特殊视图(在 1.10.x 版本中称为 all_dags
),它允许角色访问所有 DAG。默认的 Admin
、Viewer
、User
、Op
角色都可以访问 DAGs
视图。
作为运行 ``airflow db migrate`` 的一部分,现有权限将为您迁移。
当使用 access_control
变量初始化 DAG 时,旧权限名称的任何用法都将自动在数据库中更新,因此这不会造成重大更改。将引发弃用警告。
放弃旧版 UI,转而使用 FAB RBAC UI
警告
重大更改
- 以前我们使用两个版本的 UI
非 RBAC UI
Flask App Builder RBAC UI
这很难维护,因为这意味着我们必须在两个地方实现/更新功能。通过此版本,我们已放弃旧版 UI,转而使用 Flask App Builder RBAC UI,从而减少了巨大的维护负担。不再需要在配置中明确设置 RBAC UI,因为它是唯一的默认 UI。
如果您以前使用非 RBAC UI,则必须切换到新的 RBAC UI 并创建用户才能访问 Airflow 的 Web 服务器。有关创建用户的 CLI 的更多详细信息,请参阅 命令行界面和环境变量参考
请注意,自定义认证后端需要重新编写以针对基于 FAB 的新 UI。
作为此更改的一部分,[webserver]
部分中的一些配置项已被移除,不再适用,包括 authenticate
、filter_by_owner
、owner_mode
和 rbac
。
在升级到此版本之前,我们建议激活新的 FAB RBAC UI。为此,你应将 airflow.cfg
文件中 [webserver]
中的 rbac
选项设置为 True
[webserver]
rbac = True
为了登录界面,你需要创建一个管理员帐户。
假设你已安装 Airflow 1.10.15,则可以使用 Airflow 2.0 CLI 命令语法 airflow users create
创建用户。由于 FAB RBAC UI 是唯一受支持的 UI,因此无需更改配置文件。
airflow users create \
--role Admin \
--username admin \
--firstname FIRST_NAME \
--lastname LAST_NAME \
--email [email protected]
OAuth 中的重大变更
注意
当多个 airflow webserver 副本正在运行时,它们需要共享相同的 secret_key 才能访问相同的用户会话。通过任何配置机制注入此信息。1.10.15 桥接版本修改此功能以使用随机生成的密钥,而不是不安全的默认值,并且可能会破坏依赖于默认值的现有部署。当检索日志时,webserver 密钥也用于授权对 Celery 工作进程的请求。但是,使用密钥生成的令牌具有较短的到期时间 - 确保运行 airflow 组件的所有计算机上的时间都已同步(例如使用 ntpd),否则在访问日志时可能会出现“禁止”错误。
flask-oauthlib
已被 authlib
替换,因为 flask-oauthlib
已弃用,转而支持 authlib
。已更改的旧和新提供程序配置密钥如下
旧密钥 |
新密钥 |
---|---|
|
|
|
|
|
|
|
|
有关更多信息,请访问 https://flask-appbuilder.readthedocs.io/en/latest/security.html#authentication-oauth
Pendulum 支持中的重大变更
Airflow 已从 Pendulum 1.x 升级到 Pendulum 2.x。这带来了一些重大变更,因为 Pendulum 2.x 中的某些方法及其定义已更改或已被移除。
例如,以下代码段现在将引发错误
execution_date.format("YYYY-MM-DD HH:mm:ss", formatter="alternative")
由于 Pendulum 2.x 不支持 formatter
选项,因此默认使用 alternative
。
有关详细信息,请访问 https://pendulum.eustace.io/blog/pendulum-2.0.0-is-out.html
步骤 6:升级配置设置¶
Airflow 2.0 对配置数据期望更加严格,并且在更多情况下需要明确指定配置值,而不是采用通用值。
其中一些内容在升级检查指南中进行了详细说明,但一个重要的更改领域与 Kubernetes Executor 有关。对于 Kubernetes Executor 的用户,下面会对此进行说明。
升级 KubernetesExecutor 设置
KubernetesExecutor 将不再从 airflow.cfg 中读取基本 Pod 配置。
在 Airflow 2.0 中,KubernetesExecutor 需要一个用 yaml 编写的基本 pod 模板。此文件可以在主机上的任何位置,并且将使用 airflow.cfg
文件中的 pod_template_file
配置进行链接。您可以通过运行以下命令来创建 pod_template_file
:airflow generate_pod_template
对于 worker_container_repository
、worker_container_tag
和默认命名空间,airflow.cfg
仍将接受值。
以下 airflow.cfg
值将被弃用
worker_container_image_pull_policy
airflow_configmap
airflow_local_settings_configmap
dags_in_image
dags_volume_subpath
dags_volume_mount_point
dags_volume_claim
logs_volume_subpath
logs_volume_claim
dags_volume_host
logs_volume_host
env_from_configmap_ref
env_from_secret_ref
git_repo
git_branch
git_sync_depth
git_subpath
git_sync_rev
git_user
git_password
git_sync_root
git_sync_dest
git_dags_folder_mount_point
git_ssh_key_secret_name
git_ssh_known_hosts_configmap_name
git_sync_credentials_secret
git_sync_container_repository
git_sync_container_tag
git_sync_init_container_name
git_sync_run_as_user
worker_service_account_name
image_pull_secrets
gcp_service_account_keys
affinity
tolerations
run_as_user
fs_group
[kubernetes_node_selectors]
[kubernetes_annotations]
[kubernetes_environment_variables]
[kubernetes_secrets]
[kubernetes_labels]
``executor_config`` 现在在启动任务时将期待一个 ``kubernetes.client.models.V1Pod`` 类
在 Airflow 1.10.x 中,用户可以通过将字典传递给 executor_config
变量来在运行时修改任务 pod。现在,用户可以通过 kubernetes.client.models.V1Pod
完全访问 Kubernetes API。
虽然在弃用版本中,用户会使用以下字典挂载卷
second_task = PythonOperator(
task_id="four_task",
python_callable=test_volume_mount,
executor_config={
"KubernetesExecutor": {
"volumes": [
{
"name": "example-kubernetes-test-volume",
"hostPath": {"path": "/tmp/"},
},
],
"volume_mounts": [
{
"mountPath": "/foo/",
"name": "example-kubernetes-test-volume",
},
],
}
},
)
在新模型中,用户可以使用 pod_override
键下的以下代码完成相同操作
from kubernetes.client import models as k8s
@task(
task_id="four_task",
executor_config={
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/foo/",
name="example-kubernetes-test-volume",
)
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
)
},
)
def test_volume_mount():
pass
second_task = test_volume_mount()
对于 Airflow 2.0,传统的 executor_config
将继续操作,并带有弃用警告,但将在未来版本中删除。
步骤 7:升级到 Airflow 2¶
在按照上述说明运行升级检查、安装后向移植的提供程序、修改 DAG 以使其兼容并更新配置设置之后,您应该准备好升级到 Airflow 2.0。
始终建议最后运行一次升级检查,以确保您没有遗漏任何内容。在此阶段,检测到的问题应为零或最少,您计划在升级 Airflow 版本后修复这些问题。
此时,只需按照标准的 Airflow 版本升级流程进行操作
确保备份您的 Airflow 元数据库
暂停所有 DAG,并确保没有内容正在主动运行
暂停 DAG 的原因是确保在稍后的步骤中进行数据库升级时,没有内容正在主动写入数据库。
为了更加小心,最好在暂停 DAG 后备份数据库。
将 Airflow 版本安装/升级到您选择的 2.0 版本
确保安装正确的提供程序
这可以通过将“extras”选项用作 Airflow 安装的一部分,或通过单独安装提供程序来完成。
请注意,如果您使用 pip 安装,则可能必须在安装新提供程序之前卸载后向移植提供程序。如果您使用具有指定要求集的 Airflow Docker 映像进行安装,则不适用此情况,因为更改会自动获取一组新的模块。
您可以在 提供程序包 中阅读有关提供程序的更多信息。
使用
airflow db migrate
迁移 Airflow 元数据库。上述命令可能不熟悉,因为它使用 Airflow 2.0 CLI 语法显示。
数据库升级可能会根据需要修改数据库架构,还可能会映射现有数据以使其符合更新的数据库架构。
注意
数据库升级可能需要一段时间,具体取决于数据库中的 DAG 数量以及数据库中存储的任务历史记录、xcom 变量等历史记录的容量。在我们的测试中,我们发现将 Airflow 数据库从 Airflow 1.10.15 升级到 Airflow 2.0 在 PostgreSQL 上的 Airflow 数据库上大约花费了两到三分钟,该数据库大约有 35,000 个任务实例和 500 个 DAG。为了更快的数据库升级和更好的整体性能,建议定期存档不再有价值的旧历史元素。
重启 Airflow 调度程序、Web 服务器和工作程序
附录¶
KubernetesPodOperator 的更改参数¶
端口已从 list[Port] 迁移到 list[V1ContainerPort]
之前
from airflow.kubernetes.pod import Port
port = Port("http", 80)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
ports=[port],
task_id="task",
)
之后
from kubernetes.client import models as k8s
port = k8s.V1ContainerPort(name="http", container_port=80)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
ports=[port],
task_id="task",
)
卷装载已从 list[VolumeMount] 迁移到 list[V1VolumeMount]
之前
from airflow.kubernetes.volume_mount import VolumeMount
volume_mount = VolumeMount("test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volume_mounts=[volume_mount],
task_id="task",
)
之后
from kubernetes.client import models as k8s
volume_mount = k8s.V1VolumeMount(
name="test-volume", mount_path="/root/mount_file", sub_path=None, read_only=True
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volume_mounts=[volume_mount],
task_id="task",
)
卷已从 list[Volume] 迁移到 list[V1Volume]
之前
from airflow.kubernetes.volume import Volume
volume_config = {"persistentVolumeClaim": {"claimName": "test-volume"}}
volume = Volume(name="test-volume", configs=volume_config)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volumes=[volume],
task_id="task",
)
之后
from kubernetes.client import models as k8s
volume = k8s.V1Volume(
name="test-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="test-volume"),
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
volumes=[volume],
task_id="task",
)
env_vars 已从 dict 迁移到 list[V1EnvVar]
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars={"ENV1": "val1", "ENV2": "val2"},
task_id="task",
)
之后
from kubernetes.client import models as k8s
env_vars = [
k8s.V1EnvVar(name="ENV1", value="val1"),
k8s.V1EnvVar(name="ENV2", value="val2"),
]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars=env_vars,
task_id="task",
)
PodRuntimeInfoEnv 已被移除
PodRuntimeInfoEnv 现在可以作为 V1EnvVarSource
添加到 env_vars
变量中
之前
from airflow.kubernetes.pod_runtime_info_env import PodRuntimeInfoEnv
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
pod_runtime_info_envs=[PodRuntimeInfoEnv("ENV3", "status.podIP")],
task_id="task",
)
之后
from kubernetes.client import models as k8s
env_vars = [
k8s.V1EnvVar(
name="ENV3",
value_from=k8s.V1EnvVarSource(field_ref=k8s.V1ObjectFieldSelector(field_path="status.podIP")),
)
]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_vars=env_vars,
task_id="task",
)
configmaps 已被移除
Configmaps 现在可以作为 V1EnvVarSource
添加到 env_from
变量中
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
configmaps=["test-configmap"],
task_id="task",
)
之后
from kubernetes.client import models as k8s
configmap = "test-configmap"
env_from = [k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name=configmap))]
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
env_from=env_from,
task_id="task",
)
资源已从 Dict 迁移到 V1ResourceRequirements
之前
resources = {
"limit_cpu": 0.25,
"limit_memory": "64Mi",
"limit_ephemeral_storage": "2Gi",
"request_cpu": "250m",
"request_memory": "64Mi",
"request_ephemeral_storage": "1Gi",
}
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
resources=resources,
)
之后
from kubernetes.client import models as k8s
resources = k8s.V1ResourceRequirements(
requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
limits={
"memory": "64Mi",
"cpu": 0.25,
"nvidia.com/gpu": None,
"ephemeral-storage": "2Gi",
},
)
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test-" + str(random.randint(0, 1000000)),
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
resources=resources,
)
image_pull_secrets 已从 String 迁移到 list[k8s.V1LocalObjectReference]
之前
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
name="test",
task_id="task",
image_pull_secrets="fake-secret",
cluster_context="default",
)
之后
quay_k8s = KubernetesPodOperator(
namespace="default",
image="quay.io/apache/bash",
image_pull_secrets=[k8s.V1LocalObjectReference("testquay")],
cmds=["bash", "-cx"],
name="airflow-private-image-pod",
task_id="task-two",
)
从实验性 API 到稳定 API v1 的迁移指南¶
在 Airflow 2.0 中,我们添加了新的 REST API。实验性 API 仍然有效,但将来可能会取消支持。
但是,实验性 API 不需要身份验证,因此默认情况下处于禁用状态。如果您想使用实验性 API,则需要显式启用它。如果您的应用程序仍在使用实验性 API,您应该认真考虑迁移到稳定 API。
稳定 API 公开了许多可通过 Web 服务器访问的端点。以下是这两个端点之间的差异,将帮助您从实验性 REST API 迁移到稳定 REST API。
基本端点
稳定 API v1 的基本端点是 /api/v1/
。您必须将实验性基本端点从 /api/experimental/
更改为 /api/v1/
。下表显示了差异
目的 |
实验性 REST API 端点 |
稳定的 REST API 端点 |
---|---|---|
创建 DAGRuns(POST) |
|
|
列出 DAGRuns(GET) |
|
|
检查运行状况(GET) |
|
|
任务信息(GET) |
|
|
TaskInstance 公共变量(GET) |
|
|
暂停 DAG(PATCH) |
|
|
暂停的 DAG 信息(GET) |
|
|
最新的 DAG 运行(GET) |
|
|
获取所有池(GET) |
|
|
创建池(POST) |
|
|
删除池(DELETE) |
|
|
DAG 血缘(GET) |
|
|
此端点 /api/v1/dags/{dag_id}/dagRuns
还允许你使用查询字符串中的参数(如 start_date
、end_date
、execution_date
等)来过滤 dag_runs。因此,此端点以前执行的操作
/api/experimental/dags/<string:dag_id>/dag_runs/<string:execution_date>
现在可以使用查询字符串中的过滤器参数来处理。有关最新运行的信息可以通过此端点(/api/v1/dags/{dag_id}/dagRuns
)的查询字符串中的过滤器来获取。有关更多信息,请查看稳定的 API 参考文档
从 DAG 回调中进行异常处理的更改¶
来自 DAG 回调的异常会使 Airflow 调度程序崩溃。作为我们使调度程序更强大、更可靠的努力的一部分,我们已将此行为更改为记录异常。最重要的是,已添加了一个新的 dag.callback_exceptions 计数器指标,以帮助更好地监控回调异常。
迁移到 TaskFlow API¶
Airflow 2.0 引入了 TaskFlow API,以简化 Python 可调用任务的声明。建议用户用 TaskFlow 装饰器替代经典运算符。有关详细信息,请参阅 使用 TaskFlow。
经典运算符 |
TaskFlow 装饰器 |
---|---|
|
|
|
|
|
|
|
|
|
|
2.0 中的 Airflow CLI 更改¶
Airflow CLI 已被整理,以便将相关命令分组为子命令,这意味着如果您在脚本中使用这些命令,则必须对它们进行更改。
本节介绍已进行的更改,以及您需要采取哪些措施来更新脚本。从命令行操作用户的能力已更改。 airflow create_user
、airflow delete_user
和 airflow list_users
已分组为带有可选标志 create
、list
和 delete
的单个命令 airflow users
。 airflow list_dags
命令现在是 airflow dags list
,airflow pause
是 airflow dags pause
,依此类推。
在 Airflow 1.10 和 2.0 中,有一个 airflow config
命令,但行为不同。在 Airflow 1.10 中,它打印所有配置选项,而在 Airflow 2.0 中,它是一个命令组。 airflow config
现在是 airflow config list
。您可以通过运行命令 airflow config --help
来查看其他选项
有关更新的 CLI 命令的完整列表,请参阅 https://airflow.apache.org/cli.html。
你可以通过运行 airflow --help
来了解这些命令。例如,要获取有关 celery
组命令的帮助,你必须运行帮助命令:airflow celery --help
。
旧命令 |
新命令 |
组 |
---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
``users`` 组的示例用法
创建新用户
airflow users create --username jondoe --lastname doe --firstname jon --email [email protected] --role Viewer --password test
列出用户
airflow users list
删除用户
airflow users delete --username jondoe
向角色中添加用户
airflow users add-role --username jondoe --role Public
从角色中删除用户
airflow users remove-role --username jondoe --role Public
在 CLI 中使用单个字符来更改短选项样式
对于 Airflow 短选项,请使用一个单个字符。根据下表,可以使用新命令
旧命令 |
新命令 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
对于 Airflow 长选项,请使用 小写连字符形式,而不是 小写蛇形形式
旧选项 |
新选项 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
从 CLI 中移除 serve_logs 命令
serve_logs
命令已删除。此命令应仅由内部应用程序机制运行,并且无需从 CLI 界面访问它。
dag_state CLI 命令
如果 DAGRun 是使用传入的 conf 键/值触发的,则它们还将打印在 dag_state CLI 响应中,即正在运行,{“name”: “bob”},而在之前的版本中它只打印状态:即正在运行
在 backfill 命令中弃用 ignore_first_depends_on_past 并将其默认设置为 True
使用 depends_on_past
dags 进行 backfill 时,用户需要传递 --ignore-first-depends-on-past
。我们应该将其默认设置为 true
以避免混淆
对 Airflow 插件的更改¶
如果您正在使用 Airflow 插件并传递 admin_views
和 menu_links
,它们用于非 RBAC UI(基于 flask-admin
的 UI),请更新它以使用 flask_appbuilder_views
和 flask_appbuilder_menu_links
。
旧:
from airflow.plugins_manager import AirflowPlugin
from flask_admin import BaseView, expose
from flask_admin.base import MenuLink
class TestView(BaseView):
@expose("/")
def test(self):
# in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html
return self.render("test_plugin/test.html", content="Hello galaxy!")
v = TestView(category="Test Plugin", name="Test View")
ml = MenuLink(category="Test Plugin", name="Test Menu Link", url="https://airflow.apache.org/")
class AirflowTestPlugin(AirflowPlugin):
admin_views = [v]
menu_links = [ml]
更改为:
from airflow.plugins_manager import AirflowPlugin
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
default_view = "test"
@expose("/")
def test(self):
return self.render_template("test_plugin/test.html", content="Hello galaxy!")
v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
"name": "Test View",
"category": "Test Plugin",
"view": v_appbuilder_view,
}
# Creating a flask appbuilder Menu Item
appbuilder_mitem = {
"name": "Google",
"category": "Search",
"category_icon": "fa-th",
"href": "https://www.google.com",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
appbuilder_views = [v_appbuilder_package]
appbuilder_menu_items = [appbuilder_mitem]
附加名称的更改¶
将 all
附加缩减为仅包含面向用户的依赖项。这意味着此附加不包含开发依赖项。如果您正在使用它并依赖开发包,则应使用 devel_all
。
对 Airflow 1.10.x 版本的支持¶
Airflow 1.10.x 于 2021 年 6 月 17 日达到生命周期终点。不会再发布任何新的 Airflow 1.x 版本。
对反向移植提供程序的支持于 2021 年 3 月 17 日结束。不会再发布任何新版本的反向移植提供程序。
我们计划对我们的版本控制和发布过程采用严格的语义版本控制方法。这意味着我们不计划在 2.* 版本中进行任何向后不兼容的更改。任何重大更改,包括删除 Airflow 2.0 中弃用的功能,都将作为 Airflow 3.0 版本的一部分进行。