DatabricksCreateJobsOperator

使用 DatabricksCreateJobsOperator 创建(或重置)Databricks 作业。此操作符依赖于过去的 XCom 来记住创建的 job_id,以便使用此操作符进行的重复调用将更新现有作业,而不是创建新作业。与 DatabricksRunNowOperator 配对时,所有运行都将在 Databricks UI 中的同一作业下进行。

使用操作符

有三种方法可以实例化此操作符。在第一种方法中,你可以获取通常用于调用 api/2.1/jobs/create 端点的 JSON 有效负载,并通过 json 参数将其直接传递给我们的 DatabricksCreateJobsOperator。通过这种方法,你可以完全控制作业 REST API 的底层有效负载,包括执行具有多个任务的 Databricks 作业,但由于缺少类型检查,因此更难检测错误。

完成相同操作的第二种方法是直接使用 DatabricksCreateJobsOperator 的命名参数。请注意,api/2.1/jobs/create 端点中每个顶级参数恰好有一个命名参数。

第三种方法是同时使用 json 参数命名参数。它们将合并在一起。如果合并期间发生冲突,则命名参数将优先并覆盖顶级 json 键。

目前 DatabricksCreateJobsOperator 支持的命名参数是
  • name

  • description

  • tags

  • tasks

  • job_clusters

  • email_notifications

  • webhook_notifications

  • notification_settings

  • timeout_seconds

  • schedule

  • max_concurrent_runs

  • git_source

  • access_control_list

示例

将参数指定为 JSON

DatabricksCreateJobsOperator 的示例用法如下

tests/system/providers/databricks/example_databricks.py[源代码]

    # Example of using the JSON parameter to initialize the operator.
    job = {
        "tasks": [
            {
                "task_key": "test",
                "job_cluster_key": "job_cluster",
                "notebook_task": {
                    "notebook_path": "/Shared/test",
                },
            },
        ],
        "job_clusters": [
            {
                "job_cluster_key": "job_cluster",
                "new_cluster": {
                    "spark_version": "7.3.x-scala2.12",
                    "node_type_id": "i3.xlarge",
                    "num_workers": 2,
                },
            },
        ],
    }

    jobs_create_json = DatabricksCreateJobsOperator(task_id="jobs_create_json", json=job)

使用命名参数

你还可以使用命名参数来初始化操作符并运行作业。

tests/system/providers/databricks/example_databricks.py[源代码]

    # Example of using the named parameters to initialize the operator.
    tasks = [
        {
            "task_key": "test",
            "job_cluster_key": "job_cluster",
            "notebook_task": {
                "notebook_path": "/Shared/test",
            },
        },
    ]
    job_clusters = [
        {
            "job_cluster_key": "job_cluster",
            "new_cluster": {
                "spark_version": "7.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
        },
    ]

    jobs_create_named = DatabricksCreateJobsOperator(
        task_id="jobs_create_named", tasks=tasks, job_clusters=job_clusters
    )

与 DatabricksRunNowOperator 配合使用

你可以将 DatabricksCreateJobsOperator 在 return_value XCom 中返回的 job_id 作为参数用于 DatabricksRunNowOperator 来运行作业。

tests/system/providers/databricks/example_databricks.py[源代码]

    # Example of using the DatabricksRunNowOperator after creating a job with DatabricksCreateJobsOperator.
    run_now = DatabricksRunNowOperator(
        task_id="run_now", job_id="{{ ti.xcom_pull(task_ids='jobs_create_named') }}"
    )

    jobs_create_named >> run_now

此条目有帮助吗?