Trino 到 Google Cloud Storage 传输算子

Trino 是一款开源、快速、分布式 SQL 查询引擎,用于针对从千兆字节到拍字节大小的数据源运行交互式分析查询。Trino 允许在数据所在的位置查询数据,包括 Hive、Cassandra、关系数据库,甚至是专有数据存储。单个 Trino 查询可以组合来自多个源的数据,从而实现整个组织的分析。

Google Cloud Storage 允许随时随地存储和检索任意数量的数据。您可以使用它来存储备份和存档数据,以及BigQuery 的数据源

数据传输

使用 TrinoToGCSOperator 运算符在 Trino 和 Google Storage 之间传输文件。

此运算符有 3 个必需参数

  • sql - 要执行的 SQL。

  • bucket - 要上传到的存储分区。

  • filename - 上传到 Google Cloud Storage 时用作对象名称的文件名。应在文件名中指定 {},以便在文件因大小而拆分时允许运算符注入文件号。

所有参数都在参考文档中进行了描述 - TrinoToGCSOperator

示例运算符调用可能如下所示

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

trino_to_gcs_basic = TrinoToGCSOperator(
    task_id="trino_to_gcs_basic",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
)

数据格式选择

该运算符支持两种输出格式

  • json - JSON Lines(默认)

  • csv

您可以通过 export_format 参数指定这些选项。

如果您希望创建 CSV 文件,您的运算符调用可能如下所示

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

trino_to_gcs_csv = TrinoToGCSOperator(
    task_id="trino_to_gcs_csv",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    export_format="csv",
)

生成 BigQuery 架构

如果设置 schema_filename 参数,将从数据库中转储包含表格的 BigQuery 模式字段的 .json 文件并上传到存储分区。

如果要创建模式文件,则示例运算符调用可能如下所示

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

trino_to_gcs_multiple_types = TrinoToGCSOperator(
    task_id="trino_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    gzip=False,
)

有关 BigQuery 模式的更多信息,请参阅 Big Query 文档中的 指定模式

将结果划分为多个文件

此运算符支持将较大的结果拆分为多个文件的能力。 approx_max_file_size_bytes 参数允许开发者指定拆分的文件大小。默认情况下,文件不超过 1 900 000 000 字节(1900 MB)

查看 Google Cloud Storage 中的配额和限制 以了解单个对象的允许最大文件大小。

如果要创建 10 MB 文件,您的代码可能如下所示

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
    task_id="read_data_from_gcs_many_chunks",
    configuration={
        "query": {
            "query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
            f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
            "useLegacySql": False,
        }
    },
)

使用 BigQuery 查询数据

Google Cloud Storage 中可用的数据可由 BigQuery 使用。您可以将数据加载到 BigQuery 或直接在查询中引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请参阅 BigQuery 文档中的 从 Cloud Storage 加载数据的简介。有关查询 GCS 数据的信息,请参阅 BigQuery 文档中的 查询 Cloud Storage 数据

Airflow 还具有许多允许你创建使用 BigQuery 的运算符。例如,如果你想创建一个外部表,它允许你创建直接从 GCS 读取数据的查询,那么你可以使用 BigQueryCreateExternalTableOperator。使用此运算符如下所示

tests/system/providers/google/cloud/gcs/example_trino_to_gcs.py[source]

create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
    task_id="create_external_table_multiple_types",
    bucket=GCS_BUCKET,
    table_resource={
        "tableReference": {
            "projectId": GCP_PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": f"{safe_name(SOURCE_SCHEMA_COLUMNS)}",
        },
        "schema": {
            "fields": [
                {"name": "table_catalog", "type": "STRING"},
                {"name": "table_schema", "type": "STRING"},
                {"name": "table_name", "type": "STRING"},
                {"name": "column_name", "type": "STRING"},
                {"name": "ordinal_position", "type": "INT64"},
                {"name": "column_default", "type": "STRING"},
                {"name": "is_nullable", "type": "STRING"},
                {"name": "data_type", "type": "STRING"},
            ],
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "sourceUris": [f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
        },
    },
    source_objects=[f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
    schema_object=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
)

有关 Airflow 和 BigQuery 集成的更多信息,请查看 Python API 参考 - bigquery

参考

有关更多信息,请查看

此条目是否有用?