Google Cloud Tasks

Cloud Tasks 是一项完全托管的服务,允许您管理大量分布式任务的执行、分派和交付。使用 Cloud Tasks,您可以在用户或服务到服务请求之外异步执行工作。

有关该服务的更多信息,请访问 Cloud Tasks 产品文档

先决条件任务

要使用这些操作符,您必须执行以下几个操作

队列操作

创建队列

要创建新队列,请使用 CloudTasksQueueCreateOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

create_queue = CloudTasksQueueCreateOperator(
    location=LOCATION,
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_queue",
)

删除队列

要删除队列,请使用 CloudTasksQueueDeleteOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

delete_queue = CloudTasksQueueDeleteOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="delete_queue",
)

恢复队列

要恢复队列,请使用 CloudTasksQueueResumeOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

resume_queue = CloudTasksQueueResumeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="resume_queue",
)

暂停队列

要暂停队列,请使用 CloudTasksQueuePauseOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

pause_queue = CloudTasksQueuePauseOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="pause_queue",
)

清除队列

要清除队列,请使用 CloudTasksQueuePurgeOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

purge_queue = CloudTasksQueuePurgeOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="purge_queue",
)

获取队列

要获取队列,请使用 CloudTasksQueueGetOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

get_queue = CloudTasksQueueGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="get_queue",
)

get_queue_result = BashOperator(
    task_id="get_queue_result",
    bash_command=f"echo {get_queue.output}",
)

更新队列

要更新队列,请使用 CloudTasksQueueUpdateOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

update_queue = CloudTasksQueueUpdateOperator(
    task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
    task_id="update_queue",
)

列出队列

要列出所有队列,请使用 CloudTasksQueuesListOperator

tests/system/google/cloud/tasks/example_queue.py[源代码]

list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")

任务操作

创建任务

要在特定队列中创建新任务,请使用 CloudTasksTaskCreateOperator

tests/system/google/cloud/tasks/example_tasks.py[源代码]

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

获取任务

要获取特定队列中的任务,请使用 CloudTasksTaskGetOperator

tests/system/google/cloud/tasks/example_tasks.py[源代码]

tasks_get = CloudTasksTaskGetOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="tasks_get",
)

运行任务

要运行特定队列中的任务,请使用 CloudTasksTaskRunOperator

tests/system/google/cloud/tasks/example_tasks.py[源代码]

run_task = CloudTasksTaskRunOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    task_id="run_task",
)

列出任务

要列出特定队列中的所有任务,请使用 CloudTasksTasksListOperator

tests/system/google/cloud/tasks/example_tasks.py[源代码]

list_tasks = CloudTasksTasksListOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task_id="list_tasks",
)

删除任务

要从特定队列中删除任务,请使用 CloudTasksTaskDeleteOperator

tests/system/google/cloud/tasks/example_tasks.py[源代码]

create_task = CloudTasksTaskCreateOperator(
    location=LOCATION,
    queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    task=TASK,
    task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
    retry=Retry(maximum=10.0),
    timeout=5,
    task_id="create_task_to_run",
)

参考

有关更多信息,请查看

此条目是否有帮助?