AWS 数据库迁移服务 (DMS)¶
AWS 数据库迁移服务 (AWS DMS) 是一项 Web 服务,可用于将您的数据库中的数据从本地、Amazon Relational Database Service (Amazon RDS) 数据库实例或 Amazon Elastic Compute Cloud (Amazon EC2) 实例上的数据库迁移到 AWS 服务上的数据库。这些服务可以包括 Amazon RDS 上的数据库或 Amazon EC2 实例上的数据库。您还可以将数据库从 AWS 服务迁移到本地数据库。您可以在使用相同数据库引擎的源终端节点和目标终端节点之间进行迁移,例如从 Oracle 数据库迁移到 Oracle 数据库。您也可以在使用不同数据库引擎的源终端节点和目标终端节点之间进行迁移,例如从 Oracle 数据库迁移到 PostgreSQL 数据库。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参考 Airflow® 的安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果想使用与 botocore 使用的不同的 CA 证书包,可以指定此参数。
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None
注意
指定一个空字典
{}
,将覆盖 botocore.config.Config 的连接配置
操作符¶
创建复制任务¶
要创建复制任务,您可以使用 DmsCreateTaskOperator
。
tests/system/amazon/aws/example_dms.py
create_task = DmsCreateTaskOperator(
task_id="create_task",
replication_task_id=dms_replication_task_id,
source_endpoint_arn=create_assets["source_endpoint_arn"],
target_endpoint_arn=create_assets["target_endpoint_arn"],
replication_instance_arn=create_assets["replication_instance_arn"],
table_mappings=table_mappings,
)
启动复制任务¶
要启动复制任务,您可以使用 DmsStartTaskOperator
。
tests/system/amazon/aws/example_dms.py
start_task = DmsStartTaskOperator(
task_id="start_task",
replication_task_arn=task_arn,
)
获取复制任务的详细信息¶
要检索复制任务列表的详细信息,您可以使用 DmsDescribeTasksOperator
。
tests/system/amazon/aws/example_dms.py
describe_tasks = DmsDescribeTasksOperator(
task_id="describe_tasks",
describe_tasks_kwargs={
"Filters": [
{
"Name": "replication-instance-arn",
"Values": [create_assets["replication_instance_arn"]],
}
]
},
do_xcom_push=False,
)
停止复制任务¶
要停止复制任务,您可以使用 DmsStopTaskOperator
。
tests/system/amazon/aws/example_dms.py
stop_task = DmsStopTaskOperator(
task_id="stop_task",
replication_task_arn=task_arn,
)
删除复制任务¶
要删除复制任务,您可以使用 DmsDeleteTaskOperator
。
tests/system/amazon/aws/example_dms.py
delete_task = DmsDeleteTaskOperator(
task_id="delete_task",
replication_task_arn=task_arn,
)
创建无服务器复制配置¶
要创建无服务器复制配置,请使用 DmsCreateReplicationConfigOperator
。
tests/system/amazon/aws/example_dms_serverless.py
create_replication_config = DmsCreateReplicationConfigOperator(
task_id="create_replication_config",
replication_config_id=replication_id,
source_endpoint_arn=create_assets["source_endpoint_arn"],
target_endpoint_arn=create_assets["target_endpoint_arn"],
compute_config={
"MaxCapacityUnits": 4,
"MinCapacityUnits": 1,
"MultiAZ": False,
"ReplicationSubnetGroupId": "default",
},
replication_type="full-load",
table_mappings=json.dumps(table_mappings),
trigger_rule=TriggerRule.ALL_SUCCESS,
)
描述无服务器复制配置¶
要描述无服务器复制配置,请使用 DmsDescribeReplicationConfigsOperator
。
tests/system/amazon/aws/example_dms_serverless.py
describe_replication_configs = DmsDescribeReplicationConfigsOperator(
task_id="describe_replication_configs",
trigger_rule=TriggerRule.ALL_SUCCESS,
)
启动无服务器复制¶
要启动无服务器复制,请使用 DmsStartReplicationOperator
。
tests/system/amazon/aws/example_dms_serverless.py
replicate = DmsStartReplicationOperator(
task_id="replicate",
replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
replication_start_type="start-replication",
wait_for_completion=True,
waiter_delay=60,
waiter_max_attempts=200,
trigger_rule=TriggerRule.ALL_SUCCESS,
deferrable=False,
)
停止无服务器复制¶
要停止无服务器复制,请使用 DmsStopReplicationOperator
。
tests/system/amazon/aws/example_dms_serverless.py
stop_relication = DmsStopReplicationOperator(
task_id="stop_replication",
replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
wait_for_completion=True,
waiter_delay=120,
waiter_max_attempts=200,
trigger_rule=TriggerRule.ALL_SUCCESS,
deferrable=False,
)
获取无服务器复制的状态¶
要获取无服务器复制的状态,请使用 DmsDescribeReplicationsOperator
。
tests/system/amazon/aws/example_dms_serverless.py
describe_replications = DmsDescribeReplicationsOperator(
task_id="describe_replications",
trigger_rule=TriggerRule.ALL_SUCCESS,
)
删除无服务器复制配置¶
要删除无服务器复制配置,请使用 DmsDeleteReplicationConfigOperator
。
tests/system/amazon/aws/example_dms_serverless.py
delete_replication_config = DmsDeleteReplicationConfigOperator(
task_id="delete_replication_config",
wait_for_completion=True,
waiter_delay=60,
waiter_max_attempts=200,
deferrable=False,
replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
trigger_rule=TriggerRule.ALL_DONE,
)
传感器¶
等待复制任务完成¶
要检查复制任务的状态,直到它完成,您可以使用 DmsTaskCompletedSensor
。
tests/system/amazon/aws/example_dms.py
await_task_stop = DmsTaskCompletedSensor(
task_id="await_task_stop",
replication_task_arn=task_arn,
)