Google Cloud Storage 传输操作符到 BigQuery

Google Cloud Storage (GCS) 是一项用于存储非结构化数据的托管服务。 Google Cloud BigQuery 是 Google Cloud 提供的无服务器数据仓库。此操作符可用于使用存储在 Cloud Storage 存储桶中的文件填充 BigQuery 表。

先决任务

要使用这些操作符,您必须执行以下操作

操作符

使用 GCSToBigQueryOperator 运算符从 GCS 传输文件到 BigQuery。

使用 Jinja 模板bucketsource_objectsschema_objectschema_object_bucketdestination_project_dataset_tableimpersonation_chainsrc_fmt_configs 动态定义值。

你可以使用 source_objects 参数从单个存储分区加载多个对象。你还可以定义架构以及其他设置,例如压缩格式。有关更多信息,请参阅上面的链接。

传输文件

以下运算符将一个或多个文件从 GCS 传输到 BigQuery 表中。

tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery.py[源代码]

load_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
)

你也可以在可延迟模式下使用 GCSToBigQueryOperator

tests/system/providers/google/cloud/gcs/example_gcs_to_bigquery_async.py[源代码]

load_string_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_str_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key="string_field_0",
    deferrable=True,
)

load_date_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states-by-date.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_DATE,
    deferrable=True,
)

load_json = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_json_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.json"],
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

load_csv_delimiter = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_delimiter_async",
    bucket="big-query-samples",
    source_objects=["employees-tabular.csv"],
    source_format="csv",
    destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    field_delimiter="\t",
    quote_character="",
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

参考

有关更多信息,请参阅

此条目是否有用?