Amazon 弹性 Kubernetes 服务 (EKS)¶
Amazon 弹性 Kubernetes 服务 (Amazon EKS) 是一项托管服务,使您可以在 AWS 上轻松运行 Kubernetes,而无需启动或维护您自己的 Kubernetes 控制平面。Kubernetes 是一个开源系统,用于自动化容器化应用程序的部署、扩展和管理。
Airflow 提供了用于创建 EKS 集群和计算基础设施并与其交互的操作器。
先决条件任务¶
要使用这些操作器,您必须执行以下操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow™ 安装
设置连接.
操作器¶
创建 Amazon EKS 集群¶
要创建 Amazon EKS 集群,您可以使用 EksCreateClusterOperator
。
- 注意:需要具有以下权限的 AWS IAM 角色
eks.amazonaws.com
必须添加到信任关系中AmazonEKSClusterPolicy
IAM 策略必须附加
# Create an Amazon EKS Cluster control plane without attaching compute service.
create_cluster = EksCreateClusterOperator(
task_id="create_cluster",
cluster_name=cluster_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
compute=None,
)
一步创建 Amazon EKS 集群和节点组¶
要在一个命令中创建 Amazon EKS 集群和 EKS 托管节点组,您可以使用 EksCreateClusterOperator
。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须位于信任关系中eks.amazonaws.com
必须添加到信任关系中AmazonEC2ContainerRegistryReadOnly
IAM 策略必须附加AmazonEKSClusterPolicy
IAM 策略必须附加AmazonEKSWorkerNodePolicy
IAM 策略必须附加
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
task_id="create_cluster_and_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
# Opting to use the same ARN for the cluster and the nodegroup here,
# but a different ARN could be configured and passed if desired.
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
# ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
compute="nodegroup",
# The launch template enforces IMDSv2 and is required for internal
# compliance when running these system tests on AWS infrastructure.
create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}},
)
一步创建 Amazon EKS 集群和 AWS Fargate 配置文件¶
要在一个命令中创建 Amazon EKS 集群和 AWS Fargate 配置文件,您可以使用 EksCreateClusterOperator
。您还可以通过将 deferrable
参数设置为 True
来以可延迟模式运行此操作器。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须位于信任关系中eks.amazonaws.com
必须添加到信任关系中AmazonEC2ContainerRegistryReadOnly
IAM 策略必须附加AmazonEKSClusterPolicy
IAM 策略必须附加AmazonEKSWorkerNodePolicy
IAM 策略必须附加
# Create an Amazon EKS cluster control plane and an AWS Fargate compute platform in one step.
create_cluster_and_fargate_profile = EksCreateClusterOperator(
task_id="create_eks_cluster_and_fargate_profile",
cluster_name=cluster_name,
cluster_role_arn=cluster_role_arn,
resources_vpc_config={
"subnetIds": subnets,
"endpointPublicAccess": True,
"endpointPrivateAccess": False,
},
compute="fargate",
fargate_profile_name=fargate_profile_name,
# Opting to use the same ARN for the cluster and the pod here,
# but a different ARN could be configured and passed if desired.
fargate_pod_execution_role_arn=fargate_pod_role_arn,
)
删除 Amazon EKS 集群¶
要删除现有的 Amazon EKS 集群,您可以使用 EksDeleteClusterOperator
。您还可以通过将 deferrable
参数设置为 True
来以可延迟模式运行此操作器。
delete_cluster = EksDeleteClusterOperator(
task_id="delete_cluster",
cluster_name=cluster_name,
)
- 注意:如果集群有任何附加资源,例如 Amazon EKS 节点组或 AWS
Fargate 配置文件,则无法删除该集群。使用
force
参数将尝试先删除任何附加资源。
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_nodegroup_and_cluster = EksDeleteClusterOperator(
task_id="delete_nodegroup_and_cluster",
cluster_name=cluster_name,
force_delete_compute=True,
)
创建 Amazon EKS 托管节点组¶
要创建 Amazon EKS 托管节点组,您可以使用 EksCreateNodegroupOperator
。您还可以通过将 deferrable
参数设置为 True
来以可延迟模式运行此操作器。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须位于信任关系中AmazonEC2ContainerRegistryReadOnly
IAM 策略必须附加AmazonEKSWorkerNodePolicy
IAM 策略必须附加
create_nodegroup = EksCreateNodegroupOperator(
task_id="create_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
nodegroup_subnets=test_context[SUBNETS_KEY],
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
)
删除 Amazon EKS 托管节点组¶
要删除现有的 Amazon EKS 托管节点组,您可以使用 EksDeleteNodegroupOperator
。您还可以通过将 deferrable
参数设置为 True
来以可延迟模式运行此操作器。
delete_nodegroup = EksDeleteNodegroupOperator(
task_id="delete_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
)
创建 AWS Fargate 配置文件¶
要创建 AWS Fargate 配置文件,您可以使用 EksCreateFargateProfileOperator
。
- 注意:需要具有以下权限的 AWS IAM 角色
ec2.amazon.aws.com
必须位于信任关系中AmazonEC2ContainerRegistryReadOnly
IAM 策略必须附加AmazonEKSWorkerNodePolicy
IAM 策略必须附加
create_fargate_profile = EksCreateFargateProfileOperator(
task_id="create_eks_fargate_profile",
cluster_name=cluster_name,
pod_execution_role_arn=fargate_pod_role_arn,
fargate_profile_name=fargate_profile_name,
selectors=SELECTORS,
)
删除 AWS Fargate 配置文件¶
要删除现有的 AWS Fargate 配置文件,您可以使用 EksDeleteFargateProfileOperator
。
delete_fargate_profile = EksDeleteFargateProfileOperator(
task_id="delete_eks_fargate_profile",
cluster_name=cluster_name,
fargate_profile_name=fargate_profile_name,
)
在 Amazon EKS 集群上执行任务¶
要在现有的 Amazon EKS 集群上运行 Pod,您可以使用 EksPodOperator
。
注意:需要具有底层计算基础设施的 Amazon EKS 集群。
start_pod = EksPodOperator(
task_id="start_pod",
pod_name="test_pod",
cluster_name=cluster_name,
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "echo Test Airflow; date"],
labels={"demo": "hello_world"},
get_logs=True,
on_finish_action="keep_pod",
)
传感器¶
等待 Amazon EKS 集群状态¶
要检查 Amazon EKS 集群的状态,直到它达到目标状态或另一个终端状态,您可以使用 EksClusterStateSensor
。
await_create_cluster = EksClusterStateSensor(
task_id="await_create_cluster",
cluster_name=cluster_name,
target_state=ClusterStates.ACTIVE,
)
等待 Amazon EKS 托管节点组状态¶
要检查 Amazon EKS 托管节点组的状态,直到它达到目标状态或另一个终端状态,您可以使用 EksNodegroupStateSensor
。
await_create_nodegroup = EksNodegroupStateSensor(
task_id="await_create_nodegroup",
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
target_state=NodegroupStates.ACTIVE,
)
等待 AWS Fargate 配置文件状态¶
要检查 AWS Fargate 配置文件的状态,直到它达到目标状态或另一个终端状态,您可以使用 EksFargateProfileSensor
。
await_create_fargate_profile = EksFargateProfileStateSensor(
task_id="wait_for_create_fargate_profile",
cluster_name=cluster_name,
fargate_profile_name=fargate_profile_name,
target_state=FargateProfileStates.ACTIVE,
)