Presto 到 Google Cloud Storage 传输操作符

Presto 是一个开源分布式 SQL 查询引擎,用于对从千兆字节到拍字节大小的各种数据源执行交互式分析查询。Presto 允许查询数据所在的位置,包括 Hive、Cassandra、关系数据库甚至专有数据存储。单个 Presto 查询可以组合来自多个数据源的数据,从而允许在整个组织中进行分析。

Google Cloud Storage 允许随时随地存储和检索任意数量的数据。 您可以使用它来存储备份和存档数据,以及作为 BigQuery 的数据源

数据传输

Presto 和 Google Storage 之间的文件传输通过 PrestoToGCSOperator 操作符执行。

此操作符有 3 个必需的参数

  • sql - 要执行的 SQL。

  • bucket - 要上传到的存储桶。

  • filename - 上传到 Google Cloud Storage 时用作对象名称的文件名。应在文件名中指定 {},以便在文件因大小而被分割的情况下,操作符可以注入文件编号。

所有参数都在参考文档中进行了描述 - PrestoToGCSOperator

一个示例操作符调用可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

presto_to_gcs_basic = PrestoToGCSOperator(
    task_id="presto_to_gcs_basic",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)

数据格式的选择

此操作符支持两种输出格式

  • json - JSON Lines(默认)

  • csv

您可以通过 export_format 参数指定这些选项。

如果您想创建 CSV 文件,您的操作符调用可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

presto_to_gcs_csv = PrestoToGCSOperator(
    task_id="presto_to_gcs_csv",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    export_format="csv",
)

生成 BigQuery 模式

如果您设置 schema_filename 参数,则将从数据库中转储一个包含该表 BigQuery 模式字段的 .json 文件,并上传到存储桶。

如果您想创建一个模式文件,则示例操作符调用可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

presto_to_gcs_multiple_types = PrestoToGCSOperator(
    task_id="presto_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    gzip=False,
)

有关 BigQuery 模式的更多信息,请查看 BigQuery 文档中的指定模式

将结果划分为多个文件

此操作符支持将大型结果拆分为多个文件的能力。approx_max_file_size_bytes 参数允许开发人员指定拆分的文件大小。默认情况下,文件大小不超过 1 900 000 000 字节(1900 MB)

查看Google Cloud Storage 中的配额和限制,了解单个对象允许的最大文件大小。

如果您想创建 10 MB 的文件,您的代码可能如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
    task_id="read_data_from_gcs_many_chunks",
    configuration={
        "query": {
            "query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
            f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
            "useLegacySql": False,
        }
    },
)

使用 BigQuery 查询数据

BigQuery 可以使用 Google Cloud Storage 中可用的数据。您可以将数据加载到 BigQuery,或直接在查询中引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请参阅 BigQuery 文档中的从 Cloud Storage 加载数据简介。有关查询 GCS 数据的信息,请参阅 BigQuery 文档中的查询 Cloud Storage 数据

Airflow 还有许多允许您创建和使用 BigQuery 的操作符。例如,如果您想创建一个外部表,允许您创建直接从 GCS 读取数据的查询,则可以使用 BigQueryCreateExternalTableOperator。使用此操作符如下所示

airflow/providers/google/cloud/example_dags/example_presto_to_gcs.py[源代码]

create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
    task_id="create_external_table_multiple_types",
    bucket=GCS_BUCKET,
    source_objects=[f"{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
    table_resource={
        "tableReference": {
            "projectId": GCP_PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
        },
    },
    schema_object=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)

有关 Airflow 和 BigQuery 集成的更多信息,请查看 Python API 参考 - bigquery

参考

有关更多信息,请查看

此条目是否对您有帮助?