Google Cloud Batch 运算符¶
Cloud Batch 是一项完全托管的批处理服务,用于在 Google 的基础设施上计划、排队和执行批处理作业。
有关该服务的更多信息,请访问 Google Cloud Batch 文档。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请访问。
提交作业¶
在 Cloud Batch 中提交作业之前,您需要定义该作业。有关作业对象字段的更多信息,请访问 Google Cloud Batch 作业描述。
简单的作业配置如下所示
def _create_job():
runnable = batch_v1.Runnable()
runnable.container = batch_v1.Runnable.Container()
runnable.container.image_uri = "gcr.io/google-containers/busybox"
runnable.container.entrypoint = "/bin/sh"
runnable.container.commands = [
"-c",
"echo Hello world! This is task ${BATCH_TASK_INDEX}.\
This job has a total of ${BATCH_TASK_COUNT} tasks.",
]
task = batch_v1.TaskSpec()
task.runnables = [runnable]
resources = batch_v1.ComputeResource()
resources.cpu_milli = 2000
resources.memory_mib = 16
task.compute_resource = resources
task.max_retry_count = 2
group = batch_v1.TaskGroup()
group.task_count = 2
group.task_spec = task
policy = batch_v1.AllocationPolicy.InstancePolicy()
policy.machine_type = "e2-standard-4"
instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
instances.policy = policy
allocation_policy = batch_v1.AllocationPolicy()
allocation_policy.instances = [instances]
job = batch_v1.Job()
job.task_groups = [group]
job.allocation_policy = allocation_policy
job.labels = {"env": "testing", "type": "container"}
job.logs_policy = batch_v1.LogsPolicy()
job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING
return job
使用此配置,我们可以提交作业: CloudBatchSubmitJobOperator
submit1 = CloudBatchSubmitJobOperator(
task_id=submit1_task_name,
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
job=_create_job(),
dag=dag,
deferrable=False,
)
或者,您可以在可延迟模式中定义相同的操作符:CloudBatchSubmitJobOperator
submit2 = CloudBatchSubmitJobOperator(
task_id=submit2_task_name,
project_id=PROJECT_ID,
region=region,
job_name=job2_name,
job=batch_v1.Job.to_dict(_create_job()),
dag=dag,
deferrable=True,
)
请注意,此操作符会等待作业完成执行,并且作业的字典表示会推送到 XCom。
列出作业的任务¶
要列出某个作业的任务,您可以使用
list_tasks = CloudBatchListTasksOperator(
task_id=list_tasks_task_name, project_id=PROJECT_ID, region=region, job_name=job1_name, dag=dag
)
该操作符采用两个可选参数:“limit”以限制返回的任务数量,以及“filter”以仅列出与过滤器匹配的任务。
列出作业¶
要列出作业,您可以使用
list_jobs = CloudBatchListJobsOperator(
task_id=list_jobs_task_name,
project_id=PROJECT_ID,
region=region,
limit=2,
filter=f"name:projects/{PROJECT_ID}/locations/{region}/jobs/{job_name_prefix}*",
dag=dag,
)
该操作符采用两个可选参数:“limit”以限制返回的任务数量,以及“filter”以仅列出与过滤器匹配的任务。
删除作业¶
要删除作业,您可以使用
delete_job1 = CloudBatchDeleteJobOperator(
task_id="delete-job1",
project_id=PROJECT_ID,
region=region,
job_name=job1_name,
dag=dag,
trigger_rule=TriggerRule.ALL_DONE,
)
请注意,此操作符会等待作业被删除,并且已删除作业的字典表示会推送到 XCom。