Presto 到 Google Cloud Storage 传输算子¶
Presto 是一款开源分布式 SQL 查询引擎,用于对从千兆字节到拍字节大小的数据源运行交互式分析查询。Presto 允许在数据所在的位置(包括 Hive、Cassandra、关系数据库甚至专有数据存储)查询数据。单个 Presto 查询可以组合来自多个来源的数据,从而对整个组织进行分析。
Google Cloud Storage 允许随时随地存储和检索任意数量的数据。你可以使用它来存储备份和归档数据,以及BigQuery 的数据源。
数据传输¶
使用PrestoToGCSOperator
算子在 Presto 和 Google Storage 之间传输文件。
此算子有 3 个必需参数
sql
- 要执行的 SQL。bucket
- 要上传到的存储分区。filename
- 上传到 Google Cloud Storage 时用作对象名称的文件名。在文件名中应指定一个{}
,以便在文件因大小而拆分时允许算子注入文件号。
所有参数在参考文档中都有说明 - PrestoToGCSOperator
。
示例操作符调用可能如下所示
presto_to_gcs_basic = PrestoToGCSOperator(
task_id="presto_to_gcs_basic",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)
数据格式选择¶
该操作符支持两种输出格式
json
- JSON 行(默认)csv
你可以通过 export_format
参数指定这些选项。
如果你想创建一个 CSV 文件,你的操作符调用可能如下所示
presto_to_gcs_csv = PrestoToGCSOperator(
task_id="presto_to_gcs_csv",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
export_format="csv",
)
生成 BigQuery 架构¶
如果你设置了 schema_filename
参数,一个包含表的大查询架构字段的 .json
文件将从数据库中转储并上传到存储桶。
如果你想创建一个架构文件,那么示例操作符调用可能如下所示
presto_to_gcs_multiple_types = PrestoToGCSOperator(
task_id="presto_to_gcs_multiple_types",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
gzip=False,
)
有关 BigQuery 架构的更多信息,请参阅 Big Query 文档中的 指定架构。
结果划分为多个文件¶
此操作符支持将大型结果拆分为多个文件的能力。 approx_max_file_size_bytes
参数允许开发人员指定拆分的文件大小。默认情况下,文件不超过 1 900 000 000 字节 (1900 MB)
查看 Google Cloud Storage 中的配额和限制 以查看单个对象的允许的最大文件大小。
如果您想创建 10 MB 的文件,您的代码可能如下所示
read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
task_id="read_data_from_gcs_many_chunks",
configuration={
"query": {
"query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
"useLegacySql": False,
}
},
)
使用 BigQuery 查询数据¶
BigQuery 可以使用 Google Cloud Storage 中提供的数据。您可以将数据加载到 BigQuery,或直接在查询中引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请参阅 BigQuery 文档中的 从 Cloud Storage 加载数据简介。有关查询 GCS 数据的信息,请参阅 BigQuery 文档中的 查询 Cloud Storage 数据。
Airflow 还有许多允许您创建和使用 BigQuery 的运算符。例如,如果您想创建一个外部表,以便创建直接从 GCS 读取数据的查询,则可以使用 BigQueryCreateExternalTableOperator
。使用此运算符如下所示
create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
task_id="create_external_table_multiple_types",
bucket=GCS_BUCKET,
source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
table_resource={
"tableReference": {
"projectId": GCP_PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
},
"schema": {
"fields": [
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
]
},
"externalDataConfiguration": {
"sourceFormat": "NEWLINE_DELIMITED_JSON",
"compression": "NONE",
"csvOptions": {"skipLeadingRows": 1},
},
},
schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)
有关 Airflow 和 BigQuery 集成的更多信息,请参阅 Python API 参考 - bigquery
。