TeradataToTeradataOperator

TeradataToTeradataOperator defines a task that transfers data between two Teradata instances: it runs a query against a source connection and writes the result to a table on a destination connection.

Transferring data between two Teradata instances

To transfer data between two Teradata instances, use the TeradataToTeradataOperator.

An example usage of the TeradataToTeradataOperator is as follows:

tests/system/providers/teradata/example_teradata_to_teradata_transfer.py

    transfer_data = TeradataToTeradataOperator(
        task_id="transfer_data",
        dest_teradata_conn_id="teradata_default",
        destination_table="my_users_dest",
        source_teradata_conn_id="teradata_default",
        sql="select * from my_users_src",
        sql_params={},
        rows_chunk=2,
    )
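To make the role of `sql`, `sql_params`, `destination_table`, and `rows_chunk` more concrete, here is a minimal sketch of a chunked transfer loop. It is not the operator's implementation: it assumes the transfer amounts to running the query on the source, fetching `rows_chunk` rows at a time, and inserting each batch into the destination. The helper `transfer_in_chunks` is hypothetical, and two in-memory SQLite databases stand in for the two Teradata instances so the example runs without a Teradata connection.

```python
import sqlite3


def transfer_in_chunks(src_conn, dest_conn, sql, destination_table, sql_params=(), rows_chunk=5000):
    """Copy the result of `sql` on src_conn into destination_table on dest_conn.

    Hypothetical helper: rows are fetched and inserted `rows_chunk` at a time.
    Returns the number of rows copied.
    """
    cursor = src_conn.cursor()
    cursor.execute(sql, sql_params)
    # Reuse the source column names as the target column list.
    columns = [col[0] for col in cursor.description]
    placeholders = ", ".join("?" for _ in columns)
    insert_sql = f"INSERT INTO {destination_table} ({', '.join(columns)}) VALUES ({placeholders})"

    total = 0
    rows = cursor.fetchmany(rows_chunk)
    while rows:
        # Write one batch, commit it, then fetch the next batch.
        dest_conn.executemany(insert_sql, rows)
        dest_conn.commit()
        total += len(rows)
        rows = cursor.fetchmany(rows_chunk)
    cursor.close()
    return total


# Two in-memory SQLite databases stand in for the two Teradata instances (assumption).
source = sqlite3.connect(":memory:")
dest = sqlite3.connect(":memory:")
source.execute("CREATE TABLE my_users_src (user_id INTEGER, user_name TEXT)")
source.executemany(
    "INSERT INTO my_users_src VALUES (?, ?)",
    [(1, "Danny"), (2, "Simone"), (3, "Lily")],
)
source.commit()
dest.execute("CREATE TABLE my_users_dest (user_id INTEGER, user_name TEXT)")

copied = transfer_in_chunks(
    source, dest, "select * from my_users_src", "my_users_dest", rows_chunk=2
)
```

With `rows_chunk=2`, as in the operator example above, the three source rows are written in two batches. A small chunk size keeps memory use low at the cost of more round trips; a larger one does the opposite.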

The complete TeradataToTeradata Transfer Operator DAG

When we put everything together, our DAG should look like this:

tests/system/providers/teradata/example_teradata.py



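import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")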
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
    )
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
        dag=dag,
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
        CREATE TABLE schema_table (
           country_id INTEGER,
           name CHAR(25),
           continent CHAR(25)
        );
        """,
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        dag=dag,
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
        dag=dag,
    )
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )
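The parenthesized `>>` chain at the end of the DAG declares the order in which the tasks run. The following toy sketch illustrates the idea only; the `Task` class and `execution_order` function are hypothetical stand-ins, not Airflow's classes, and they handle just a linear chain.

```python
class Task:
    """Toy stand-in for an Airflow task; it only records downstream links."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `a >> b` records b as downstream of a and returns b,
        # so `a >> b >> c` continues the chain from b.
        self.downstream.append(other)
        return other


def execution_order(first):
    """Walk a linear chain starting at `first` and list task ids in run order."""
    order, current = [], first
    while current is not None:
        order.append(current.task_id)
        current = current.downstream[0] if current.downstream else None
    return order


create_table = Task("create_table")
populate_table = Task("populate_table")
drop_country_table = Task("drop_country_table")

(
    create_table
    >> populate_table
    >> drop_country_table
)

order = execution_order(create_table)
```

Because each `>>` returns its right-hand operand, the expression reads left to right in execution order, which is why the DAG above can list all of its tasks in a single chain.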
