Google Kubernetes Engine 操作符

Google Kubernetes Engine (GKE) 提供了一个托管环境,用于使用 Google 基础设施部署、管理和扩展你的容器化应用程序。GKE 环境由多个机器(具体来说,Compute Engine 实例)组成,这些机器组合在一起形成一个集群。

先决任务

要使用这些操作符,你必须执行一些操作

管理 GKE 集群

集群是 GKE 的基础 - 所有工作负载都运行在集群之上。它由集群主节点和工作节点组成。创建或删除集群时,主节点的生命周期由 GKE 管理。工作节点表示为 Compute Engine VM 实例,GKE 在创建集群时会代表你创建这些实例。

创建 GKE 集群

以下是一个集群定义示例

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[源代码]

CLUSTER = {"name": CLUSTER_NAME, "initial_node_count": 1}

在使用 GKECreateClusterOperator 创建集群时,需要此类字典对象或 Cluster 定义。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[源代码]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
)

你可以为此操作使用可延迟模式,以便异步运行操作员。它将让你在知道必须等待时释放工作进程,并将恢复操作员的任务交给触发器。因此,在挂起(延迟)时,它不会占用工作进程槽,并且你的集群在空闲操作员或传感器上浪费的资源会大大减少

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[源代码]

create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    body=CLUSTER,
    deferrable=True,
)

在集群内安装特定版本的 Kueue

Kueue 是一款云原生作业调度程序,与默认的 Kubernetes 调度程序、作业控制器和集群自动缩放器协同工作,提供端到端批处理系统。Kueue 实施作业队列,根据配额和用于在团队之间公平共享资源的层级结构,决定作业何时应该等待和何时应该启动。Kueue 支持自动驾驶集群、具有节点自动配置的标准 GKE 和常规自动缩放节点池。要使用 GKEStartKueueInsideClusterOperator 在集群上安装和使用 Kueue,请参见此示例

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py[源代码]

    add_kueue_cluster = GKEStartKueueInsideClusterOperator(
        task_id="add_kueue_cluster",
        project_id=GCP_PROJECT_ID,
        location=GCP_LOCATION,
        cluster_name=CLUSTER_NAME,
        kueue_version="v0.6.2",
    )

删除 GKE 集群

要删除集群,请使用 GKEDeleteClusterOperator。这还将删除分配给集群的所有节点。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[源代码]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
)

你可以为此操作使用可延迟模式,以便异步运行操作员。它将让你在知道必须等待时释放工作进程,并将恢复操作员的任务交给触发器。因此,在挂起(延迟)时,它不会占用工作进程槽,并且你的集群在空闲操作员或传感器上浪费的资源会大大减少

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[源代码]

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    deferrable=True,
)

管理 GKE 集群上的工作负载

GKE 适用于容器化应用程序,例如在 Docker 上创建的应用程序,并将它们部署到集群上运行。这些称为工作负载,当部署到集群上时,它们会利用集群的 CPU 和内存资源以有效运行。

在 GKE 集群上运行 Pod

有两种可用的运算符可以在 GKE 集群上运行 Pod

GKEStartPodOperator 扩展 KubernetesPodOperator 以提供使用 Google Cloud 凭据的授权。无需管理 kube_config 文件,因为它将自动生成。所有 Kubernetes 参数(config_file 除外)也适用于 GKEStartPodOperator。有关 KubernetesPodOperator 的更多信息,请参阅:KubernetesPodOperator 指南。

与私有集群一起使用

所有集群都有一个规范端点。端点是 Kubernetes API 服务器的 IP 地址,Airflow 使用该地址与集群主服务器通信。端点显示在 Cloud Console 中集群的详细信息选项卡的端点字段下,以及 gcloud container clusters describe 的输出中的端点字段中。

私有集群有两个唯一的端点值:privateEndpoint(内部 IP 地址)和 publicEndpoint(外部 IP 地址)。默认情况下,针对私有集群运行 GKEStartPodOperator 会将外部 IP 地址设置为端点。如果您希望使用内部 IP 作为端点,则需要将 use_internal_ip 参数设置为 True

与 Autopilot(无服务器)集群一起使用

在无服务器集群(如 GKE Autopilot)上运行时,由于冷启动,pod 启动有时可能需要更长时间。在 pod 启动期间,会定期以较短的时间间隔检查状态,如果 pod 尚未启动,则会发出警告消息。您可以通过 startup_check_interval_seconds 参数增加此时间间隔长度,建议为 60 秒。

使用 XCom

我们可以在操作员上启用 XCom 的使用。这通过使用指定的 Pod 启动边车容器来实现。当指定 XCom 使用时,边车会自动挂载,其挂载点是路径 /airflow/xcom。要向 XCom 提供值,请确保你的 Pod 将其写入边车中名为 return.json 的文件中。然后可以在 DAG 中的下游使用此内容。以下是一个使用示例

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[源代码]

pod_task_xcom = GKEStartPodOperator(
    task_id="pod_task_xcom",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    do_xcom_push=True,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom",
    in_cluster=False,
    on_finish_action="delete_pod",
)

