DatabricksSubmitRunOperator¶
使用 DatabricksSubmitRunOperator
通过 Databricks api/2.1/jobs/runs/submit API 端点提交新的 Databricks 作业。
使用操作符¶
有三种方法可以实例化此操作符。在第一种方法中,你可以获取通常用于调用 api/2.1/jobs/runs/submit
端点的 JSON 有效负载,并通过 json
参数直接将其传递给我们的 DatabricksSubmitRunOperator
。通过这种方法,你可以完全控制 Jobs REST API 的底层有效负载,包括执行具有多个任务的 Databricks 作业,但由于缺少类型检查,因此更难检测错误。
json = {
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {
"notebook_path": "/Users/[email protected]/PrepareData",
},
}
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", json=json)
完成相同操作的第二种方法是直接使用 DatabricksSubmitRunOperator
的命名参数。请注意,runs/submit
端点中的每个顶级参数都正好有一个命名参数。使用命名参数时,您必须指定以下内容
任务规范 - 它应该是以下之一
spark_jar_task
- JAR 任务的主类和参数notebook_task
- 任务的笔记本路径和参数spark_python_task
- 使用 Python 文件路径和参数运行 Python 文件spark_submit_task
- 运行spark-submit
命令所需的参数pipeline_task
- 运行 Delta Live Tables 管道所需的参数dbt_task
- 运行 dbt 项目所需的参数
集群规范 - 它应该是以下之一:*
new_cluster
- 将在此任务上运行的新集群的规范 *existing_cluster_id
- 在其上运行此任务的现有集群的 IDpipeline_task
- 可以引用pipeline_id
或pipeline_name
在同时提供 json 参数和命名参数的情况下,它们将合并在一起。如果在合并过程中发生冲突,则命名参数将优先并覆盖顶级 json
键。
- 目前
DatabricksSubmitRunOperator
支持的命名参数是 spark_jar_task
notebook_task
spark_python_task
spark_submit_task
pipeline_task
dbt_task
git_source
new_cluster
existing_cluster_id
库
运行名称
超时秒数
new_cluster = {"spark_version": "10.1.x-scala2.12", "num_workers": 2}
notebook_task = {
"notebook_path": "/Users/[email protected]/PrepareData",
}
notebook_run = DatabricksSubmitRunOperator(
task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)
另一种方法是使用 param 任务将对象数组传递给此运算符进行实例化。此处用于调用 api/2.1/jobs/runs/submit
端点的 tasks param 的值通过 tasks
param 在 DatabricksSubmitRunOperator
中传递。你可以传递任务数组并提交一次性运行,而不是调用单个任务。
tasks = [
{
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {"notebook_path": "/Users/[email protected]/PrepareData"},
}
]
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", tasks=tasks)
示例¶
将参数指定为 JSON¶
DatabricksSubmitRunOperator 的示例用法如下
# Example of using the JSON parameter to initialize the operator.
new_cluster = {
"spark_version": "9.1.x-scala2.12",
"node_type_id": "r3.xlarge",
"aws_attributes": {"availability": "ON_DEMAND"},
"num_workers": 8,
}
notebook_task_params = {
"new_cluster": new_cluster,
"notebook_task": {
"notebook_path": "/Users/[email protected]/PrepareData",
},
}
notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)
使用命名参数¶
你还可以使用命名参数来初始化运算符并运行作业。
# Example of using the named parameters of DatabricksSubmitRunOperator
# to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
task_id="spark_jar_task",
new_cluster=new_cluster,
spark_jar_task={"main_class_name": "com.example.ProcessData"},
libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
)
DatabricksSubmitRunDeferrableOperator¶
DatabricksSubmitRunOperator
运算符的可延迟版本。
它允许使用 Airflow 2.2.0 中引入的新功能 更有效地利用 Airflow worker。