DatabricksSubmitRunOperator¶
使用 DatabricksSubmitRunOperator
通过 Databricks api/2.1/jobs/runs/submit API 端点提交新的 Databricks 作业。
使用操作符¶
有三种方法可以实例化此操作符。第一种方法,您可以获取通常用于调用 api/2.1/jobs/runs/submit
端点的 JSON 有效负载,并通过 json
参数直接传递给我们的 DatabricksSubmitRunOperator
。使用此方法,您可以完全控制 Jobs REST API 的底层有效负载,包括执行具有多个任务的 Databricks 作业,但由于缺少类型检查,因此更难以检测错误。
json = {
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {
"notebook_path": "/Users/[email protected]/PrepareData",
},
}
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", json=json)
完成相同操作的第二种方法是直接使用 DatabricksSubmitRunOperator
的命名参数。请注意,runs/submit
端点中的每个顶级参数都有且只有一个命名参数。使用命名参数时,您必须指定以下内容
任务规范 - 它应该是以下之一
spark_jar_task
- JAR 任务的主类和参数notebook_task
- 任务的笔记本路径和参数spark_python_task
- python 文件路径和使用 python 文件运行的参数spark_submit_task
- 运行spark-submit
命令所需的参数pipeline_task
- 运行 Delta Live Tables 管道所需的参数dbt_task
- 运行 dbt 项目所需的参数
集群规范 - 它应该是以下之一:*
new_cluster
- 将在此任务上运行的新集群的规范 *existing_cluster_id
- 用于运行此任务的现有集群的 IDpipeline_task
- 可以引用pipeline_id
或pipeline_name
如果同时提供了 json 参数 **和** 命名参数,它们将被合并在一起。如果在合并期间出现冲突,则命名参数将优先并覆盖顶级的 json
键。
- 目前
DatabricksSubmitRunOperator
支持的命名参数是 spark_jar_task
notebook_task
spark_python_task
spark_submit_task
pipeline_task
dbt_task
git_source
new_cluster
existing_cluster_id
libraries
run_name
timeout_seconds
new_cluster = {"spark_version": "10.1.x-scala2.12", "num_workers": 2}
notebook_task = {
"notebook_path": "/Users/[email protected]/PrepareData",
}
notebook_run = DatabricksSubmitRunOperator(
task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)
另一种方法是使用 param tasks 将对象数组传递给实例化此操作符。这里,用于调用 api/2.1/jobs/runs/submit
端点的 tasks 参数的值通过 DatabricksSubmitRunOperator
中的 tasks
参数传递。您可以传递任务数组并提交一次性运行,而不是调用单个任务。
tasks = [
{
"new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
"notebook_task": {"notebook_path": "/Users/[email protected]/PrepareData"},
}
]
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", tasks=tasks)
示例¶
将参数指定为 JSON¶
DatabricksSubmitRunOperator 的用法示例如下
# Example of using the JSON parameter to initialize the operator.
new_cluster = {
"spark_version": "9.1.x-scala2.12",
"node_type_id": "r3.xlarge",
"aws_attributes": {"availability": "ON_DEMAND"},
"num_workers": 8,
}
notebook_task_params = {
"new_cluster": new_cluster,
"notebook_task": {
"notebook_path": "/Users/[email protected]/PrepareData",
},
}
notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)
使用命名参数¶
您还可以使用命名参数初始化操作符并运行作业。
# Example of using the named parameters of DatabricksSubmitRunOperator
# to initialize the operator.
spark_jar_task = DatabricksSubmitRunOperator(
task_id="spark_jar_task",
new_cluster=new_cluster,
spark_jar_task={"main_class_name": "com.example.ProcessData"},
libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
)
DatabricksSubmitRunDeferrableOperator¶
DatabricksSubmitRunOperator
操作符的可延迟版本。
它允许使用 Airflow 2.2.0 中引入的新功能 更有效地利用 Airflow 工作人员