Google Cloud Tasks¶
Cloud Tasks 是一项完全托管的服务,可让你管理大量分布式任务的执行、调度和交付。使用 Cloud Tasks,你可以在用户或服务到服务请求之外异步执行工作。
有关该服务的更多信息,请访问 Cloud Tasks 产品文档
先决任务¶
要使用这些操作符,你必须执行以下操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为你的项目启用结算,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请访问 安装。
队列操作¶
创建队列¶
要创建新队列,请使用 CloudTasksQueueCreateOperator
create_queue = CloudTasksQueueCreateOperator(
location=LOCATION,
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=0.5)),
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_queue",
)
删除队列¶
要删除队列,请使用 CloudTasksQueueDeleteOperator
delete_queue = CloudTasksQueueDeleteOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="delete_queue",
)
恢复队列¶
要恢复队列,请使用 CloudTasksQueueResumeOperator
resume_queue = CloudTasksQueueResumeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="resume_queue",
)
暂停队列¶
要暂停队列,请使用 CloudTasksQueuePauseOperator
pause_queue = CloudTasksQueuePauseOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="pause_queue",
)
清除队列¶
要清除队列,请使用 CloudTasksQueuePurgeOperator
purge_queue = CloudTasksQueuePurgeOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="purge_queue",
)
获取队列¶
要获取队列,请使用 CloudTasksQueueGetOperator
get_queue = CloudTasksQueueGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="get_queue",
)
get_queue_result = BashOperator(
task_id="get_queue_result",
bash_command=f"echo {get_queue.output}",
)
更新队列¶
要更新队列,请使用 CloudTasksQueueUpdateOperator
update_queue = CloudTasksQueueUpdateOperator(
task_queue=Queue(stackdriver_logging_config=dict(sampling_ratio=1)),
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
update_mask=FieldMask(paths=["stackdriver_logging_config.sampling_ratio"]),
task_id="update_queue",
)
列出队列¶
要列出所有队列,请使用 CloudTasksQueuesListOperator
list_queue = CloudTasksQueuesListOperator(location=LOCATION, task_id="list_queue")
任务操作¶
创建任务¶
要在特定队列中创建新任务,请使用 CloudTasksTaskCreateOperator
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)
获取任务¶
要获取特定队列中的任务,请使用 CloudTasksTaskGetOperator
tasks_get = CloudTasksTaskGetOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="tasks_get",
)
运行任务¶
要运行特定队列中的任务,请使用 CloudTasksTaskRunOperator
run_task = CloudTasksTaskRunOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
task_id="run_task",
)
列出任务¶
要列出特定队列中的所有任务,请使用 CloudTasksTasksListOperator
list_tasks = CloudTasksTasksListOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task_id="list_tasks",
)
删除任务¶
要从特定队列中删除任务,请使用 CloudTasksTaskDeleteOperator
create_task = CloudTasksTaskCreateOperator(
location=LOCATION,
queue_name=QUEUE_ID + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
task=TASK,
task_name=TASK_NAME + "{{ task_instance.xcom_pull(task_ids='random_string') }}",
retry=Retry(maximum=10.0),
timeout=5,
task_id="create_task_to_run",
)