然后在其他操作员中使用它

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine.py[源代码]

pod_task_xcom_result = BashOperator(
    bash_command="echo \"{{ task_instance.xcom_pull('pod_task_xcom')[0] }}\"",
    task_id="pod_task_xcom_result",
)

你可以为此操作使用可延迟模式,以便异步运行操作员。它将让你在知道必须等待时释放工作进程,并将恢复操作员的任务交给触发器。因此,在挂起(延迟)时,它不会占用工作进程槽,并且你的集群在空闲操作员或传感器上浪费的资源会大大减少

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py[源代码]

pod_task_xcom_async = GKEStartPodOperator(
    task_id="pod_task_xcom_async",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="alpine",
    cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
    name="test-pod-xcom-async",
    in_cluster=False,
    on_finish_action="delete_pod",
    do_xcom_push=True,
    deferrable=True,
    get_logs=True,
)

在 GKE 集群上运行作业

有两种操作员可用于在 GKE 集群上运行作业

GKEStartJobOperator 扩展 KubernetesJobOperator 以使用 Google Cloud 凭据提供授权。无需管理 kube_config 文件,因为它将自动生成。所有 Kubernetes 参数(config_file 除外)也对 GKEStartJobOperator 有效。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[源代码]

job_task = GKEStartJobOperator(
    task_id="job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME,
)

GKEStartJobOperator 还支持可延迟模式。请注意,仅当 wait_until_job_complete 参数设置为 True 时才有意义。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[源代码]

job_task_def = GKEStartJobOperator(
    task_id="job_task_def",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    namespace=JOB_NAMESPACE,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name=JOB_NAME_DEF,
    wait_until_job_complete=True,
    deferrable=True,
)

要在启用 Kueue 的 GKE 集群上运行作业,请使用 GKEStartKueueJobOperator

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py[源代码]

kueue_job_task = GKEStartKueueJobOperator(
    task_id="kueue_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    queue_name=QUEUE_NAME,
    namespace="default",
    parallelism=3,
    image="perl:5.34.0",
    cmds=["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"],
    name="test-pi",
    suspend=True,
    container_resources=k8s.V1ResourceRequirements(
        requests={
            "cpu": 1,
            "memory": "200Mi",
        },
    ),
)

在 GKE 集群上删除作业

在 GKE 集群上删除作业时可以使用两种运算符

GKEDeleteJobOperator 扩展了 KubernetesDeleteJobOperator,以使用 Google Cloud 凭据提供授权。无需管理 kube_config 文件,因为它将自动生成。所有 Kubernetes 参数(config_file 除外)对于 GKEDeleteJobOperator 也是有效的。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[源代码]

delete_job = GKEDeleteJobOperator(
    task_id="delete_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=JOB_NAME,
    namespace=JOB_NAMESPACE,
)

通过给定的名称检索作业信息

您可以使用 GKEDescribeJobOperator 通过提供作业的名称和命名空间来检索现有作业的详细说明。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[源代码]

describe_job_task = GKEDescribeJobOperator(
    task_id="describe_job_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    job_name=job_task.output["job_name"],
    namespace="default",
    cluster_name=CLUSTER_NAME,
)

检索作业列表

您可以使用 GKEListJobsOperator 检索现有作业的列表。如果提供了 namespace 参数,输出将包括给定命名空间中的作业。如果未指定 namespace 参数,将输出所有命名空间中的信息。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[源代码]

list_job_task = GKEListJobsOperator(
    task_id="list_job_task", project_id=GCP_PROJECT_ID, location=GCP_LOCATION, cluster_name=CLUSTER_NAME
)

在 GKE 集群中创建资源

您可以使用 GKECreateCustomResourceOperator 在指定的 Google Kubernetes Engine 集群中创建资源。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py[源代码]

create_resource_task = GKECreateCustomResourceOperator(
    task_id="create_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

在 GKE 集群中删除资源

您可以使用 GKEDeleteCustomResourceOperator 在指定的 Google Kubernetes Engine 集群中删除资源。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py[源代码]

delete_resource_task = GKEDeleteCustomResourceOperator(
    task_id="delete_resource_task",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    yaml_conf=PVC_CONF,
)

暂停 GKE 集群上的作业

您可以使用 GKESuspendJobOperator 暂停指定 Google Kubernetes Engine 集群中的作业。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[源代码]

suspend_job = GKESuspendJobOperator(
    task_id="suspend_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

恢复 GKE 集群上的作业

您可以使用 GKEResumeJobOperator 恢复指定 Google Kubernetes Engine 集群中的作业。

tests/system/providers/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py[源代码]

resume_job = GKEResumeJobOperator(
    task_id="resume_job",
    project_id=GCP_PROJECT_ID,
    location=GCP_LOCATION,
    cluster_name=CLUSTER_NAME,
    name=job_task.output["job_name"],
    namespace="default",
)

此条目有帮助吗?