AWS Batch¶
AWS Batch 使您能够在 AWS 云上运行批量计算工作负载。批量计算是开发人员、科学家和工程师访问大量计算资源的常用方式。AWS Batch 消除了配置和管理所需基础设施的繁重工作。
前提条件任务¶
要使用这些运算符,您需要做几件事:
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息可在 安装 Airflow® 中找到
设置连接.
通用参数¶
- aws_conn_id
- Amazon Web Services 连接 ID 的引用。如果此参数设置为 - None,则使用默认的 boto3 行为,而不进行连接查找。否则使用连接中存储的凭据。默认值:- aws_default
- region_name
- AWS 区域名称。如果此参数设置为 - None或省略,则将使用 AWS Connection Extra Parameter 中的 region_name。否则使用指定的值而不是连接值。默认值:- None
- verify
- 是否验证 SSL 证书。 - False- 不验证 SSL 证书。
- path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的不同的 CA 证书包,可以指定此参数。 
 - 如果此参数设置为 - None或省略,则将使用 AWS Connection Extra Parameter 中的 verify。否则使用指定的值而不是连接值。默认值:- None
- botocore_config
- 提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。 示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶- { "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, } - 如果此参数设置为 - None或省略,则将使用 AWS Connection Extra Parameter 中的 config_kwargs。否则使用指定的值而不是连接值。默认值:- None- 注意 - 指定一个空字典 - {}将覆盖 botocore.config.Config 的连接配置。
运算符¶
提交新的 AWS Batch 作业¶
要提交新的 AWS Batch 作业并监控其直到达到终止状态,您可以使用 BatchOperator。
tests/system/amazon/aws/example_batch.py
submit_batch_job = BatchOperator(
    task_id="submit_batch_job",
    job_name=batch_job_name,
    job_queue=batch_job_queue_name,
    job_definition=batch_job_definition_name,
    container_overrides=JOB_OVERRIDES,
)
创建 AWS Batch 计算环境¶
要创建新的 AWS Batch 计算环境,您可以使用 BatchCreateComputeEnvironmentOperator。
tests/system/amazon/aws/example_batch.py
create_compute_environment = BatchCreateComputeEnvironmentOperator(
    task_id="create_compute_environment",
    compute_environment_name=batch_job_compute_environment_name,
    environment_type="MANAGED",
    state="ENABLED",
    compute_resources={
        "type": "FARGATE",
        "maxvCpus": 10,
        "securityGroupIds": security_groups,
        "subnets": subnets,
    },
)
传感器¶
等待 AWS Batch 作业状态¶
要等待 AWS Batch 作业状态直到达到终止状态,您可以使用 BatchSensor。
tests/system/amazon/aws/example_batch.py
wait_for_batch_job = BatchSensor(
    task_id="wait_for_batch_job",
    job_id=submit_batch_job.output,
)
为了异步监控 AWS Batch 作业的状态,请使用 BatchSensor,并将参数 deferrable 设置为 True。
由于这将释放 Airflow worker 插槽,因此可以有效利用 Airflow 部署中的可用资源。这还需要在您的 Airflow 部署中提供触发器组件。
等待 AWS Batch 计算环境状态¶
要等待 AWS Batch 计算环境状态直到达到终止状态,您可以使用 BatchComputeEnvironmentSensor。
tests/system/amazon/aws/example_batch.py
wait_for_compute_environment_valid = BatchComputeEnvironmentSensor(
    task_id="wait_for_compute_environment_valid",
    compute_environment=batch_job_compute_environment_name,
)
等待 AWS Batch 作业队列状态¶
要等待 AWS Batch 作业队列状态直到达到终止状态,您可以使用 BatchJobQueueSensor。
tests/system/amazon/aws/example_batch.py
wait_for_job_queue_valid = BatchJobQueueSensor(
    task_id="wait_for_job_queue_valid",
    job_queue=batch_job_queue_name,
)