Amazon Simple Queue Service (SQS)¶
Amazon Simple Queue Service (SQS) 是一种完全托管的消息队列服务,使您能够解耦和扩展微服务、分布式系统和无服务器应用程序。 SQS 消除了管理和操作面向消息的中间件相关的复杂性和开销,并使开发人员能够专注于差异化的工作。使用 SQS,您可以在任何容量的软件组件之间发送、存储和接收消息,而不会丢失消息或需要其他服务可用。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 的安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,而无需查找连接。否则,请使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 region_name。 否则,请使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果要使用与 botocore 使用的不同的 CA 证书捆绑包,则可以指定此参数。
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 verify。 否则,请使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构建 botocore.config.Config。此配置可用于配置避免节流异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。 否则,请使用指定的值而不是连接值。默认值:None
注意
指定一个空字典,
{}
,将覆盖 botocore.config.Config 的连接配置
操作符¶
将消息发布到 Amazon SQS 队列¶
要将消息发布到 Amazon SQS 队列,您可以使用 SqsPublishOperator
在以下示例中,任务 publish_to_queue
将包含任务实例和执行日期的消息发布到默认名称为 Airflow-Example-Queue
的队列。
tests/system/amazon/aws/example_sqs.py
publish_to_queue_1 = SqsPublishOperator(
task_id="publish_to_queue_1",
sqs_queue=sqs_queue,
message_content="{{ task_instance }}-{{ logical_date }}",
)
publish_to_queue_2 = SqsPublishOperator(
task_id="publish_to_queue_2",
sqs_queue=sqs_queue,
message_content="{{ task_instance }}-{{ logical_date }}",
)
传感器¶
从 Amazon SQS 队列读取消息¶
要从 Amazon SQS 队列中读取消息直到耗尽,请使用 SqsSensor
。 此传感器也可以通过将 deferrable
参数设置为 True
在可延迟模式下运行。
tests/system/amazon/aws/example_sqs.py
read_from_queue = SqsSensor(
task_id="read_from_queue",
sqs_queue=sqs_queue,
)
# Retrieve multiple batches of messages from SQS.
# The SQS API only returns a maximum of 10 messages per poll.
read_from_queue_in_batch = SqsSensor(
task_id="read_from_queue_in_batch",
sqs_queue=sqs_queue,
# Get maximum 10 messages each poll
max_messages=10,
# Combine 3 polls before returning results
num_batches=3,
)