DatabricksSubmitRunOperator

使用 DatabricksSubmitRunOperator 通过 Databricks api/2.1/jobs/runs/submit API 端点提交新的 Databricks 作业。

使用操作符

有三种方法可以实例化此操作符。在第一种方法中,你可以获取通常用于调用 api/2.1/jobs/runs/submit 端点的 JSON 有效负载,并通过 json 参数直接将其传递给我们的 DatabricksSubmitRunOperator。通过这种方法,你可以完全控制 Jobs REST API 的底层有效负载,包括执行具有多个任务的 Databricks 作业,但由于缺少类型检查,因此更难检测错误。

json = {
    "new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
    "notebook_task": {
        "notebook_path": "/Users/[email protected]/PrepareData",
    },
}
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", json=json)

完成相同操作的第二种方法是直接使用 DatabricksSubmitRunOperator 的命名参数。请注意,runs/submit 端点中的每个顶级参数都正好有一个命名参数。使用命名参数时,您必须指定以下内容

  • 任务规范 - 它应该是以下之一

    • spark_jar_task - JAR 任务的主类和参数

    • notebook_task - 任务的笔记本路径和参数

    • spark_python_task - 使用 Python 文件路径和参数运行 Python 文件

    • spark_submit_task - 运行 spark-submit 命令所需的参数

    • pipeline_task - 运行 Delta Live Tables 管道所需的参数

    • dbt_task - 运行 dbt 项目所需的参数

  • 集群规范 - 它应该是以下之一:* new_cluster - 将在此任务上运行的新集群的规范 * existing_cluster_id - 在其上运行此任务的现有集群的 ID

  • pipeline_task - 可以引用 pipeline_idpipeline_name

在同时提供 json 参数命名参数的情况下,它们将合并在一起。如果在合并过程中发生冲突,则命名参数将优先并覆盖顶级 json 键。

目前 DatabricksSubmitRunOperator 支持的命名参数是
  • spark_jar_task

  • notebook_task

  • spark_python_task

  • spark_submit_task

  • pipeline_task

  • dbt_task

  • git_source

  • new_cluster

  • existing_cluster_id

  • 运行名称

  • 超时秒数

new_cluster = {"spark_version": "10.1.x-scala2.12", "num_workers": 2}
notebook_task = {
    "notebook_path": "/Users/[email protected]/PrepareData",
}
notebook_run = DatabricksSubmitRunOperator(
    task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)

另一种方法是使用 param 任务将对象数组传递给此运算符进行实例化。此处用于调用 api/2.1/jobs/runs/submit 端点的 tasks param 的值通过 tasks param 在 DatabricksSubmitRunOperator 中传递。你可以传递任务数组并提交一次性运行,而不是调用单个任务。

tasks = [
    {
        "new_cluster": {"spark_version": "2.1.0-db3-scala2.11", "num_workers": 2},
        "notebook_task": {"notebook_path": "/Users/[email protected]/PrepareData"},
    }
]
notebook_run = DatabricksSubmitRunOperator(task_id="notebook_run", tasks=tasks)

示例

将参数指定为 JSON

DatabricksSubmitRunOperator 的示例用法如下

tests/system/providers/databricks/example_databricks.py[源代码]

    # Example of using the JSON parameter to initialize the operator.
    new_cluster = {
        "spark_version": "9.1.x-scala2.12",
        "node_type_id": "r3.xlarge",
        "aws_attributes": {"availability": "ON_DEMAND"},
        "num_workers": 8,
    }

    notebook_task_params = {
        "new_cluster": new_cluster,
        "notebook_task": {
            "notebook_path": "/Users/[email protected]/PrepareData",
        },
    }

    notebook_task = DatabricksSubmitRunOperator(task_id="notebook_task", json=notebook_task_params)

使用命名参数

你还可以使用命名参数来初始化运算符并运行作业。

tests/system/providers/databricks/example_databricks.py[源代码]

    # Example of using the named parameters of DatabricksSubmitRunOperator
    # to initialize the operator.
    spark_jar_task = DatabricksSubmitRunOperator(
        task_id="spark_jar_task",
        new_cluster=new_cluster,
        spark_jar_task={"main_class_name": "com.example.ProcessData"},
        libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
    )

DatabricksSubmitRunDeferrableOperator

DatabricksSubmitRunOperator 运算符的可延迟版本。

它允许使用 Airflow 2.2.0 中引入的新功能 更有效地利用 Airflow worker。

此条目是否有用?