Google Cloud Memorystore 操作符¶
Cloud Memorystore for Redis 是 Google Cloud 的一项完全托管的 Redis 服务。在 Google Cloud 上运行的应用程序可以通过利用高度可扩展、可用、安全的 Redis 服务来实现极高性能,而无需负担管理复杂的 Redis 部署。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
实例¶
操作符使用 Instance
来表示实例。该对象也可以表示为兼容的字典。
这是一个实例的示例
FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
配置导入/导出存储桶权限¶
必须配置存储桶的权限才能导入和导出数据。要查找您的实例的服务帐户,请运行 CloudMemorystoreCreateInstanceOperator
或 CloudMemorystoreGetInstanceOperator
,并使用 persistenceIamIdentity
下列出的服务帐户。
您可以使用 GCSBucketCreateAclEntryOperator
操作符来设置权限。
set_acl_permission = GCSBucketCreateAclEntryOperator(
task_id="gcs-set-acl-permission",
bucket=BUCKET_NAME,
entity="user-{{ task_instance.xcom_pull('get-instance')['persistence_iam_identity']"
".split(':', 2)[1] }}",
role="OWNER",
)
有关更多信息,请查看:授予导入和导出的受限权限
创建实例¶
使用 CloudMemorystoreCreateInstanceOperator
操作符创建实例。
create_instance = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
instance=FIRST_INSTANCE,
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance_id
、instance
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,从而允许您动态确定值。结果将保存到 XCom 中,从而允许其他操作符使用它。
create_instance_result = BashOperator(
task_id="create-instance-result",
bash_command=f"echo {create_instance.output}",
)
删除实例¶
使用 CloudMemorystoreDeleteInstanceOperator
操作符删除实例。
delete_instance = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,从而允许您动态确定值。
导出实例¶
使用 CloudMemorystoreExportInstanceOperator
操作符删除实例。
export_instance = CloudMemorystoreExportInstanceOperator(
task_id="export-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance
、output_config
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,从而允许您动态确定值。
故障转移实例¶
使用 CloudMemorystoreFailoverInstanceOperator
操作符删除实例。
failover_instance = CloudMemorystoreFailoverInstanceOperator(
task_id="failover-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
data_protection_mode=FailoverInstanceRequest.DataProtectionMode(
FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS
),
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance
、data_protection_mode
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,从而允许您动态确定值。
获取实例¶
使用 CloudMemorystoreGetInstanceOperator
操作符删除实例。
get_instance = CloudMemorystoreGetInstanceOperator(
task_id="get-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
do_xcom_push=True,
)
您可以将 Jinja 模板 与 location
、instance
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,从而允许您动态确定值。
导入实例¶
使用 CloudMemorystoreImportOperator
操作符删除实例。
import_instance = CloudMemorystoreImportOperator(
task_id="import-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=PROJECT_ID,
)
您可以使用 Jinja 模板 以及 location
、 instance
、 input_config
、 project_id
、 retry
、 timeout
、 metadata
、 gcp_conn_id
、 impersonation_chain
参数,这允许您动态地确定值。
列出实例¶
使用 CloudMemorystoreListInstancesOperator
操作符来执行列出实例的操作。
list_instances = CloudMemorystoreListInstancesOperator(
task_id="list-instances", location="-", page_size=100, project_id=PROJECT_ID
)
您可以使用 Jinja 模板 以及 location
、 page_size
、 project_id
、 retry
、 timeout
、 metadata
、 gcp_conn_id
、 impersonation_chain
参数,这允许您动态地确定值。结果将保存到 XCom 中,以便其他操作符使用。
list_instances_result = BashOperator(
task_id="list-instances-result", bash_command=f"echo {get_instance.output}"
)
更新实例¶
使用 CloudMemorystoreUpdateInstanceOperator
操作符来执行更新实例的操作。
update_instance = CloudMemorystoreUpdateInstanceOperator(
task_id="update-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": 2},
)
您可以使用 Jinja 模板 以及 update_mask
、 instance
、 location
、 instance_id
、 project_id
、 retry
、 timeout
、 metadata
、 gcp_conn_id
、 impersonation_chain
参数,这允许您动态地确定值。
缩放实例¶
使用 CloudMemorystoreScaleInstanceOperator
操作符来执行缩放实例的操作。
scale_instance = CloudMemorystoreScaleInstanceOperator(
task_id="scale-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
project_id=PROJECT_ID,
memory_size_gb=3,
)
您可以使用 Jinja 模板 以及 memory_size_gb
、 location
、 instance_id
、 project_id
、 retry
、 timeout
、 metadata
、 gcp_conn_id
、 impersonation_chain
参数,这允许您动态地确定值。
创建实例并导入¶
如果您想创建带有导入数据的实例,可以使用 CloudMemorystoreCreateInstanceAndImportOperator
操作符。
导出并删除实例¶
如果您想导出数据并立即删除实例,那么您可以使用 CloudMemorystoreExportAndDeleteInstanceOperator
操作符。
您可以使用 Jinja 模板 以及 memory_size_gb
、 location
、 instance_id
、 project_id
、 retry
、 timeout
、 metadata
、 gcp_conn_id
、 impersonation_chain
参数,这允许您动态地确定值。