airflow.providers.amazon.aws.hooks.batch_client
¶
用于 AWS Batch 服务的客户端。
另请参阅
模块内容¶
类¶
|
|
与 AWS Batch 交互。 |
- class airflow.providers.amazon.aws.hooks.batch_client.BatchProtocol[源代码]¶
基类:
airflow.typing_compat.Protocol
boto3.client('batch') -> botocore.client.Batch
的结构化协议。这用于
BatchClient.client()
的类型提示;它仅涵盖所需客户端方法的子集。另请参阅
- get_waiter(waiterName)[源代码]¶
获取 AWS Batch 服务等待器。
- 参数
waiterName (str) – 等待器的名称。该名称应与等待器模型文件中的键名称(通常是 CamelCasing)的名称(包括大小写)匹配。
- 返回
命名 AWS Batch 服务的等待器对象
- 返回类型
botocore.waiter.Waiter
注意
AWS Batch 可能没有任何等待器(直到 botocore PR-1307 发布)。
import boto3 boto3.client("batch").waiter_names == []
- submit_job(jobName, jobQueue, jobDefinition, arrayProperties, parameters, containerOverrides, ecsPropertiesOverride, eksPropertiesOverride, tags)[源代码]¶
提交一个 Batch 作业。
- 参数
jobName (str) – AWS Batch 作业的名称
jobQueue (str) – AWS Batch 上的队列名称
jobDefinition (str) – AWS Batch 上的作业定义名称
arrayProperties (dict) – boto3 将接收的相同参数
parameters (dict) – boto3 将接收的相同参数
containerOverrides (dict) – boto3 将接收的相同参数
ecsPropertiesOverride (dict) – boto3 将接收的相同参数
eksPropertiesOverride (dict) – boto3 将接收的相同参数
tags (dict) – boto3 将接收的相同参数
- 返回
API 响应
- 返回类型
- class airflow.providers.amazon.aws.hooks.batch_client.BatchClientHook(*args, max_retries=None, status_retries=None, **kwargs)[源代码]¶
基类:
airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook
与 AWS Batch 交互。
提供对
boto3.client("batch")
的厚封装。- 参数
注意
一些方法使用默认的随机延迟来检查或轮询作业状态,例如
random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX)
。当许多并发任务请求作业描述时,使用随机间隔有助于避免 AWS API 限制。要修改用于检查 Batch 作业状态时使用的随机延迟范围的全局默认值,请修改这些默认值,例如:.. code-block
BatchClient.DEFAULT_DELAY_MIN = 0 BatchClient.DEFAULT_DELAY_MAX = 5
当使用显式延迟值时,会将 1 秒的随机抖动应用于延迟(例如,0 秒的延迟将是
random.uniform(0, 1)
延迟)。通常建议将随机抖动添加到 API 请求中。为此提供了一个方便的方法,例如,要获得 10 秒 +/- 5 秒的随机延迟:delay = BatchClient.add_jitter(10, width=5, minima=0)
可以指定其他参数(例如
aws_conn_id
),并将这些参数传递给底层的 AwsBaseHook。另请参阅
- property client: BatchProtocol | botocore.client.BaseClient[源代码]¶
用于 Batch 服务的 AWS API 客户端。
- 返回
用于
.region_name
的 boto3 “batch” 客户端- 返回类型
BatchProtocol | botocore.client.BaseClient
- check_job_success(job_id)[source]¶
检查 Batch 作业的最终状态。
如果作业“SUCCEEDED”,则返回 True,否则引发 AirflowException。
- 参数
job_id (str) – Batch 作业 ID
- Raises
AirflowException
- wait_for_job(job_id, delay=None, get_batch_log_fetcher=None)[source]¶
等待 Batch 作业完成。
- 参数
job_id (str) – Batch 作业 ID
delay (int | float | None) – 轮询作业状态前的延迟
:param get_batch_log_fetcher : 返回 batch_log_fetcher 的方法
- Raises
AirflowException
- poll_for_job_running(job_id, delay=None)[source]¶
轮询作业运行状态。
指示作业正在运行或已完成的状态为:“RUNNING”|“SUCCEEDED”|“FAILED”。
因此,此方法将等待的状态转换包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”|“SUCCEEDED”|“FAILED”
包括已完成状态选项,以应对状态变化太快而轮询无法检测到从“STARTING”快速移动到“RUNNING”再到完成(通常是失败)的“RUNNING”状态的情况。
- 参数
job_id (str) – Batch 作业 ID
delay (int | float | None) – 轮询作业状态前的延迟
- Raises
AirflowException
- poll_for_job_complete(job_id, delay=None)[source]¶
轮询作业完成状态。
指示作业完成的状态为:“SUCCEEDED”|“FAILED”。
因此,此方法将等待的状态转换包括:“SUBMITTED”>“PENDING”>“RUNNABLE”>“STARTING”>“RUNNING”>“SUCCEEDED”|“FAILED”
- 参数
job_id (str) – Batch 作业 ID
delay (int | float | None) – 轮询作业状态前的延迟
- Raises
AirflowException
- poll_job_status(job_id, match_status)[source]¶
使用指数退避策略(具有 max_retries)轮询作业状态。
- 参数
job_id (str) – Batch 作业 ID
match_status (list[str]) – 要匹配的作业状态列表;Batch 作业状态包括:“SUBMITTED”|“PENDING”|“RUNNABLE”|“STARTING”|“RUNNING”|“SUCCEEDED”|“FAILED”
- Raises
AirflowException
- get_job_description(job_id)[source]¶
获取作业描述(使用 status_retries)。
- 参数
job_id (str) – Batch 作业 ID
- 返回
describe jobs 的 API 响应
- Raises
AirflowException
- 返回类型
- static parse_job_description(job_id, response)[source]¶
解析作业描述以提取 job_id 的描述。
- 参数
job_id (str) – Batch 作业 ID
response (dict) – describe jobs 的 API 响应
- 返回
describe job_id 的 API 响应
- Raises
AirflowException
- 返回类型
- get_job_all_awslogs_info(job_id)[source]¶
解析作业描述以提取 AWS CloudWatch 信息。
- 参数
job_id (str) – AWS Batch 作业 ID
- static add_jitter(delay, width=1, minima=0)[source]¶
使用延迟 +/- 宽度来产生随机抖动。
在状态轮询中添加抖动可以帮助避免在 Airflow 任务中高并发监控 Batch 作业时触发 AWS Batch API 限制。
- 参数
delay (int | float) – 暂停的秒数;假设延迟为正数。
width (int | float) – 用于随机抖动的延迟 +/- 宽度;假设宽度为正数。
minima (int | float) – 允许的最小延迟;假设最小延迟为非负数。
- 返回
均匀分布 (delay - width, delay + width) 的抖动,且为非负数。
- 返回类型
- static delay(delay=None)[源代码]¶
暂停执行
delay
秒。- 参数
delay (int | float | None) – 使用
time.sleep(delay)
暂停执行的延迟时间;延迟会应用 1 秒的随机抖动。
注意
此方法使用默认的随机延迟,即
random.uniform(DEFAULT_DELAY_MIN, DEFAULT_DELAY_MAX)
;使用随机间隔有助于在许多并发任务请求作业描述时避免 AWS API 限制。
- static exponential_delay(tries)[源代码]¶
应用带有随机抖动的指数退避延迟。
最大间隔为 10 分钟(随机抖动在 3 到 10 分钟之间)。这在
poll_for_job_status()
方法中使用。行为示例
def exp(tries): max_interval = 600.0 # 10 minutes in seconds delay = 1 + pow(tries * 0.6, 2) delay = min(max_interval, delay) print(delay / 3, delay) for tries in range(10): exp(tries) # 0.33 1.0 # 0.45 1.35 # 0.81 2.44 # 1.41 4.23 # 2.25 6.76 # 3.33 10.00 # 4.65 13.95 # 6.21 18.64 # 8.01 24.04 # 10.05 30.15
另请参阅
- 参数
tries (int) – 尝试次数