tests.system.amazon.aws.utils

子模块

属性

ENV_ID_ENVIRON_KEY

ENV_ID_KEY

DEFAULT_ENV_ID_PREFIX

DEFAULT_ENV_ID_LEN

DEFAULT_ENV_ID

PURGE_LOGS_INTERVAL_PERIOD

TEST_FILE_IDENTIFIERS

INVALID_ENV_ID_MSG

LOWERCASE_ENV_ID_MSG

NO_VALUE_MSG

log

Variable

存储要为 AWS 系统测试获取的变量的元数据。

SystemTestContextBuilder

此类最终构建一个 TaskFlow 任务,该任务在

函数

fetch_variable(key[, default_value, test_name, optional])

给定参数名称:首先检查现有的环境变量,

set_env_id()

检索或生成环境 ID,验证其是否合适,

all_tasks_passed(ti)

prune_logs(logs[, force_delete, retry, retry_times, ...])

如果此 dagrun 中的所有任务都已成功,则删除相关的日志。

split_string(string)

包内容

tests.system.amazon.aws.utils.ENV_ID_ENVIRON_KEY: str = 'SYSTEM_TESTS_ENV_ID'[source]
tests.system.amazon.aws.utils.ENV_ID_KEY: str = 'ENV_ID'[source]
tests.system.amazon.aws.utils.DEFAULT_ENV_ID_PREFIX: str = 'env'[source]
tests.system.amazon.aws.utils.DEFAULT_ENV_ID_LEN: int = 8[source]
tests.system.amazon.aws.utils.DEFAULT_ENV_ID: str[source]
tests.system.amazon.aws.utils.PURGE_LOGS_INTERVAL_PERIOD = 5[source]
tests.system.amazon.aws.utils.TEST_FILE_IDENTIFIERS: list[str] = ['example_', 'test_'][source]
tests.system.amazon.aws.utils.INVALID_ENV_ID_MSG: str = 'To maximize compatibility, the SYSTEM_TESTS_ENV_ID must be an alphanumeric string which starts...[source]
tests.system.amazon.aws.utils.LOWERCASE_ENV_ID_MSG: str = 'The provided Environment ID contains uppercase letters and will be converted to lowercase for...[source]
tests.system.amazon.aws.utils.NO_VALUE_MSG: str = 'No Value Found: Variable {key} could not be found, and no default value was provided.'[source]
tests.system.amazon.aws.utils.log[source]
class tests.system.amazon.aws.utils.Variable(name, to_split=False, delimiter=None, test_name=None, optional=False)[source]

存储要为 AWS 系统测试获取的变量的元数据。

参数:
  • name (str) – 要获取的变量的名称。

  • to_split (bool) – 如果为 True,则输入是字符串格式的列表,需要进行拆分。默认为 False。

  • delimiter (str | None) – 如果 to_split 为 true,这将用于拆分字符串。默认为 ‘,’。

  • test_name (str | None) – 变量关联的系统测试名称。

name[source]
test_name = None[source]
to_split = False[source]
optional = False[source]
get_value()[source]
set_default(default)[source]
class tests.system.amazon.aws.utils.SystemTestContextBuilder[source]

此类最终构建一个 TaskFlow 任务,该任务在运行时(任务执行时)运行。此任务生成并存储测试 ENV_ID 以及任何请求的外部资源(例如 IAM 角色、VPC 等)。

variables[source]
env_id = ''[source]
test_name[source]
add_variable(variable_name, split_string=False, delimiter=None, optional=False, **kwargs)[source]

注册要从环境或云参数存储中获取的变量

build()[source]

构建并返回一个 TaskFlow 任务,该任务将创建 env_id 并获取请求的变量。将所有内容存储在 xcom 中供下游任务使用。

tests.system.amazon.aws.utils.fetch_variable(key, default_value=None, test_name=None, optional=False)[source]

给定参数名称:首先检查现有的环境变量,然后检查 SSM 中的值。如果两者都不可用,则回退到可选的默认值。

参数:
  • key (str) – 要获取值的参数名称。

  • default_value (str | None) – 如果找不到值,则使用的默认值。

  • test_name (str | None) – 系统测试名称。

  • optional (bool) – 变量是否可选。如果为 True,则在变量不存在时不会引发 ValueError。

返回:

参数的值。

返回类型:

str | None

tests.system.amazon.aws.utils.set_env_id()[source]

检索或生成环境 ID,验证其是否合适,将其作为环境变量导出并返回。

如果已生成环境 ID,则使用该 ID。否则,尝试获取并将其作为环境变量导出。如果没有可获取的,则生成一个并将其作为环境变量导出。

返回:

有效的系统测试环境 ID。

返回类型:

str

tests.system.amazon.aws.utils.all_tasks_passed(ti)[source]
tests.system.amazon.aws.utils.prune_logs(logs, force_delete=False, retry=False, retry_times=3, delete_log_groups=True, ti=None)[source]

如果此 dagrun 中的所有任务都已成功,则删除相关的日志。否则,为日志附加保留策略。这使得日志可以用于故障排除,同时也确保它们不会无限期地累积。

参数:
  • logs (list[tuple[str, str | None]]) – 要删除的 log_group/stream_prefix 元组列表。

  • force_delete (bool) – 是否在删除前检查日志组内的日志流。如果为 True,则删除日志组及其内部所有日志流。

  • retry (bool) – 如果未找到日志组/流是否重试。在某些情况下,日志组/流会在主要资源创建几秒后创建。默认情况下,它会重试 3 次,每次间隔 5 秒。

  • retry_times (int) – 重试次数。

  • delete_log_groups (bool) – 如果日志组为空,是否删除它们。force_delete 参数会覆盖此设置。

  • ti – 用于检查任务的状态。这从 DAG 的上下文获取,无需手动传递。

tests.system.amazon.aws.utils.split_string(string)[source]

此条目有帮助吗?