Presto 到 Google Cloud Storage 传输算子

Presto 是一款开源分布式 SQL 查询引擎,用于对从千兆字节到拍字节大小的数据源运行交互式分析查询。Presto 允许在数据所在的位置(包括 Hive、Cassandra、关系数据库甚至专有数据存储)查询数据。单个 Presto 查询可以组合来自多个来源的数据,从而对整个组织进行分析。

Google Cloud Storage 允许随时随地存储和检索任意数量的数据。你可以使用它来存储备份和归档数据,以及BigQuery 的数据源

数据传输

使用PrestoToGCSOperator算子在 Presto 和 Google Storage 之间传输文件。

此算子有 3 个必需参数

  • sql - 要执行的 SQL。

  • bucket - 要上传到的存储分区。

  • filename - 上传到 Google Cloud Storage 时用作对象名称的文件名。在文件名中应指定一个{},以便在文件因大小而拆分时允许算子注入文件号。

所有参数在参考文档中都有说明 - PrestoToGCSOperator

示例操作符调用可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

presto_to_gcs_basic = PrestoToGCSOperator(
    task_id="presto_to_gcs_basic",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)

数据格式选择

该操作符支持两种输出格式

  • json - JSON 行(默认)

  • csv

你可以通过 export_format 参数指定这些选项。

如果你想创建一个 CSV 文件,你的操作符调用可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

presto_to_gcs_csv = PrestoToGCSOperator(
    task_id="presto_to_gcs_csv",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    export_format="csv",
)

生成 BigQuery 架构

如果你设置了 schema_filename 参数,一个包含表的大查询架构字段的 .json 文件将从数据库中转储并上传到存储桶。

如果你想创建一个架构文件,那么示例操作符调用可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

presto_to_gcs_multiple_types = PrestoToGCSOperator(
    task_id="presto_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    gzip=False,
)

有关 BigQuery 架构的更多信息,请参阅 Big Query 文档中的 指定架构

结果划分为多个文件

此操作符支持将大型结果拆分为多个文件的能力。 approx_max_file_size_bytes 参数允许开发人员指定拆分的文件大小。默认情况下,文件不超过 1 900 000 000 字节 (1900 MB)

查看 Google Cloud Storage 中的配额和限制 以查看单个对象的允许的最大文件大小。

如果您想创建 10 MB 的文件,您的代码可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
    task_id="read_data_from_gcs_many_chunks",
    configuration={
        "query": {
            "query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
            f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
            "useLegacySql": False,
        }
    },
)

使用 BigQuery 查询数据

BigQuery 可以使用 Google Cloud Storage 中提供的数据。您可以将数据加载到 BigQuery,或直接在查询中引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请参阅 BigQuery 文档中的 从 Cloud Storage 加载数据简介。有关查询 GCS 数据的信息,请参阅 BigQuery 文档中的 查询 Cloud Storage 数据

Airflow 还有许多允许您创建和使用 BigQuery 的运算符。例如,如果您想创建一个外部表,以便创建直接从 GCS 读取数据的查询,则可以使用 BigQueryCreateExternalTableOperator。使用此运算符如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
    task_id="create_external_table_multiple_types",
    bucket=GCS_BUCKET,
    source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
    table_resource={
        "tableReference": {
            "projectId": GCP_PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
        },
    },
    schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)

有关 Airflow 和 BigQuery 集成的更多信息,请参阅 Python API 参考 - bigquery

参考

有关更多信息,请参阅

此条目是否有用?