使用 SQLExecuteQueryOperator 连接到 Trino

使用 SQLExecuteQueryOperatorTrino 查询引擎中执行 SQL 命令。

警告

TrinoOperator 已弃用,建议使用 SQLExecuteQueryOperator。如果你正在使用 TrinoOperator,你应该尽快迁移。

使用算子

使用 trino_conn_id 参数连接到你的 Trino 实例

以下是如何使用 SQLExecuteQueryOperator 连接到 Trino 的示例

tests/system/providers/trino/example_trino.py[源代码]


with models.DAG(
    dag_id="example_trino",
    schedule="@once",  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example"],
) as dag:
    trino_create_schema = SQLExecuteQueryOperator(
        task_id="trino_create_schema",
        sql=f"CREATE SCHEMA IF NOT EXISTS {SCHEMA} WITH (location = 's3://irisbkt/cities/');",
        handler=list,
    )
    trino_create_table = SQLExecuteQueryOperator(
        task_id="trino_create_table",
        sql=f"""CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE}(
        cityid bigint,
        cityname varchar
        )""",
        handler=list,
    )
    trino_insert = SQLExecuteQueryOperator(
        task_id="trino_insert",
        sql=f"""INSERT INTO {SCHEMA}.{TABLE} VALUES (1, 'San Francisco');""",
        handler=list,
    )
    trino_multiple_queries = SQLExecuteQueryOperator(
        task_id="trino_multiple_queries",
        sql=f"""CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE1}(cityid bigint,cityname varchar);
        INSERT INTO {SCHEMA}.{TABLE1} VALUES (2, 'San Jose');
        CREATE TABLE IF NOT EXISTS {SCHEMA}.{TABLE2}(cityid bigint,cityname varchar);
        INSERT INTO {SCHEMA}.{TABLE2} VALUES (3, 'San Diego');""",
        handler=list,
    )
    trino_templated_query = SQLExecuteQueryOperator(
        task_id="trino_templated_query",
        sql="SELECT * FROM {{ params.SCHEMA }}.{{ params.TABLE }}",
        handler=list,
        params={"SCHEMA": SCHEMA, "TABLE": TABLE1},
    )
    trino_parameterized_query = SQLExecuteQueryOperator(
        task_id="trino_parameterized_query",
        sql=f"select * from {SCHEMA}.{TABLE2} where cityname = ?",
        parameters=("San Diego",),
        handler=list,
    )

    (
        trino_create_schema
        >> trino_create_table
        >> trino_insert
        >> trino_multiple_queries
        >> trino_templated_query
        >> trino_parameterized_query
    )

注意

此算子可用于运行任何语法正确的 Trino 查询,并且可以使用 liststring 传递多个查询

此条目是否有用?