Google Cloud Batch 运算符

Cloud Batch 是一项完全托管的批处理服务,用于在 Google 的基础设施上计划、排队和执行批处理作业。

有关该服务的更多信息,请访问 Google Cloud Batch 文档

先决条件任务

要使用这些运算符,您必须执行以下几项操作

提交作业

在 Cloud Batch 中提交作业之前,您需要定义该作业。有关作业对象字段的更多信息,请访问 Google Cloud Batch 作业描述

简单的作业配置如下所示

tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py[源代码]

def _create_job():
    runnable = batch_v1.Runnable()
    runnable.container = batch_v1.Runnable.Container()
    runnable.container.image_uri = "gcr.io/google-containers/busybox"
    runnable.container.entrypoint = "/bin/sh"
    runnable.container.commands = [
        "-c",
        "echo Hello world! This is task ${BATCH_TASK_INDEX}.\
          This job has a total of ${BATCH_TASK_COUNT} tasks.",
    ]

    task = batch_v1.TaskSpec()
    task.runnables = [runnable]

    resources = batch_v1.ComputeResource()
    resources.cpu_milli = 2000
    resources.memory_mib = 16
    task.compute_resource = resources
    task.max_retry_count = 2

    group = batch_v1.TaskGroup()
    group.task_count = 2
    group.task_spec = task
    policy = batch_v1.AllocationPolicy.InstancePolicy()
    policy.machine_type = "e2-standard-4"
    instances = batch_v1.AllocationPolicy.InstancePolicyOrTemplate()
    instances.policy = policy
    allocation_policy = batch_v1.AllocationPolicy()
    allocation_policy.instances = [instances]

    job = batch_v1.Job()
    job.task_groups = [group]
    job.allocation_policy = allocation_policy
    job.labels = {"env": "testing", "type": "container"}

    job.logs_policy = batch_v1.LogsPolicy()
    job.logs_policy.destination = batch_v1.LogsPolicy.Destination.CLOUD_LOGGING

    return job


使用此配置,我们可以提交作业: CloudBatchSubmitJobOperator

tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py[源代码]

submit1 = CloudBatchSubmitJobOperator(
    task_id=submit1_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    job=_create_job(),
    dag=dag,
    deferrable=False,
)

或者,您可以在可延迟模式中定义相同的操作符:CloudBatchSubmitJobOperator

tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py[源代码]

submit2 = CloudBatchSubmitJobOperator(
    task_id=submit2_task_name,
    project_id=PROJECT_ID,
    region=region,
    job_name=job2_name,
    job=batch_v1.Job.to_dict(_create_job()),
    dag=dag,
    deferrable=True,
)

请注意,此操作符会等待作业完成执行,并且作业的字典表示会推送到 XCom。

列出作业的任务

要列出某个作业的任务,您可以使用

CloudBatchListTasksOperator

tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py[源代码]

list_tasks = CloudBatchListTasksOperator(
    task_id=list_tasks_task_name, project_id=PROJECT_ID, region=region, job_name=job1_name, dag=dag
)

该操作符采用两个可选参数:“limit”以限制返回的任务数量,以及“filter”以仅列出与过滤器匹配的任务。

列出作业

要列出作业,您可以使用

CloudBatchListJobsOperator

tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py[源代码]

list_jobs = CloudBatchListJobsOperator(
    task_id=list_jobs_task_name,
    project_id=PROJECT_ID,
    region=region,
    limit=2,
    filter=f"name:projects/{PROJECT_ID}/locations/{region}/jobs/{job_name_prefix}*",
    dag=dag,
)

该操作符采用两个可选参数:“limit”以限制返回的任务数量,以及“filter”以仅列出与过滤器匹配的任务。

删除作业

要删除作业,您可以使用

CloudBatchDeleteJobOperator

tests/system/providers/google/cloud/cloud_batch/example_cloud_batch.py[源代码]

delete_job1 = CloudBatchDeleteJobOperator(
    task_id="delete-job1",
    project_id=PROJECT_ID,
    region=region,
    job_name=job1_name,
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,
)

请注意,此操作符会等待作业被删除,并且已删除作业的字典表示会推送到 XCom。

此条目是否有用?