DatabricksWorkflowTaskGroup¶
使用 DatabricksWorkflowTaskGroup
启动和监控作为 Airflow 任务的 Databricks notebook 作业运行。该任务组启动一个 Databricks Workflow 并从中运行 notebook 作业,与在 DatabricksWorkflowTaskGroup
之外执行 DatabricksNotebookOperator
相比,可降低 75% 的成本(通用计算为 0.40 美元/DBU,作业计算为 0.07 美元/DBU)。
在 Airflow 中定义 Databricks Workflow 有以下几个优点
创作界面 |
通过 Databricks (基于 Web 的 Databricks UI) |
通过 Airflow (使用 Airflow DAG 的代码) |
---|---|---|
Workflow 计算定价 |
✅ |
✅ |
源代码控制中的 Notebook 代码 |
✅ |
✅ |
源代码控制中的 Workflow 结构 |
✅ |
✅ |
从头重试 |
✅ |
✅ |
重试单个任务 |
✅ |
✅ |
Workflow 中的任务组 |
✅ |
|
从其他 DAG 触发工作流 |
✅ |
|
工作流级别参数 |
✅ |
示例¶
带有 DatabricksWorkflowTaskGroup 的 DAG 示例¶
task_group = DatabricksWorkflowTaskGroup(
group_id=f"test_workflow_{USER}_{GROUP_ID}",
databricks_conn_id=DATABRICKS_CONN_ID,
job_clusters=job_cluster_spec,
notebook_params={"ts": "{{ ts }}"},
notebook_packages=[
{
"pypi": {
"package": "simplejson==3.18.0", # Pin specification version of a package like this.
"repo": "https://pypi.ac.cn/simple", # You can specify your required Pypi index here.
}
},
],
extra_job_params={
"email_notifications": {
"on_start": [DATABRICKS_NOTIFICATION_EMAIL],
},
},
)
with task_group:
notebook_1 = DatabricksNotebookOperator(
task_id="workflow_notebook_1",
databricks_conn_id=DATABRICKS_CONN_ID,
notebook_path="/Shared/Notebook_1",
notebook_packages=[{"pypi": {"package": "Faker"}}],
source="WORKSPACE",
job_cluster_key="Shared_job_cluster",
execution_timeout=timedelta(seconds=600),
)
notebook_2 = DatabricksNotebookOperator(
task_id="workflow_notebook_2",
databricks_conn_id=DATABRICKS_CONN_ID,
notebook_path="/Shared/Notebook_2",
source="WORKSPACE",
job_cluster_key="Shared_job_cluster",
notebook_params={"foo": "bar", "ds": "{{ ds }}"},
)
task_operator_nb_1 = DatabricksTaskOperator(
task_id="nb_1",
databricks_conn_id=DATABRICKS_CONN_ID,
job_cluster_key="Shared_job_cluster",
task_config={
"notebook_task": {
"notebook_path": "/Shared/Notebook_1",
"source": "WORKSPACE",
},
"libraries": [
{"pypi": {"package": "Faker"}},
],
},
)
sql_query = DatabricksTaskOperator(
task_id="sql_query",
databricks_conn_id=DATABRICKS_CONN_ID,
task_config={
"sql_task": {
"query": {
"query_id": QUERY_ID,
},
"warehouse_id": WAREHOUSE_ID,
}
},
)
notebook_1 >> notebook_2 >> task_operator_nb_1 >> sql_query
在此示例中,Airflow 将生成一个名为 <dag_name>.test_workflow_<USER>_<GROUP_ID>
的作业,该作业将运行任务 notebook_1
,然后运行 notebook_2
。如果该作业在 databricks 工作区中不存在,则将创建该作业。如果该作业已存在,则将更新该作业以匹配 DAG 中定义的工作流。
下图显示了 Airflow UI 中生成的 Databricks Workflow(基于上面提供的示例)¶
下面描绘了在 Databricks UI 中,由 Airflow DAG 触发的运行所对应的 Databricks Workflow¶
为了最大限度地减少更新冲突,我们建议您尽可能将参数保留在 DatabricksWorkflowTaskGroup
的 notebook_params
中,而不是 DatabricksNotebookOperator
中。这是因为,在作业触发时会传入 DatabricksWorkflowTaskGroup
中的任务,并且不会修改作业定义。