Amazon Elastic Kubernetes Service (EKS)¶
Amazon Elastic Kubernetes Service (Amazon EKS) 是一项托管服务,可让您轻松在 AWS 上运行 Kubernetes,而无需自行建立或维护 Kubernetes 控制平面。Kubernetes 是一个开源系统,用于自动化容器化应用的部署、扩缩和管理。
Airflow 提供了用于创建 EKS 集群和计算基础设施以及与其交互的 Operator。
前置任务¶
要使用这些 Operator,您必须执行以下几项操作
使用 AWS Console 或 AWS CLI 创建必要的资源。
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 安装 Airflow®
设置连接.
通用参数¶
- aws_conn_id
- 引用 Amazon Web Services 连接 ID。如果此参数设置为 - None,则使用默认的 boto3 行为,无需查找连接。否则,使用 Connection 中存储的凭据。默认值:- aws_default
- region_name
- AWS 区域名称。如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值代替连接中的值。默认值:- None
- verify
- 是否验证 SSL 证书。 - False- 不验证 SSL 证书。
- path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑包不同的捆绑包,可以指定此参数。 
 - 如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值代替连接中的值。默认值:- None
- botocore_config
- 提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。 示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶- { "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, } - 如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值代替连接中的值。默认值:- None- 注意 - 指定一个空字典, - {},将覆盖 botocore.config.Config 的连接配置
Operator¶
创建 Amazon EKS 集群¶
要创建 Amazon EKS 集群,您可以使用 EksCreateClusterOperator。
- 注意:需要具有以下权限的 AWS IAM 角色
- eks.amazonaws.com必须添加到信任关系中- AmazonEKSClusterPolicyIAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_nodegroups.py
# Create an Amazon EKS Cluster control plane without attaching compute service.
create_cluster = EksCreateClusterOperator(
    task_id="create_cluster",
    cluster_name=cluster_name,
    cluster_role_arn=test_context[ROLE_ARN_KEY],
    resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
    compute=None,
)
一步创建 Amazon EKS 集群和节点组¶
要一步创建 Amazon EKS 集群和 EKS 托管节点组,您可以使用 EksCreateClusterOperator。
- 注意:需要具有以下权限的 AWS IAM 角色
- ec2.amazon.aws.com必须在信任关系中- eks.amazonaws.com必须添加到信任关系中- AmazonEC2ContainerRegistryReadOnlyIAM Policy 必须附加- AmazonEKSClusterPolicyIAM Policy 必须附加- AmazonEKSWorkerNodePolicyIAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
    task_id="create_cluster_and_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    cluster_role_arn=test_context[ROLE_ARN_KEY],
    # Opting to use the same ARN for the cluster and the nodegroup here,
    # but a different ARN could be configured and passed if desired.
    nodegroup_role_arn=test_context[ROLE_ARN_KEY],
    resources_vpc_config={"subnetIds": test_context[SUBNETS_KEY]},
    # ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
    compute="nodegroup",
    # The launch template enforces IMDSv2 and is required for internal
    # compliance when running these system tests on AWS infrastructure.
    create_nodegroup_kwargs={"launchTemplate": {"name": launch_template_name}},
)
一步创建 Amazon EKS 集群和 AWS Fargate 配置文件¶
要一步创建 Amazon EKS 集群和 AWS Fargate 配置文件,您可以使用 EksCreateClusterOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。
- 注意:需要具有以下权限的 AWS IAM 角色
- ec2.amazon.aws.com必须在信任关系中- eks.amazonaws.com必须添加到信任关系中- AmazonEC2ContainerRegistryReadOnlyIAM Policy 必须附加- AmazonEKSClusterPolicyIAM Policy 必须附加- AmazonEKSWorkerNodePolicyIAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_fargate_in_one_step.py
# Create an Amazon EKS cluster control plane and an AWS Fargate compute platform in one step.
create_cluster_and_fargate_profile = EksCreateClusterOperator(
    task_id="create_eks_cluster_and_fargate_profile",
    cluster_name=cluster_name,
    cluster_role_arn=cluster_role_arn,
    resources_vpc_config={
        "subnetIds": subnets,
        "endpointPublicAccess": True,
        "endpointPrivateAccess": False,
    },
    compute="fargate",
    fargate_profile_name=fargate_profile_name,
    # Opting to use the same ARN for the cluster and the pod here,
    # but a different ARN could be configured and passed if desired.
    fargate_pod_execution_role_arn=fargate_pod_role_arn,
)
删除 Amazon EKS 集群¶
要删除现有的 Amazon EKS 集群,您可以使用 EksDeleteClusterOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_cluster = EksDeleteClusterOperator(
    task_id="delete_cluster",
    cluster_name=cluster_name,
)
- 注意:如果集群附加了任何资源,例如 Amazon EKS 节点组或 AWS
- Fargate 配置文件,则无法删除集群。使用 - force参数将首先尝试删除任何附加的资源。
tests/system/amazon/aws/example_eks_with_nodegroup_in_one_step.py
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_nodegroup_and_cluster = EksDeleteClusterOperator(
    task_id="delete_nodegroup_and_cluster",
    cluster_name=cluster_name,
    force_delete_compute=True,
)
创建 Amazon EKS 托管节点组¶
要创建 Amazon EKS 托管节点组,您可以使用 EksCreateNodegroupOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。
- 注意:需要具有以下权限的 AWS IAM 角色
- ec2.amazon.aws.com必须在信任关系中- AmazonEC2ContainerRegistryReadOnlyIAM Policy 必须附加- AmazonEKSWorkerNodePolicyIAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_nodegroups.py
create_nodegroup = EksCreateNodegroupOperator(
    task_id="create_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    nodegroup_subnets=test_context[SUBNETS_KEY],
    nodegroup_role_arn=test_context[ROLE_ARN_KEY],
)
删除 Amazon EKS 托管节点组¶
要删除现有的 Amazon EKS 托管节点组,您可以使用 EksDeleteNodegroupOperator。您还可以通过将 deferrable 参数设置为 True,在可延迟模式下运行此 Operator。
tests/system/amazon/aws/example_eks_with_nodegroups.py
delete_nodegroup = EksDeleteNodegroupOperator(
    task_id="delete_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
)
创建 AWS Fargate 配置文件¶
要创建 AWS Fargate 配置文件,您可以使用 EksCreateFargateProfileOperator。
- 注意:需要具有以下权限的 AWS IAM 角色
- ec2.amazon.aws.com必须在信任关系中- AmazonEC2ContainerRegistryReadOnlyIAM Policy 必须附加- AmazonEKSWorkerNodePolicyIAM Policy 必须附加
tests/system/amazon/aws/example_eks_with_fargate_profile.py
create_fargate_profile = EksCreateFargateProfileOperator(
    task_id="create_eks_fargate_profile",
    cluster_name=cluster_name,
    pod_execution_role_arn=fargate_pod_role_arn,
    fargate_profile_name=fargate_profile_name,
    selectors=SELECTORS,
)
删除 AWS Fargate 配置文件¶
要删除现有的 AWS Fargate 配置文件,您可以使用 EksDeleteFargateProfileOperator。
tests/system/amazon/aws/example_eks_with_fargate_profile.py
delete_fargate_profile = EksDeleteFargateProfileOperator(
    task_id="delete_eks_fargate_profile",
    cluster_name=cluster_name,
    fargate_profile_name=fargate_profile_name,
)
在 Amazon EKS 集群上执行任务¶
要在现有的 Amazon EKS 集群上运行 Pod,您可以使用 EksPodOperator。
注意:需要具有底层计算基础设施的 Amazon EKS 集群。
tests/system/amazon/aws/example_eks_with_nodegroups.py
start_pod = EksPodOperator(
    task_id="start_pod",
    pod_name="test_pod",
    cluster_name=cluster_name,
    image="amazon/aws-cli:latest",
    cmds=["sh", "-c", "echo Test Airflow; date"],
    labels={"demo": "hello_world"},
    get_logs=True,
    on_finish_action="keep_pod",
)
Sensor¶
等待 Amazon EKS 集群状态¶
要检查 Amazon EKS 集群的状态,直到其达到目标状态或其他终端状态,您可以使用 EksClusterStateSensor。
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_cluster = EksClusterStateSensor(
    task_id="await_create_cluster",
    cluster_name=cluster_name,
    target_state=ClusterStates.ACTIVE,
)
等待 Amazon EKS 托管节点组状态¶
要检查 Amazon EKS 托管节点组的状态,直到其达到目标状态或其他终端状态,您可以使用 EksNodegroupStateSensor。
tests/system/amazon/aws/example_eks_with_nodegroups.py
await_create_nodegroup = EksNodegroupStateSensor(
    task_id="await_create_nodegroup",
    cluster_name=cluster_name,
    nodegroup_name=nodegroup_name,
    target_state=NodegroupStates.ACTIVE,
)
等待 AWS Fargate 配置文件状态¶
要检查 AWS Fargate 配置文件的状态,直到其达到目标状态或其他终端状态,您可以使用 EksFargateProfileSensor。
tests/system/amazon/aws/example_eks_with_fargate_profile.py
await_create_fargate_profile = EksFargateProfileStateSensor(
    task_id="wait_for_create_fargate_profile",
    cluster_name=cluster_name,
    fargate_profile_name=fargate_profile_name,
    target_state=FargateProfileStates.ACTIVE,
)