DatabricksSqlOperator

使用 DatabricksSqlOperatorDatabricks SQL 仓库Databricks 集群 上执行 SQL。

使用操作符

操作符针对已配置的仓库执行给定的 SQL 查询。唯一必需的参数是

  • sql - 要执行的 SQL 查询。有 3 种指定 SQL 查询的方法

    1. 包含 SQL 语句的简单字符串。

    2. 表示 SQL 语句的字符串列表。

    3. 包含 SQL 查询的文件名。文件必须具有 .sql 扩展名。每个查询应以 ;<new_line> 结尾

  • sql_warehouse_name(要使用的 Databricks SQL 仓库的名称)或 http_path(Databricks SQL 仓库或 Databricks 集群的 HTTP 路径)之一。

其他参数是可选的,可以在类文档中找到。

示例

选择数据

以下是如何使用 DatabricksSqlOperator 从表中选择数据的示例

tests/system/providers/databricks/example_databricks_sql.py[源代码]

    # Example of using the Databricks SQL Operator to select data.
    select = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="select_data",
        sql="select * from default.my_airflow_table",
    )

将数据选择到文件中

以下是使用 DatabricksSqlOperator 从表中选择数据并存储到文件中的示例

tests/system/providers/databricks/example_databricks_sql.py[源代码]

    # Example of using the Databricks SQL Operator to select data into a file with JSONL format.
    select_into_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="select_data_into_file",
        sql="select * from default.my_airflow_table",
        output_path="/tmp/1.jsonl",
        output_format="jsonl",
    )

执行多条语句

以下是使用 DatabricksSqlOperator 执行多条 SQL 语句的示例

tests/system/providers/databricks/example_databricks_sql.py[源代码]

    # Example of using the Databricks SQL Operator to perform multiple operations.
    create = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="create_and_populate_table",
        sql=[
            "drop table if exists default.my_airflow_table",
            "create table default.my_airflow_table(id int, v string)",
            "insert into default.my_airflow_table values (1, 'test 1'), (2, 'test 2')",
        ],
    )

从文件中执行多条语句

以下是使用 DatabricksSqlOperator 从文件中执行语句的示例

tests/system/providers/databricks/example_databricks_sql.py[源代码]

    # Example of using the Databricks SQL Operator to select data.
    # SQL statements should be in the file with name test.sql
    create_file = DatabricksSqlOperator(
        databricks_conn_id=connection_id,
        sql_endpoint_name=sql_endpoint_name,
        task_id="create_and_populate_from_file",
        sql="test.sql",
    )

DatabricksSqlSensor

使用 DatabricksSqlSensor 为可通过 Databricks SQL 仓库或交互式集群访问的表运行传感器。

使用传感器

传感器执行用户提供的 SQL 语句。唯一必需的参数是

  • sql - 为传感器执行的 SQL 查询。

  • sql_warehouse_name(要使用的 Databricks SQL 仓库的名称)或 http_path(Databricks SQL 仓库或 Databricks 集群的 HTTP 路径)之一。

其他参数是可选的,可以在类文档中找到。

示例

配置要与传感器一起使用的 Databricks 连接。

tests/system/providers/databricks/example_databricks_sensors.py[源代码]

# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"

使用 SQL 语句探测特定表

tests/system/providers/databricks/example_databricks_sensors.py[源代码]

# Example of using the Databricks SQL Sensor to check existence of data in a table.
sql_sensor = DatabricksSqlSensor(
    databricks_conn_id=connection_id,
    sql_warehouse_name=sql_warehouse_name,
    catalog="hive_metastore",
    task_id="sql_sensor_task",
    sql="select * from hive_metastore.temp.sample_table_3 limit 1",
    timeout=60 * 2,
)

DatabricksPartitionSensor

传感器是一种特殊类型的操作符,旨在执行一项任务 - 等待某事发生。它可以基于时间、等待文件或外部事件,但它们所做的只是等待某事发生,然后成功,以便其下游任务可以运行。

对于 Databricks 分区传感器,我们检查分区及其相关值是否存在,如果不存在,它将等待分区值到达。等待时间和检查间隔可以在 timeout 和 poke_interval 参数中配置。

使用 DatabricksPartitionSensor 为可通过 Databricks SQL 仓库或交互式集群访问的表运行传感器。

使用传感器

传感器接受表名和分区名(名称)、用户的值(值),并生成 SQL 查询来检查指定的表中是否存在指定的分区名、值(值)。

必需的参数是

  • table_name(用于分区检查的表名)。

  • partitions(要检查的分区名)。

  • partition_operator(分区的比较运算符,用于值的范围或限制,例如 partition_name >= partition_value)。Databricks 比较运算符受支持。

  • sql_warehouse_name(要使用的 Databricks SQL 仓库的名称)或 http_path(Databricks SQL 仓库或 Databricks 集群的 HTTP 路径)之一。

其他参数是可选的,可以在类文档中找到。

示例

配置要与传感器一起使用的 Databricks 连接。

tests/system/providers/databricks/example_databricks_sensors.py[源代码]

# Connection string setup for Databricks workspace.
connection_id = "databricks_default"
sql_warehouse_name = "Starter Warehouse"

检查特定表是否存在数据/分区

tests/system/providers/databricks/example_databricks_sensors.py[源代码]

# Example of using the Databricks Partition Sensor to check the presence
# of the specified partition(s) in a table.
partition_sensor = DatabricksPartitionSensor(
    databricks_conn_id=connection_id,
    sql_warehouse_name=sql_warehouse_name,
    catalog="hive_metastore",
    task_id="partition_sensor_task",
    table_name="sample_table_2",
    schema="temp",
    partitions={"date": "2023-01-03", "name": ["abc", "def"]},
    partition_operator="=",
    timeout=60 * 2,
)

此条目是否有帮助?