Google Cloud Memorystore 算子¶
Cloud Memorystore for Redis 是 Google Cloud 的全托管 Redis 服务。在 Google Cloud 上运行的应用程序可以通过利用高度可扩展、可用、安全的 Redis 服务实现极高的性能,而无需管理复杂的 Redis 部署。
先决条件任务¶
要使用这些算子,您必须执行以下操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为您的项目启用计费,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请参阅。
实例¶
算子使用 Instance
来表示实例。该对象也可以表示为兼容的字典。
下面是实例的一个示例
FIRST_INSTANCE = {"tier": Instance.Tier.BASIC, "memory_size_gb": 1}
配置用于导入/导出的存储分区权限¶
必须为存储分区配置权限才能导入和导出数据。要查找实例的服务帐号,请运行 CloudMemorystoreCreateInstanceOperator
或 CloudMemorystoreGetInstanceOperator
,并使用 persistenceIamIdentity
下列出的服务帐号。
你可以使用 GCSBucketCreateAclEntryOperator
运算符设置权限。
set_acl_permission = GCSBucketCreateAclEntryOperator(
task_id="gcs-set-acl-permission",
bucket=BUCKET_NAME,
entity="user-{{ task_instance.xcom_pull('get-instance')['persistence_iam_identity']"
".split(':', 2)[1] }}",
role="OWNER",
)
有关详细信息,请参阅:授予导入和导出受限权限
创建实例¶
使用 CloudMemorystoreCreateInstanceOperator
运算符创建实例。
create_instance = CloudMemorystoreCreateInstanceOperator(
task_id="create-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
instance=FIRST_INSTANCE,
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance_id
、instance
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数配合使用,这些参数允许您动态确定值。结果将保存到 XCom 中,这样其他运算符就可以使用它。
create_instance_result = BashOperator(
task_id="create-instance-result",
bash_command=f"echo {create_instance.output}",
)
删除实例¶
使用 CloudMemorystoreDeleteInstanceOperator
运算符删除实例。
delete_instance = CloudMemorystoreDeleteInstanceOperator(
task_id="delete-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数配合使用,这些参数允许您动态确定值。
导出实例¶
使用 CloudMemorystoreExportInstanceOperator
运算符导出实例。
export_instance = CloudMemorystoreExportInstanceOperator(
task_id="export-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
output_config={"gcs_destination": {"uri": EXPORT_GCS_URL}},
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance
、output_config
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
故障转移实例¶
使用 CloudMemorystoreFailoverInstanceOperator
运算符执行删除实例操作。
failover_instance = CloudMemorystoreFailoverInstanceOperator(
task_id="failover-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
data_protection_mode=FailoverInstanceRequest.DataProtectionMode(
FailoverInstanceRequest.DataProtectionMode.LIMITED_DATA_LOSS
),
project_id=PROJECT_ID,
)
您可以将 Jinja 模板 与 location
、instance
、data_protection_mode
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
获取实例¶
使用 CloudMemorystoreGetInstanceOperator
运算符执行删除实例操作。
get_instance = CloudMemorystoreGetInstanceOperator(
task_id="get-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
do_xcom_push=True,
)
您可以将 Jinja 模板 与 location
、instance
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数配合使用,这些参数允许您动态确定值。
导入实例¶
使用 CloudMemorystoreImportOperator
运算符执行删除实例操作。
import_instance = CloudMemorystoreImportOperator(
task_id="import-instance",
location=LOCATION,
instance=MEMORYSTORE_REDIS_INSTANCE_NAME_2,
input_config={"gcs_source": {"uri": EXPORT_GCS_URL}},
project_id=PROJECT_ID,
)
你可以将 Jinja 模板 与 location
、instance
、input_config
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数配合使用,这些参数允许你动态确定值。
列出实例¶
使用 CloudMemorystoreListInstancesOperator
运算符列出实例。
list_instances = CloudMemorystoreListInstancesOperator(
task_id="list-instances", location="-", page_size=100, project_id=PROJECT_ID
)
你可以将 Jinja 模板 与 location
、page_size
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数配合使用,这些参数允许你动态确定值。结果会保存到 XCom 中,这样其他运算符就可以使用它。
list_instances_result = BashOperator(
task_id="list-instances-result", bash_command=f"echo {get_instance.output}"
)
更新实例¶
使用 CloudMemorystoreUpdateInstanceOperator
运算符更新实例。
update_instance = CloudMemorystoreUpdateInstanceOperator(
task_id="update-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME,
project_id=PROJECT_ID,
update_mask={"paths": ["memory_size_gb"]},
instance={"memory_size_gb": 2},
)
您可以将 Jinja 模板 与 update_mask
、instance
、location
、instance_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,以便动态确定值。
缩放实例¶
使用 CloudMemorystoreScaleInstanceOperator
运算符缩放实例。
scale_instance = CloudMemorystoreScaleInstanceOperator(
task_id="scale-instance",
location=LOCATION,
instance_id=MEMORYSTORE_REDIS_INSTANCE_NAME_3,
project_id=PROJECT_ID,
memory_size_gb=3,
)
您可以将 Jinja 模板 与 memory_size_gb
、location
、instance_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,以便动态确定值。
创建实例并导入¶
如果您想使用导入的数据创建实例,则可以使用 CloudMemorystoreCreateInstanceAndImportOperator
运算符。
导出和删除实例¶
如果您想导出数据并立即删除实例,则可以使用 CloudMemorystoreExportAndDeleteInstanceOperator
运算符。
您可以将 Jinja 模板 与 memory_size_gb
、location
、instance_id
、project_id
、retry
、timeout
、metadata
、gcp_conn_id
、impersonation_chain
参数结合使用,以便动态确定值。