airflow.providers.amazon.aws.hooks.datasync

使用 AWS boto3 库与 AWS DataSync 交互。

模块内容

DataSyncHook

与 AWS DataSync 交互。

class airflow.providers.amazon.aws.hooks.datasync.DataSyncHook(wait_interval_seconds=30, *args, **kwargs)[源代码]

基类: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 AWS DataSync 交互。

提供围绕 boto3.client("datasync") 的厚包装器。

可以指定其他参数(例如 aws_conn_id),并将传递给底层的 AwsBaseHook。

参数

wait_interval_seconds (int) – 检查 TaskExecution 状态的两次连续调用之间等待的时间。默认为 30 秒。

引发

ValueError – 如果 wait_interval_seconds 不在 0 到 15*60 秒之间。

TASK_EXECUTION_INTERMEDIATE_STATES = ('INITIALIZING', 'QUEUED', 'LAUNCHING', 'PREPARING', 'TRANSFERRING', 'VERIFYING')[源代码]
TASK_EXECUTION_FAILURE_STATES = ('ERROR',)[源代码]
TASK_EXECUTION_SUCCESS_STATES = ('SUCCESS',)[源代码]
create_location(location_uri, **create_location_kwargs)[源代码]

创建一个新位置。

参数
  • location_uri (str) – 用于确定位置类型(S3、SMB、NFS、EFS)的位置 URI。

  • create_location_kwargs – 传递给 DataSync.Client.create_location_* 方法。

返回

已创建位置的 LocationArn。

引发

AirflowException – 如果位置类型(来自 location_uri 的前缀)无效。

返回类型

str

get_location_arns(location_uri, case_sensitive=False, ignore_trailing_slash=True)[源代码]

返回与 LocationUri 匹配的所有 LocationArn。

参数
  • location_uri (str) – 要搜索的位置 URI,例如 s3://mybucket/mypath

  • case_sensitive (bool) – 对位置 URI 进行区分大小写的搜索。

  • ignore_trailing_slash (bool) – 匹配时忽略 URI 末尾的 /。

返回

LocationArn 列表。

引发

AirflowBadRequest – 如果 location_uri 为空

返回类型

list[str]

create_task(source_location_arn, destination_location_arn, **create_task_kwargs)[源代码]

在指定的源和目标 LocationArn 之间创建任务。

参数
  • source_location_arn (str) – 源 LocationArn。必须已存在。

  • destination_location_arn (str) – 目标 LocationArn。必须已存在。

  • create_task_kwargs – 传递给 boto.create_task()。请参阅 AWS boto3 datasync 文档。

返回

已创建任务的 TaskArn

返回类型

str

update_task(task_arn, **update_task_kwargs)[源代码]

更新任务。

参数
  • task_arn (str) – 要更新的 TaskArn。

  • update_task_kwargs – 传递给 boto.update_task(),请参阅 AWS boto3 datasync 文档。

delete_task(task_arn)[源代码]

删除任务。

参数

task_arn (str) – 要删除的 TaskArn。

get_task_arns_for_location_arns(source_location_arns, destination_location_arns)[源代码]

返回使用指定的源和目标 LocationArn 的 TaskArn 列表。

参数
  • source_location_arns (list) – 源 LocationArn 列表。

  • destination_location_arns (list) – 目标 LocationArn 列表。

引发

AirflowBadRequest – 如果 source_location_arnsdestination_location_arns 为空。

start_task_execution(task_arn, **kwargs)[源代码]

为指定的 task_arn 启动 TaskExecution。

每个任务最多可以有一个 TaskExecution。其他关键字参数发送到 start_task_execution boto3 方法。

参数

task_arn (str) – TaskArn

返回

TaskExecutionArn

引发
  • ClientError – 如果此 task_arn 已有 TaskExecution 正在忙碌运行。

  • AirflowBadRequest – 如果 task_arn 为空。

返回类型

str

cancel_task_execution(task_execution_arn)[源代码]

取消指定 task_execution_arn 的 TaskExecution。

参数

task_execution_arn (str) – TaskExecutionArn。

引发

AirflowBadRequest – 如果 task_execution_arn 为空。

get_task_description(task_arn)[源代码]

获取指定 task_arn 的描述信息。

参数

task_arn (str) – TaskArn

返回

关于任务的 AWS 元数据。

引发

AirflowBadRequest – 如果 task_arn 为空。

返回类型

dict

describe_task_execution(task_execution_arn)[源代码]

获取指定 task_execution_arn 的描述信息。

参数

task_execution_arn (str) – TaskExecutionArn

返回

关于任务执行的 AWS 元数据。

引发

AirflowBadRequest – 如果 task_execution_arn 为空。

返回类型

dict

get_current_task_execution_arn(task_arn)[源代码]

获取指定 task_arn 的当前 TaskExecutionArn(如果存在)。

参数

task_arn (str) – TaskArn

返回

task_arn 的 CurrentTaskExecutionArn 或 None。

引发

AirflowBadRequest – 如果 task_arn 为空。

返回类型

str | None

wait_for_task_execution(task_execution_arn, max_iterations=60)[源代码]

等待任务执行状态完成(成功/错误)。

task_execution_arn 必须存在,否则将引发 boto3 ClientError。

参数
  • task_execution_arn (str) – TaskExecutionArn

  • max_iterations (int) – 超时前的最大迭代次数。

返回

任务执行的结果。

引发
  • AirflowTaskTimeout – 如果超出最大迭代次数。

  • AirflowBadRequest – 如果 task_execution_arn 为空。

返回类型

bool

此条目是否有帮助?