Google Kubernetes Engine 操作符¶
Google Kubernetes Engine (GKE) 提供了一个托管环境,用于使用 Google 基础设施部署、管理和扩展你的容器化应用程序。GKE 环境由多个机器(具体来说,Compute Engine 实例)组成,这些机器组合在一起形成一个集群。
先决任务¶
要使用这些操作符,你必须执行一些操作
使用 Cloud 控制台 选择或创建一个 Cloud Platform 项目。
为你的项目启用帐单,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请参阅。
管理 GKE 集群¶
集群是 GKE 的基础 - 所有工作负载都运行在集群之上。它由集群主节点和工作节点组成。创建或删除集群时,主节点的生命周期由 GKE 管理。工作节点表示为 Compute Engine VM 实例,GKE 在创建集群时会代表你创建这些实例。
创建 GKE 集群¶
以下是一个集群定义示例
CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}
在使用 GKECreateClusterOperator
创建集群时,需要此类字典对象或 Cluster
定义。
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
)
你可以为此操作使用可延迟模式,以便异步运行操作员。它将让你在知道必须等待时释放工作进程,并将恢复操作员的任务交给触发器。因此,在挂起(延迟)时,它不会占用工作进程槽,并且你的集群在空闲操作员或传感器上浪费的资源会大大减少
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body=CLUSTER,
deferrable=True,
)
在集群内安装特定版本的 Kueue¶
Kueue 是一款云原生作业调度程序,与默认的 Kubernetes 调度程序、作业控制器和集群自动缩放器协同工作,提供端到端批处理系统。Kueue 实施作业队列,根据配额和用于在团队之间公平共享资源的层级结构,决定作业何时应该等待和何时应该启动。Kueue 支持自动驾驶集群、具有节点自动配置的标准 GKE 和常规自动缩放节点池。要使用 GKEStartKueueInsideClusterOperator
在集群上安装和使用 Kueue,请参见此示例
add_kueue_cluster = GKEStartKueueInsideClusterOperator(
task_id="add_kueue_cluster",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
kueue_version="v0.6.2",
)
删除 GKE 集群¶
要删除集群,请使用 GKEDeleteClusterOperator
。这还将删除分配给集群的所有节点。
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
)
你可以为此操作使用可延迟模式,以便异步运行操作员。它将让你在知道必须等待时释放工作进程,并将恢复操作员的任务交给触发器。因此,在挂起(延迟)时,它不会占用工作进程槽,并且你的集群在空闲操作员或传感器上浪费的资源会大大减少
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
deferrable=True,
)
管理 GKE 集群上的工作负载¶
GKE 适用于容器化应用程序,例如在 Docker 上创建的应用程序,并将它们部署到集群上运行。这些称为工作负载,当部署到集群上时,它们会利用集群的 CPU 和内存资源以有效运行。
在 GKE 集群上运行 Pod¶
有两种可用的运算符可以在 GKE 集群上运行 Pod
GKEStartPodOperator
扩展 KubernetesPodOperator
以提供使用 Google Cloud 凭据的授权。无需管理 kube_config
文件,因为它将自动生成。所有 Kubernetes 参数(config_file
除外)也适用于 GKEStartPodOperator
。有关 KubernetesPodOperator
的更多信息,请参阅:KubernetesPodOperator 指南。
与私有集群一起使用¶
所有集群都有一个规范端点。端点是 Kubernetes API 服务器的 IP 地址,Airflow 使用该地址与集群主服务器通信。端点显示在 Cloud Console 中集群的详细信息选项卡的端点字段下,以及 gcloud container clusters describe
的输出中的端点字段中。
私有集群有两个唯一的端点值:privateEndpoint
(内部 IP 地址)和 publicEndpoint
(外部 IP 地址)。默认情况下,针对私有集群运行 GKEStartPodOperator
会将外部 IP 地址设置为端点。如果您希望使用内部 IP 作为端点,则需要将 use_internal_ip
参数设置为 True
。
与 Autopilot(无服务器)集群一起使用¶
在无服务器集群(如 GKE Autopilot)上运行时,由于冷启动,pod 启动有时可能需要更长时间。在 pod 启动期间,会定期以较短的时间间隔检查状态,如果 pod 尚未启动,则会发出警告消息。您可以通过 startup_check_interval_seconds
参数增加此时间间隔长度,建议为 60 秒。
使用 XCom¶
我们可以在操作员上启用 XCom 的使用。这通过使用指定的 Pod 启动边车容器来实现。当指定 XCom 使用时,边车会自动挂载,其挂载点是路径 /airflow/xcom
。要向 XCom 提供值,请确保你的 Pod 将其写入边车中名为 return.json
的文件中。然后可以在 DAG 中的下游使用此内容。以下是一个使用示例
pod_task_xcom = GKEStartPodOperator(
task_id="pod_task_xcom",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
do_xcom_push=True,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom",
in_cluster=False,
on_finish_action="delete_pod",
)
然后在其他操作员中使用它
pod_task_xcom_result = BashOperator(
bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
task_id="pod_task_xcom_result",
)
你可以为此操作使用可延迟模式,以便异步运行操作员。它将让你在知道必须等待时释放工作进程,并将恢复操作员的任务交给触发器。因此,在挂起(延迟)时,它不会占用工作进程槽,并且你的集群在空闲操作员或传感器上浪费的资源会大大减少
pod_task_xcom_async = GKEStartPodOperator(
task_id="pod_task_xcom_async",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="alpine",
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="test-pod-xcom-async",
in_cluster=False,
on_finish_action="delete_pod",
do_xcom_push=True,
deferrable=True,
get_logs=True,
)
在 GKE 集群上运行作业¶
有两种操作员可用于在 GKE 集群上运行作业
GKEStartJobOperator
扩展 KubernetesJobOperator
以使用 Google Cloud 凭据提供授权。无需管理 kube_config
文件,因为它将自动生成。所有 Kubernetes 参数(config_file
除外)也对 GKEStartJobOperator
有效。
job_task = GKEStartJobOperator(
task_id="job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME,
)
GKEStartJobOperator
还支持可延迟模式。请注意,仅当 wait_until_job_complete
参数设置为 True
时才有意义。
job_task_def = GKEStartJobOperator(
task_id="job_task_def",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
namespace=JOB_NAMESPACE,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name=JOB_NAME_DEF,
wait_until_job_complete=True,
deferrable=True,
)
要在启用 Kueue 的 GKE 集群上运行作业,请使用 GKEStartKueueJobOperator
。
kueue_job_task = GKEStartKueueJobOperator(
task_id="kueue_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
queue_name=QUEUE_NAME,
namespace="default",
parallelism=3,
image="perl:5.34.0",
cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
name="test-pi",
suspend=True,
container_resources=k8s.V1ResourceRequirements(
requests={
"cpu": 1,
"memory": "200Mi",
},
),
)
在 GKE 集群上删除作业¶
在 GKE 集群上删除作业时可以使用两种运算符
GKEDeleteJobOperator
扩展了 KubernetesDeleteJobOperator
,以使用 Google Cloud 凭据提供授权。无需管理 kube_config
文件,因为它将自动生成。所有 Kubernetes 参数(config_file
除外)对于 GKEDeleteJobOperator
也是有效的。
delete_job = GKEDeleteJobOperator(
task_id="delete_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=JOB_NAME,
namespace=JOB_NAMESPACE,
)
通过给定的名称检索作业信息¶
您可以使用 GKEDescribeJobOperator
通过提供作业的名称和命名空间来检索现有作业的详细说明。
describe_job_task = GKEDescribeJobOperator(
task_id="describe_job_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
job_name=job_task.output["job_name"],
namespace="default",
cluster_name=CLUSTER_NAME,
)
检索作业列表¶
您可以使用 GKEListJobsOperator
检索现有作业的列表。如果提供了 namespace
参数,输出将包括给定命名空间中的作业。如果未指定 namespace
参数,将输出所有命名空间中的信息。
list_job_task = GKEListJobsOperator(
task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)
在 GKE 集群中创建资源¶
您可以使用 GKECreateCustomResourceOperator
在指定的 Google Kubernetes Engine 集群中创建资源。
create_resource_task = GKECreateCustomResourceOperator(
task_id="create_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
在 GKE 集群中删除资源¶
您可以使用 GKEDeleteCustomResourceOperator
在指定的 Google Kubernetes Engine 集群中删除资源。
delete_resource_task = GKEDeleteCustomResourceOperator(
task_id="delete_resource_task",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
yaml_conf=PVC_CONF,
)
暂停 GKE 集群上的作业¶
您可以使用 GKESuspendJobOperator
暂停指定 Google Kubernetes Engine 集群中的作业。
suspend_job = GKESuspendJobOperator(
task_id="suspend_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)
恢复 GKE 集群上的作业¶
您可以使用 GKEResumeJobOperator
恢复指定 Google Kubernetes Engine 集群中的作业。
resume_job = GKEResumeJobOperator(
task_id="resume_job",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
cluster_name=CLUSTER_NAME,
name=job_task.output["job_name"],
namespace="default",
)