Trino 到 Google Cloud Storage 传输算子¶
Trino 是一款开源、快速、分布式 SQL 查询引擎,用于针对从千兆字节到拍字节大小的数据源运行交互式分析查询。Trino 允许在数据所在的位置查询数据,包括 Hive、Cassandra、关系数据库,甚至是专有数据存储。单个 Trino 查询可以组合来自多个源的数据,从而实现整个组织的分析。
Google Cloud Storage 允许随时随地存储和检索任意数量的数据。您可以使用它来存储备份和存档数据,以及BigQuery 的数据源。
数据传输¶
使用 TrinoToGCSOperator
运算符在 Trino 和 Google Storage 之间传输文件。
此运算符有 3 个必需参数
sql
- 要执行的 SQL。bucket
- 要上传到的存储分区。filename
- 上传到 Google Cloud Storage 时用作对象名称的文件名。应在文件名中指定{}
,以便在文件因大小而拆分时允许运算符注入文件号。
所有参数都在参考文档中进行了描述 - TrinoToGCSOperator
。
示例运算符调用可能如下所示
trino_to_gcs_basic = TrinoToGCSOperator(
task_id="trino_to_gcs_basic",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
)
数据格式选择¶
该运算符支持两种输出格式
json
- JSON Lines(默认)csv
您可以通过 export_format
参数指定这些选项。
如果您希望创建 CSV 文件,您的运算符调用可能如下所示
trino_to_gcs_csv = TrinoToGCSOperator(
task_id="trino_to_gcs_csv",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.csv",
schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
export_format="csv",
)
生成 BigQuery 架构¶
如果设置 schema_filename
参数,将从数据库中转储包含表格的 BigQuery 模式字段的 .json
文件并上传到存储分区。
如果要创建模式文件,则示例运算符调用可能如下所示
trino_to_gcs_multiple_types = TrinoToGCSOperator(
task_id="trino_to_gcs_multiple_types",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
gzip=False,
)
有关 BigQuery 模式的更多信息,请参阅 Big Query 文档中的 指定模式。
将结果划分为多个文件¶
此运算符支持将较大的结果拆分为多个文件的能力。 approx_max_file_size_bytes
参数允许开发者指定拆分的文件大小。默认情况下,文件不超过 1 900 000 000 字节(1900 MB)
查看 Google Cloud Storage 中的配额和限制 以了解单个对象的允许最大文件大小。
如果要创建 10 MB 文件,您的代码可能如下所示
read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
task_id="read_data_from_gcs_many_chunks",
configuration={
"query": {
"query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
"useLegacySql": False,
}
},
)
使用 BigQuery 查询数据¶
Google Cloud Storage 中可用的数据可由 BigQuery 使用。您可以将数据加载到 BigQuery 或直接在查询中引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请参阅 BigQuery 文档中的 从 Cloud Storage 加载数据的简介。有关查询 GCS 数据的信息,请参阅 BigQuery 文档中的 查询 Cloud Storage 数据。
Airflow 还具有许多允许你创建使用 BigQuery 的运算符。例如,如果你想创建一个外部表,它允许你创建直接从 GCS 读取数据的查询,那么你可以使用 BigQueryCreateExternalTableOperator
。使用此运算符如下所示
create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
task_id="create_external_table_multiple_types",
bucket=GCS_BUCKET,
table_resource={
"tableReference": {
"projectId": GCP_PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": f"{safe_name(SOURCE_SCHEMA_COLUMNS)}",
},
"schema": {
"fields": [
{"name": "table_catalog", "type": "STRING"},
{"name": "table_schema", "type": "STRING"},
{"name": "table_name", "type": "STRING"},
{"name": "column_name", "type": "STRING"},
{"name": "ordinal_position", "type": "INT64"},
{"name": "column_default", "type": "STRING"},
{"name": "is_nullable", "type": "STRING"},
{"name": "data_type", "type": "STRING"},
],
},
"externalDataConfiguration": {
"sourceFormat": "NEWLINE_DELIMITED_JSON",
"compression": "NONE",
"sourceUris": [f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
},
},
source_objects=[f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
schema_object=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
)
有关 Airflow 和 BigQuery 集成的更多信息,请查看 Python API 参考 - bigquery
。