Google Cloud Run 操作符¶
Cloud Run 用于在完全托管的平台上构建和部署用任何语言(包括 Go、Python、Java、Node.js、.NET 和 Ruby)编写的可扩展容器化应用程序。
有关此服务的更多信息,请访问 Google Cloud Run 文档。
先决条件任务¶
要使用这些操作符,您必须执行以下操作:
使用 Cloud Console 选择或创建 Cloud Platform 项目。
按照 Google Cloud 文档中所述,为您的项目启用结算功能。
按照 Cloud Console 文档中所述,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
创建作业¶
在 Cloud Run 中创建作业之前,您需要定义它。有关 Job 对象字段的更多信息,请访问 Google Cloud Run Job 描述
可以使用 Job 对象创建简单的作业配置
def _create_job_instance() -> Job:
"""
Create a Cloud Run job configuration with google.cloud.run_v2.Job object.
As a minimum the configuration must contain a container image name in its template.
The rest of the configuration parameters are optional and will be populated with default values if not set.
"""
job = Job()
container = k8s_min.Container()
container.image = "us-docker.pkg.dev/cloudrun/container/job:latest"
container.resources.limits = {"cpu": "2", "memory": "1Gi"}
job.template.template.containers.append(container)
return job
或使用 Python 字典
def _create_job_dict() -> dict:
"""
Create a Cloud Run job configuration with a Python dict.
As a minimum the configuration must contain a container image name in its template.
"""
return {
"template": {
"template": {
"containers": [
{
"image": "us-docker.pkg.dev/cloudrun/container/job:latest",
"resources": {
"limits": {"cpu": "1", "memory": "512Mi"},
"cpu_idle": False,
"startup_cpu_boost": False,
},
"name": "",
"command": [],
"args": [],
"env": [],
"ports": [],
"volume_mounts": [],
"working_dir": "",
"depends_on": [],
}
],
"volumes": [],
"execution_environment": 0,
"encryption_key": "",
},
"labels": {},
"annotations": {},
"parallelism": 0,
"task_count": 0,
},
"name": "",
"uid": "",
"generation": "0",
"labels": {},
"annotations": {},
"creator": "",
"last_modifier": "",
"client": "",
"client_version": "",
"launch_stage": 0,
"observed_generation": "0",
"conditions": [],
"execution_count": 0,
"reconciling": False,
"satisfies_pzs": False,
"etag": "",
}
您可以使用以下任何一种配置创建 Cloud Run 作业:CloudRunCreateJobOperator
create1 = CloudRunCreateJobOperator(
task_id=create1_task_name,
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
job=_create_job_instance(),
dag=dag,
)
请注意,此操作符仅创建作业而不执行它。作业的字典表示形式被推送到 XCom。
创建服务¶
在 Cloud Run 中创建服务之前,您需要定义它。有关 Service 对象字段的更多信息,请访问 Google Cloud Run Service 描述
一个简单的服务配置如下所示
def _create_service():
service = Service()
container = k8s_min.Container()
container.image = "us-docker.pkg.dev/cloudrun/container/placeholder:latest"
service.template.containers.append(container)
return service
使用此配置,我们可以创建服务:CloudRunCreateServiceOperator
create_cloud_run_service = CloudRunCreateServiceOperator(
task_id="create-cloud-run-service",
project_id=PROJECT_ID,
region="us-central1",
service=_create_service(),
service_name="cloudrun-system-test-service",
)
请注意,此操作符仅创建服务而不执行它。服务的字典表示形式被推送到 XCom。
删除服务¶
使用此配置,我们可以删除服务:CloudRunDeleteServiceOperator
delete_cloud_run_service = CloudRunDeleteServiceOperator(
task_id="delete-cloud-run-service",
project_id=PROJECT_ID,
region="us-central1",
service_name="cloudrun-system-test-service",
dag=dag,
)
请注意,此操作符会等待服务被删除,并且已删除的服务的字典表示形式会被推送到 XCom。
执行作业¶
要执行作业,可以使用
execute1 = CloudRunExecuteJobOperator(
task_id=execute1_task_name,
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
dag=dag,
deferrable=False,
)
或者,您可以在可延迟模式下定义相同的操作符
execute2 = CloudRunExecuteJobOperator(
task_id=execute2_task_name,
project_id=PROJECT_ID,
region=region,
job_name=job2_name,
dag=dag,
deferrable=True,
)
您还可以指定覆盖,以便为作业提供新的入口点命令等
overrides = {
"container_overrides": [
{
"name": "job",
"args": ["python", "main.py"],
"env": [{"name": "ENV_VAR", "value": "value"}],
"clear_args": False,
}
],
"task_count": 1,
"timeout": "60s",
}
execute3 = CloudRunExecuteJobOperator(
task_id=execute3_task_name,
project_id=PROJECT_ID,
region=region,
overrides=overrides,
job_name=job3_name,
dag=dag,
deferrable=False,
)
更新作业¶
要更新作业,可以使用
update_job1 = CloudRunUpdateJobOperator(
task_id=update_job1_task_name,
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
job=_create_job_instance_with_label(),
dag=dag,
)
作业的字典表示形式被推送到 XCom。
列出作业¶
要列出作业,可以使用
list_jobs = CloudRunListJobsOperator(
task_id=list_jobs_task_name, project_id=PROJECT_ID, region=region, dag=dag
)
该操作符接受两个可选参数:“limit” 用于限制返回的任务数,“show_deleted” 用于在结果中包含已删除的作业。
删除作业¶
要删除作业,可以使用
delete_job1 = CloudRunDeleteJobOperator(
task_id="delete-job1",
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
dag=dag,
trigger_rule=TriggerRule.ALL_DONE,
)
请注意,此操作符会等待作业被删除,并且已删除的作业的字典表示形式会被推送到 XCom。