Google Cloud Storage 传输算子到 BigQuery

Google Cloud Storage (GCS) 是一项用于存储非结构化数据的托管服务。 Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库产品。此算子可用于使用存储在 Cloud Storage 存储桶中的文件中的数据填充 BigQuery 表。

先决条件任务

要使用这些算子,您必须执行以下几项操作

算子

使用 GCSToBigQueryOperator 算子执行从 GCS 到 BigQuery 的文件传输。

使用 bucketsource_objectsschema_objectschema_object_bucketdestination_project_dataset_tableimpersonation_chainsrc_fmt_configsJinja 模板 动态定义值。

您可以使用 source_objects 参数从单个存储桶加载多个对象。您还可以定义架构以及其他设置,例如压缩格式。有关更多信息,请参阅上面的链接。

传输文件

以下算子将一个或多个文件从 GCS 传输到 BigQuery 表中。

tests/system/google/cloud/gcs/example_gcs_to_bigquery.py[源代码]

load_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
)

您还可以在可延迟模式下使用 GCSToBigQueryOperator

tests/system/google/cloud/gcs/example_gcs_to_bigquery_async.py[源代码]

load_string_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_str_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key="string_field_0",
    deferrable=True,
)

load_date_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states-by-date.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_DATE,
    deferrable=True,
)

load_json = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_json_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.json"],
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

load_csv_delimiter = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_delimiter_async",
    bucket="big-query-samples",
    source_objects=["employees-tabular.csv"],
    source_format="csv",
    destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    field_delimiter="\t",
    quote_character="",
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

参考

有关更多信息,请查看

此条目是否有帮助?