Google Cloud Storage 传输操作符到 BigQuery¶
Google Cloud Storage (GCS) 是一项用于存储非结构化数据的托管服务。 Google Cloud BigQuery 是 Google Cloud 提供的无服务器数据仓库。此操作符可用于使用存储在 Cloud Storage 存储桶中的文件填充 BigQuery 表。
先决任务¶
要使用这些操作符,您必须执行以下操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请访问此处。
操作符¶
使用 GCSToBigQueryOperator
运算符从 GCS 传输文件到 BigQuery。
使用 Jinja 模板 和 bucket
、source_objects
、schema_object
、schema_object_bucket
、destination_project_dataset_table
、impersonation_chain
、src_fmt_configs
动态定义值。
你可以使用 source_objects
参数从单个存储分区加载多个对象。你还可以定义架构以及其他设置,例如压缩格式。有关更多信息,请参阅上面的链接。
传输文件¶
以下运算符将一个或多个文件从 GCS 传输到 BigQuery 表中。
load_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
],
write_disposition="WRITE_TRUNCATE",
)
你也可以在可延迟模式下使用 GCSToBigQueryOperator
load_string_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_str_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key="string_field_0",
deferrable=True,
)
load_date_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states-by-date.csv"],
destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_DATE,
deferrable=True,
)
load_json = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_json_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.json"],
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_STR,
deferrable=True,
)
load_csv_delimiter = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_delimiter_async",
bucket="big-query-samples",
source_objects=["employees-tabular.csv"],
source_format="csv",
destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
field_delimiter="\t",
quote_character="",
max_id_key=MAX_ID_STR,
deferrable=True,
)