Google Cloud Workflows 运算符

你可以使用 Workflows 创建无服务器工作流,按你定义的顺序将一系列无服务器任务链接在一起。结合 Google Cloud 的 API、Cloud Functions 和 Cloud Run 等无服务器产品以及对外部 API 的调用,创建灵活的无服务器应用程序。

有关此服务的更多信息,请访问 Workflows 产品文档 <产品文档

先决条件任务

要使用这些运算符,你必须执行以下操作

创建工作流

要创建工作流,请使用 WorkflowsCreateWorkflowOperator

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

create_workflow = WorkflowsCreateWorkflowOperator(
    task_id="create_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow=WORKFLOW,
    workflow_id=WORKFLOW_ID,
)

工作流应以类似于此示例的方式定义

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

WORKFLOW_CONTENT = """
- getLanguage:
    assign:
        - inputLanguage: "English"
- readWikipedia:
    call: http.get
    args:
        url: https://www.wikipedia.org/
        query:
            action: opensearch
            search: ${inputLanguage}
    result: wikiResult
- returnResult:
    return: ${wikiResult}
"""

WORKFLOW = {
    "description": "Test workflow",
    "labels": {"airflow-version": "dev"},
    "source_contents": WORKFLOW_CONTENT,
}

有关编写工作流的详细信息,请查看官方产品文档 <产品文档

更新工作流

要更新工作流,请使用 WorkflowsUpdateWorkflowOperator

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

update_workflow = WorkflowsUpdateWorkflowOperator(
    task_id="update_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    update_mask=FieldMask(paths=["name", "description"]),
)

获取工作流

要获取工作流,请使用 WorkflowsGetWorkflowOperator

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

get_workflow = WorkflowsGetWorkflowOperator(
    task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

列出工作流

要列出工作流,请使用 WorkflowsListWorkflowsOperator

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

list_workflows = WorkflowsListWorkflowsOperator(
    task_id="list_workflows",
    location=LOCATION,
    project_id=PROJECT_ID,
)

删除工作流

要删除工作流,请使用 WorkflowsDeleteWorkflowOperator

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

delete_workflow = WorkflowsDeleteWorkflowOperator(
    task_id="delete_workflow",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

创建执行

要创建执行,请使用 WorkflowsCreateExecutionOperator。由于 API 限制,此操作不是幂等的。

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

create_execution = WorkflowsCreateExecutionOperator(
    task_id="create_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    execution=EXECUTION,
    workflow_id=WORKFLOW_ID,
)

创建操作符不会等待执行完成。要等待执行结果,请使用 WorkflowExecutionSensor

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

wait_for_execution = WorkflowExecutionSensor(
    task_id="wait_for_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

获取执行

要获取执行,请使用 WorkflowsGetExecutionOperator

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

get_execution = WorkflowsGetExecutionOperator(
    task_id="get_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=WORKFLOW_ID,
    execution_id=create_execution_id,
)

列出执行

要列出执行,请使用 WorkflowsListExecutionsOperator。默认情况下,此操作符仅返回过去 60 分钟的执行。

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

list_executions = WorkflowsListExecutionsOperator(
    task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
)

取消执行

要取消执行,请使用 WorkflowsCancelExecutionOperator

tests/system/providers/google/cloud/workflows/example_workflows.py[源代码]

cancel_execution = WorkflowsCancelExecutionOperator(
    task_id="cancel_execution",
    location=LOCATION,
    project_id=PROJECT_ID,
    workflow_id=SLEEP_WORKFLOW_ID,
    execution_id=cancel_execution_id,
)

此条目是否有用?