DatabricksNotebookOperator

Use the DatabricksNotebookOperator to launch and monitor a notebook job run on Databricks as an Airflow task.

Examples

Running a notebook in Databricks on a new cluster

tests/system/providers/databricks/example_databricks.py[source]

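    from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator

    # Cluster specification passed to the operator below as new_cluster.
    new_cluster_spec = {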
        "cluster_name": "",
        "spark_version": "11.3.x-scala2.12",
        "aws_attributes": {
            "first_on_demand": 1,
            "availability": "SPOT_WITH_FALLBACK",
            "zone_id": "us-east-2b",
            "spot_bid_price_percent": 100,
            "ebs_volume_count": 0,
        },
        "node_type_id": "i3.xlarge",
        "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
        "enable_elastic_disk": False,
        "data_security_mode": "LEGACY_SINGLE_USER_STANDARD",
        "runtime_engine": "STANDARD",
        "num_workers": 8,
    }

    notebook_1 = DatabricksNotebookOperator(
        task_id="notebook_1",
        notebook_path="/Shared/Notebook_1",
        notebook_packages=[
            {
                "pypi": {
                    "package": "simplejson==3.18.0",
                    "repo": "https://pypi.ac.cn/simple",
                }
            },
            {"pypi": {"package": "Faker"}},
        ],
        source="WORKSPACE",
        new_cluster=new_cluster_spec,
    )
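The notebook_packages argument above is a list of dictionaries, each with a "pypi" key holding a "package" string and an optional "repo" URL. When a DAG needs many such entries, a small builder can keep them consistent. The following is a minimal sketch; pypi_package is a hypothetical helper introduced for illustration and is not part of the Databricks provider.

```python
from typing import Optional


def pypi_package(package: str, repo: Optional[str] = None) -> dict:
    """Build one library entry in the shape used by notebook_packages.

    pypi_package is a hypothetical helper for illustration only; it is
    not part of the Databricks provider.
    """
    spec = {"package": package}
    # Only include "repo" when a custom index is requested, mirroring
    # the second entry in the example above, which omits it.
    if repo is not None:
        spec["repo"] = repo
    return {"pypi": spec}


# Reproduces the list passed to notebook_1 in the example above.
notebook_packages = [
    pypi_package("simplejson==3.18.0", repo="https://pypi.ac.cn/simple"),
    pypi_package("Faker"),
]
```

The resulting list can be passed directly as notebook_packages=notebook_packages when constructing the operator.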

Running a notebook in Databricks on an existing cluster

tests/system/providers/databricks/example_databricks.py[source]

    notebook_2 = DatabricksNotebookOperator(
        task_id="notebook_2",
        notebook_path="/Shared/Notebook_2",
        notebook_packages=[
            {
                "pypi": {
                    "package": "simplejson==3.18.0",
                    "repo": "https://pypi.ac.cn/simple",
                }
            },
        ],
        source="WORKSPACE",
        existing_cluster_id="existing_cluster_id",
    )
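The two examples differ only in how the cluster is specified: one passes new_cluster, the other existing_cluster_id. If the choice is driven by configuration, the calling code can select the keyword argument in one place. The sketch below assumes the DAG author wants exactly one of the two to be set; cluster_kwargs is a hypothetical helper, and the check it performs is a convention for the calling code, not a description of the operator's own validation.

```python
from typing import Any, Dict, Optional


def cluster_kwargs(
    existing_cluster_id: Optional[str] = None,
    new_cluster: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Return the single cluster keyword argument to pass to the operator.

    Hypothetical helper: it enforces, on the caller's side, that exactly
    one of the two cluster options is supplied.
    """
    # True when both are missing or both are given; either case is ambiguous.
    if (existing_cluster_id is None) == (new_cluster is None):
        raise ValueError("Pass exactly one of existing_cluster_id or new_cluster")
    if existing_cluster_id is not None:
        return {"existing_cluster_id": existing_cluster_id}
    return {"new_cluster": new_cluster}
```

The returned dictionary can then be unpacked into the operator call, for example DatabricksNotebookOperator(task_id="notebook_2", notebook_path="/Shared/Notebook_2", source="WORKSPACE", **cluster_kwargs(existing_cluster_id="existing_cluster_id")).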
