DatabricksWorkflowTaskGroup

使用 DatabricksWorkflowTaskGroup 将 Databricks 笔记本作业运行作为 Airflow 任务启动并监视。任务组启动 Databricks 工作流,并在其中运行笔记本作业,与在 DatabricksWorkflowTaskGroup 外部执行 DatabricksNotebookOperator 相比,可降低 75% 的成本(通用计算为 0.40 美元/DBU,作业计算为 0.07 美元/DBU)。

在 Airflow 中定义 Databricks 工作流有一些优势

创作界面

通过 Databricks(基于 Web 的 Databricks UI)

通过 Airflow(使用 Airflow DAG 编写代码)

工作流计算定价

源代码控制中的笔记本代码

源代码控制中的工作流结构

从头开始重试

重试单个任务

工作流中的任务组

从其他 DAG 触发工作流

工作流级参数

示例

包含 DatabricksWorkflowTaskGroup 的 DAG 示例

tests/system/providers/databricks/example_databricks_workflow.py[源代码]

    task_group = DatabricksWorkflowTaskGroup(
        group_id=f"test_workflow_{USER}_{GROUP_ID}",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_clusters=job_cluster_spec,
        notebook_params={"ts": "{{ ts }}"},
        notebook_packages=[
            {
                "pypi": {
                    "package": "simplejson==3.18.0",  # Pin specification version of a package like this.
                    "repo": "https://pypi.ac.cn/simple",  # You can specify your required Pypi index here.
                }
            },
        ],
        extra_job_params={
            "email_notifications": {
                "on_start": [DATABRICKS_NOTIFICATION_EMAIL],
            },
        },
    )
    with task_group:
        notebook_1 = DatabricksNotebookOperator(
            task_id="workflow_notebook_1",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_1",
            notebook_packages=[{"pypi": {"package": "Faker"}}],
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            execution_timeout=timedelta(seconds=600),
        )

        notebook_2 = DatabricksNotebookOperator(
            task_id="workflow_notebook_2",
            databricks_conn_id=DATABRICKS_CONN_ID,
            notebook_path="/Shared/Notebook_2",
            source="WORKSPACE",
            job_cluster_key="Shared_job_cluster",
            notebook_params={"foo": "bar", "ds": "{{ ds }}"},
        )

        task_operator_nb_1 = DatabricksTaskOperator(
            task_id="nb_1",
            databricks_conn_id="databricks_conn",
            job_cluster_key="Shared_job_cluster",
            task_config={
                "notebook_task": {
                    "notebook_path": "/Shared/Notebook_1",
                    "source": "WORKSPACE",
                },
                "libraries": [
                    {"pypi": {"package": "Faker"}},
                    {"pypi": {"package": "simplejson"}},
                ],
            },
        )

        sql_query = DatabricksTaskOperator(
            task_id="sql_query",
            databricks_conn_id="databricks_conn",
            task_config={
                "sql_task": {
                    "query": {
                        "query_id": QUERY_ID,
                    },
                    "warehouse_id": WAREHOUSE_ID,
                }
            },
        )

        notebook_1 >> notebook_2 >> task_operator_nb_1 >> sql_query

使用此示例,Airflow 将生成一个名为 <dag_name>.test_workflow_<USER>_<GROUP_ID> 的作业,该作业将运行任务 notebook_1,然后运行 notebook_2。如果作业尚不存在,则将在 Databricks 工作区中创建该作业。如果作业已存在,它将更新为与 DAG 中定义的工作流相匹配。

下图显示了基于上述示例在 Airflow UI 中生成的 Databricks 工作流

../_images/databricks_workflow_task_group_airflow_graph_view.png

下面描述了从 Airflow DAG 触发的运行在 Databricks UI 中的相应 Databricks 工作流

../_images/workflow_run_databricks_graph_view.png

为了最大程度地减少更新冲突,我们建议您尽可能将参数保留在 notebook_params 中,而不是保留在 DatabricksWorkflowTaskGroup 中的 DatabricksNotebookOperator 中。这是因为 DatabricksWorkflowTaskGroup 中的任务在作业触发时间传递,并且不会修改作业定义。

此条目是否有用?