Amazon Bedrock¶
Amazon Bedrock 是一项完全托管的服务,通过单个 API 提供来自领先 AI 公司(如 AI21 Labs、Anthropic、Cohere、Meta、Mistral AI、Stability AI 和 Amazon)的高性能基础模型 (FM) 选择,以及构建具有安全性、隐私性和负责任 AI 的生成式 AI 应用程序所需的广泛功能。
先决条件任务¶
要使用这些操作器,您必须执行以下操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow™ 的安装
设置连接.
通用参数¶
- aws_conn_id
对 Amazon Web Services 连接 ID 的引用。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果要使用与 botocore 使用的 CA 证书包不同的 CA 证书包,则可以指定此参数。
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None
注意
指定空字典
{}
将覆盖 botocore.config.Config 的连接配置
操作器¶
调用现有的 Amazon Bedrock 模型¶
要调用现有的 Amazon Bedrock 模型,可以使用 BedrockInvokeModelOperator
。
请注意,每个模型系列都有不同的输入和输出格式。下面包含一些示例,但有关不同格式的详细信息,请参阅 基础模型的推理参数
例如,要调用 Meta Llama 模型,您可以使用
invoke_llama_model = BedrockInvokeModelOperator(
task_id="invoke_llama",
model_id=LLAMA_SHORT_MODEL_ID,
input_data={"prompt": PROMPT},
)
要调用 Amazon Titan 模型,您可以使用
invoke_titan_model = BedrockInvokeModelOperator(
task_id="invoke_titan",
model_id=TITAN_SHORT_MODEL_ID,
input_data={"inputText": PROMPT},
)
要使用 Completions API 调用 Claude V2 模型,您可以使用
invoke_claude_completions = BedrockInvokeModelOperator(
task_id="invoke_claude_completions",
model_id=CLAUDE_MODEL_ID,
input_data={"max_tokens_to_sample": 4000, "prompt": f"\n\nHuman: {PROMPT}\n\nAssistant:"},
)
自定义现有的 Amazon Bedrock 模型¶
要创建微调作业以自定义基础模型,可以使用 BedrockCustomizeModelOperator
。
模型自定义作业是异步的,完成时间取决于基础模型和训练/验证数据大小。要监控作业状态,可以使用“model_customization_job_complete”Waiter、BedrockCustomizeModelCompletedSensor
传感器或 BedrockCustomizeModelCompletedTrigger
触发器。
customize_model = BedrockCustomizeModelOperator(
task_id="customize_model",
job_name=custom_model_job_name,
custom_model_name=custom_model_name,
role_arn=test_context[ROLE_ARN_KEY],
base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
hyperparameters=HYPERPARAMETERS,
training_data_uri=training_data_uri,
output_data_uri=f"s3://{bucket_name}/myOutputData",
)
为现有的 Amazon Bedrock 模型配置吞吐量¶
要为基础模型或微调模型创建具有专用容量的配置吞吐量,可以使用 BedrockCreateProvisionedModelThroughputOperator
。
配置吞吐量作业是异步的。要监控作业状态,可以使用“provisioned_model_throughput_complete”Waiter、BedrockProvisionModelThroughputCompletedSensor
传感器或 BedrockProvisionModelThroughputCompletedSensorTrigger
触发器。
provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
task_id="provision_throughput",
model_units=1,
provisioned_model_name=provisioned_model_name,
model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)
创建 Amazon Bedrock 知识库¶
要创建 Amazon Bedrock 知识库,可以使用 BedrockCreateKnowledgeBaseOperator
。
有关哪些模型支持将数据嵌入到向量存储中的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
task_id="create_knowledge_base",
name=knowledge_base_name,
embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{TITAN_MODEL_ID}",
role_arn=test_context[ROLE_ARN_KEY],
storage_config={
"type": "OPENSEARCH_SERVERLESS",
"opensearchServerlessConfiguration": {
"collectionArn": get_collection_arn(collection),
"vectorIndexName": index_name,
"fieldMapping": {
"vectorField": "vector",
"textField": "text",
"metadataField": "text-metadata",
},
},
},
)
删除 Amazon Bedrock 知识库¶
删除知识库是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,如下例所示。
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
"""
Delete the Amazon Bedrock knowledge base created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:BedrockDeleteKnowledgeBase`
:param knowledge_base_id: The unique identifier of the knowledge base to delete.
"""
log.info("Deleting Knowledge Base %s.", knowledge_base_id)
bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
创建 Amazon Bedrock 数据源¶
要创建 Amazon Bedrock 数据源,可以使用 BedrockCreateDataSourceOperator
。
create_data_source = BedrockCreateDataSourceOperator(
task_id="create_data_source",
knowledge_base_id=create_knowledge_base.output,
name=data_source_name,
bucket_name=bucket_name,
)
删除 Amazon Bedrock 数据源¶
删除数据源是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,如下例所示。
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
"""
Delete the Amazon Bedrock data source created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto_operator:BedrockDeleteDataSource`
:param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
:param data_source_id: The unique identifier of the data source to delete.
"""
log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
bedrock_agent_client.delete_data_source(dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id)
将数据提取到 Amazon Bedrock 数据源中¶
要将 Amazon S3 存储桶中的数据添加到 Amazon Bedrock 数据源中,可以使用 BedrockIngestDataOperator
。
ingest_data = BedrockIngestDataOperator(
task_id="ingest_data",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
)
Amazon Bedrock 检索¶
要查询知识库,可以使用 BedrockRetrieveOperator
。
响应将仅包含与查询相关的来源的引用。如果要通过 LLM 传递结果以生成文本响应,请参阅 BedrockRaGOperator
有关支持从知识库检索信息的模型的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
retrieve = BedrockRetrieveOperator(
task_id="retrieve",
knowledge_base_id=create_knowledge_base.output,
retrieval_query="Who was the CEO of Amazon in 1997?",
)
Amazon Bedrock 检索和生成 (RaG)¶
要查询知识库或外部资源并根据检索到的结果生成文本响应,您可以使用 BedrockRaGOperator
。
响应将包含与查询相关的来源引用以及生成的文本回复。有关支持从知识库检索信息的模型的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
注意:boto 1.34.90 中添加了对“外部资源”的支持
使用 Amazon Bedrock 知识库的示例
knowledge_base_rag = BedrockRaGOperator(
task_id="knowledge_base_rag",
input="Who was the CEO of Amazon on 2022?",
source_type="KNOWLEDGE_BASE",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{CLAUDE_MODEL_ID}",
knowledge_base_id=create_knowledge_base.output,
)
使用 Amazon S3 存储桶中 PDF 文件的示例
external_sources_rag = BedrockRaGOperator(
task_id="external_sources_rag",
input="Who was the CEO of Amazon in 2022?",
source_type="EXTERNAL_SOURCES",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
sources=[
{
"sourceType": "S3",
"s3Location": {"uri": f"s3://{bucket_name}/AMZN-2022-Shareholder-Letter.pdf"},
}
],
)
传感器¶
等待 Amazon Bedrock 自定义模型作业¶
要等待 Amazon Bedrock 自定义模型作业的状态,直到它达到最终状态,您可以使用 BedrockCustomizeModelCompletedSensor
await_custom_model_job = BedrockCustomizeModelCompletedSensor(
task_id="await_custom_model_job",
job_name=custom_model_job_name,
)
等待 Amazon Bedrock 配置模型吞吐量作业¶
要等待 Amazon Bedrock 配置模型吞吐量作业的状态,直到它达到最终状态,您可以使用 BedrockProvisionModelThroughputCompletedSensor
await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
task_id="await_provision_throughput",
model_id=provision_throughput.output,
)
等待 Amazon Bedrock 知识库¶
要等待 Amazon Bedrock 知识库的状态,直到它达到最终状态,您可以使用 BedrockKnowledgeBaseActiveSensor
await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)
等待 Amazon Bedrock 提取作业完成¶
要等待 Amazon Bedrock 数据提取作业的状态,直到它达到最终状态,您可以使用 BedrockIngestionJobSensor
await_ingest = BedrockIngestionJobSensor(
task_id="await_ingest",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
ingestion_job_id=ingest_data.output,
)