Amazon关系型数据库服务 (RDS)¶
Amazon关系型数据库服务 (Amazon RDS) 是一种 Web 服务,可让您更轻松地在云中设置、操作和扩展关系型数据库。它为行业标准关系型数据库提供经济高效、可调整大小的容量,并管理常见的数据库管理任务。
先决任务¶
要使用这些操作符,您必须执行以下几步
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
- 引用 Amazon Web Services 连接 ID。如果此参数设置为 - None,则使用默认的 boto3 行为而不进行连接查找。否则使用存储在连接中的凭据。默认值:- aws_default
- region_name
- AWS 区域名称。如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 region_name。否则使用指定的值而不是连接值。默认值:- None
- verify
- 是否验证 SSL 证书。 - False- 不验证 SSL 证书。
- path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果要使用与 botocore 所用不同的 CA 证书包,可以指定此参数。 
 - 如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 verify。否则使用指定的值而不是连接值。默认值:- None
- botocore_config
- 提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。 示例,有关参数的更多详情,请参阅 botocore.config.Config¶- { "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, } - 如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则使用指定的值而不是连接值。默认值:- None- 注意 - 指定一个空字典 - {}将覆盖 botocore.config.Config 的连接配置
操作符¶
创建数据库快照¶
要创建 Amazon RDS 数据库实例或集群的快照,可以使用 RDSCreateDBSnapshotOperator。源数据库实例必须处于 available 或 storage-optimization 状态。
tests/system/amazon/aws/example_rds_snapshot.py
create_snapshot = RdsCreateDbSnapshotOperator(
    task_id="create_snapshot",
    db_type="instance",
    db_identifier=rds_instance_name,
    db_snapshot_identifier=rds_snapshot_name,
)
复制数据库快照¶
要复制 Amazon RDS 数据库实例或集群的快照,可以使用 RDSCopyDBSnapshotOperator。源数据库快照必须处于 available 状态。
tests/system/amazon/aws/example_rds_snapshot.py
copy_snapshot = RdsCopyDbSnapshotOperator(
    task_id="copy_snapshot",
    db_type="instance",
    source_db_snapshot_identifier=rds_snapshot_name,
    target_db_snapshot_identifier=rds_snapshot_copy_name,
)
删除数据库快照¶
要删除 Amazon RDS 数据库实例或集群的快照,可以使用 RDSDeleteDBSnapshotOperator。数据库快照必须处于 available 状态才能被删除。
tests/system/amazon/aws/example_rds_snapshot.py
delete_snapshot = RdsDeleteDbSnapshotOperator(
    task_id="delete_snapshot",
    db_type="instance",
    db_snapshot_identifier=rds_snapshot_name,
)
将 Amazon RDS 快照导出到 Amazon S3¶
要将 Amazon RDS 快照导出到 Amazon S3,可以使用 RDSStartExportTaskOperator。提供的 IAM 角色必须具有访问 S3 存储桶的权限。
tests/system/amazon/aws/example_rds_export.py
start_export = RdsStartExportTaskOperator(
    task_id="start_export",
    export_task_identifier=rds_export_task_id,
    source_arn=snapshot_arn,
    s3_bucket_name=bucket_name,
    s3_prefix="rds-test",
    iam_role_arn=test_context[ROLE_ARN_KEY],
    kms_key_id=test_context[KMS_KEY_ID_KEY],
)
取消 Amazon RDS 导出任务¶
要取消到 S3 的 Amazon RDS 导出任务,可以使用 RDSCancelExportTaskOperator。已写入 S3 存储桶的任何数据都不会被删除。
tests/system/amazon/aws/example_rds_export.py
cancel_export = RdsCancelExportTaskOperator(
    task_id="cancel_export",
    export_task_identifier=rds_export_task_id,
)
订阅 Amazon RDS 事件通知¶
要创建 Amazon RDS 事件订阅,可以使用 RDSCreateEventSubscriptionOperator。此操作需要 Amazon SNS 主题的 Amazon Resource Name (ARN)。Amazon RDS 事件通知仅适用于未加密的 SNS 主题。如果指定加密的 SNS 主题,则不会为该主题发送事件通知。
tests/system/amazon/aws/example_rds_event.py
create_subscription = RdsCreateEventSubscriptionOperator(
    task_id="create_subscription",
    subscription_name=rds_subscription_name,
    sns_topic_arn=sns_topic,
    source_type="db-instance",
    source_ids=[rds_instance_name],
    event_categories=["availability"],
)
取消订阅 Amazon RDS 事件通知¶
要删除 Amazon RDS 事件订阅,可以使用 RDSDeleteEventSubscriptionOperator。
tests/system/amazon/aws/example_rds_event.py
delete_subscription = RdsDeleteEventSubscriptionOperator(
    task_id="delete_subscription",
    subscription_name=rds_subscription_name,
)
创建数据库实例¶
要创建 AWS 数据库实例,可以使用 RdsCreateDbInstanceOperator。您还可以通过将 deferrable 参数设置为 True 来以可延迟模式运行此操作符。
tests/system/amazon/aws/example_rds_instance.py
create_db_instance = RdsCreateDbInstanceOperator(
    task_id="create_db_instance",
    db_instance_identifier=rds_db_identifier,
    db_instance_class="db.t4g.micro",
    engine="postgres",
    rds_kwargs={
        "MasterUsername": RDS_USERNAME,
        "MasterUserPassword": RDS_PASSWORD,
        "AllocatedStorage": 20,
        "PubliclyAccessible": False,
    },
)
删除数据库实例¶
要删除 AWS 数据库实例,可以使用 RDSDeleteDbInstanceOperator。您还可以通过将 deferrable 参数设置为 True 来以可延迟模式运行此操作符。
tests/system/amazon/aws/example_rds_instance.py
delete_db_instance = RdsDeleteDbInstanceOperator(
    task_id="delete_db_instance",
    db_instance_identifier=rds_db_identifier,
    rds_kwargs={
        "SkipFinalSnapshot": True,
    },
)
启动数据库实例或集群¶
要启动 Amazon RDS 数据库实例或集群,可以使用 RdsStartDbOperator。
tests/system/amazon/aws/example_rds_instance.py
start_db_instance = RdsStartDbOperator(
    task_id="start_db_instance",
    db_identifier=rds_db_identifier,
)
停止数据库实例或集群¶
要停止 Amazon RDS 数据库实例或集群,可以使用 RdsStopDbOperator。
tests/system/amazon/aws/example_rds_instance.py
stop_db_instance = RdsStopDbOperator(
    task_id="stop_db_instance",
    db_identifier=rds_db_identifier,
)
传感器¶
等待 Amazon RDS 实例或集群状态¶
要等待 Amazon RDS 实例或集群达到特定状态,可以使用 RdsDbSensor。默认情况下,传感器等待数据库实例达到 available 状态。
tests/system/amazon/aws/example_rds_instance.py
await_db_instance = RdsDbSensor(
    task_id="await_db_instance",
    db_identifier=rds_db_identifier,
)
等待 Amazon RDS 快照状态¶
要等待具有特定状态的 Amazon RDS 快照,可以使用 RdsSnapshotExistenceSensor。默认情况下,传感器等待状态为 available 的快照存在。
tests/system/amazon/aws/example_rds_snapshot.py
snapshot_sensor = RdsSnapshotExistenceSensor(
    task_id="snapshot_sensor",
    db_type="instance",
    db_snapshot_identifier=rds_snapshot_name,
    target_statuses=["available"],
)
等待 Amazon RDS 导出任务状态¶
要等待具有特定状态的 Amazon RDS 快照导出任务,可以使用 RdsExportTaskExistenceSensor。默认情况下,传感器等待状态为 available 的快照存在。
tests/system/amazon/aws/example_rds_export.py
export_sensor = RdsExportTaskExistenceSensor(
    task_id="export_sensor",
    export_task_identifier=rds_export_task_id,
    target_statuses=["canceled"],
)