airflow.providers.google.cloud.transfers.gcs_to_bigquery
¶
此模块包含一个 Google Cloud Storage 到 BigQuery 的操作符。
模块内容¶
类¶
将文件从 Google Cloud Storage 加载到 BigQuery。 |
属性¶
- airflow.providers.google.cloud.transfers.gcs_to_bigquery.ALLOWED_FORMATS = ['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO', 'GOOGLE_SHEETS', 'DATASTORE_BACKUP', 'PARQUET'][源代码]¶
- class airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator(*, bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, schema_object_bucket=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=None, write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, encoding='UTF-8', max_id_key=None, gcp_conn_id='google_cloud_default', schema_update_options=(), src_fmt_configs=None, external_table=False, time_partitioning=None, cluster_fields=None, autodetect=True, encryption_configuration=None, location=None, impersonation_chain=None, labels=None, description=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), result_retry=DEFAULT_RETRY, result_timeout=None, cancel_on_kill=True, job_id=None, force_rerun=True, reattach_states=None, project_id=PROVIDE_PROJECT_ID, force_delete=False, **kwargs)[源代码]¶
基类:
airflow.models.BaseOperator
将文件从 Google Cloud Storage 加载到 BigQuery。
BigQuery 表的模式可以使用两种方式之一指定。您可以直接传入模式字段,也可以将操作符指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是一个包含模式字段的 JSON 文件。
另请参阅
有关如何使用此操作符的更多信息,请查看以下指南:操作符
- 参数
bucket – 要从中加载的存储桶。(模板化)
source_objects – 要从中加载的 Google Cloud Storage URI 的字符串或列表。(模板化)如果 source_format 为 'DATASTORE_BACKUP',则列表必须仅包含一个 URI。
destination_project_dataset_table – 要将数据加载到的点分隔的
(<project>.|<project>:)<dataset>.<table>
BigQuery 表。如果未包含<project>
,则项目将是连接 json 中定义的项目。(模板化)schema_fields – 如果设置,则模式字段列表,如此处定义:https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load 如果 source_format 为 'DATASTORE_BACKUP',则不应设置。如果 'schema_object' 为 null 且 autodetect 为 False,则必须定义参数。
schema_object – 如果设置,则指向包含表模式的 .json 文件的 GCS 对象路径。(模板化)如果 'schema_fields' 为 null 且 autodetect 为 False,则必须定义参数。
schema_object_bucket – [可选] 如果设置,则为存储模式对象模板的 GCS 存储桶。(模板化) (默认值:
bucket
的值)source_format – 要导出的文件格式。
compression – [可选] 数据源的压缩类型。可能的值包括 GZIP 和 NONE。默认值为 NONE。此设置对于 Google Cloud Bigtable、Google Cloud Datastore 备份和 Avro 格式将被忽略。
create_disposition – 如果表不存在,则创建处置。
skip_leading_rows – BigQuery 在加载数据时将跳过的 CSV 文件顶部的行数。当 autodetect 开启时,行为如下:skip_leading_rows 未指定 - Autodetect 尝试检测第一行中的标题。如果未检测到标题,则将该行读取为数据。否则,将从第二行开始读取数据。skip_leading_rows 为 0 - 指示 autodetect 没有标题,应从第一行开始读取数据。skip_leading_rows = N > 0 - Autodetect 跳过 N-1 行,并尝试检测第 N 行中的标题。如果未检测到标题,则仅跳过第 N 行。否则,使用第 N 行来提取检测到的模式的列名。默认值设置为 None,以便自动检测选项可以检测模式字段。
write_disposition – 如果表已存在,则写入处置。
field_delimiter – 从 CSV 加载时要使用的分隔符。
max_bad_records – BigQuery 在运行作业时可以忽略的最大错误记录数。
quote_character – 用于在 CSV 文件中引用数据部分的值。
ignore_unknown_values – [可选] 指示 BigQuery 是否应允许未在表模式中表示的额外值。如果为 true,则忽略额外的值。如果为 false,则将具有额外列的记录视为错误记录,并且如果错误记录过多,则会在作业结果中返回无效错误。
allow_quoted_newlines – 是否允许带引号的换行符(true)或不允许(false)。
allow_jagged_rows – 接受缺少尾部可选列的行。缺失的值将被视为 null 值。如果为 false,则将缺少尾部列的记录视为错误记录,并且如果错误记录过多,则会在作业结果中返回无效错误。仅适用于 CSV,对于其他格式将被忽略。
encoding – 数据的字符编码。请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).csvOptions.encoding https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding
max_id_key – 如果设置,则为要加载的 BigQuery 表中列的名称。这将被用于在加载发生后从 BigQuery 中选择 MAX 值。结果将由 execute() 命令返回,然后存储在 XCom 中供未来的运算符使用。这对于增量加载很有帮助——在未来的执行过程中,您可以从最大 ID 处继续。
schema_update_options – 允许作为加载作业的副作用更新目标表的架构。
src_fmt_configs – 配置特定于源格式的可选字段
external_table – 标志,用于指定目标表是否应为 BigQuery 外部表。默认值为 False。
time_partitioning – 配置可选的时间分区字段,例如按照 API 规范进行字段、类型和过期时间的分区。请注意,“field”在与 dataset.table$partition 并发时不可用。
cluster_fields – 请求将此加载的结果按一个或多个列排序存储。BigQuery 支持对分区表和非分区表进行聚类。给定的列顺序决定排序顺序。不适用于外部表。
autodetect – [可选] 指示是否应自动推断 CSV 和 JSON 源的选项和架构。(默认值:
True
)。如果未定义“schema_fields”和“schema_object”,则必须将此参数设置为 True。如果表是在 Airflow 之外创建的,建议将其设置为 True。如果 autodetect 为 None 且未提供架构(既没有通过 schema_fields,也没有通过 schema_object),则假定该表已存在。encryption_configuration –
[可选] 自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key", }
location – [可选] 作业的地理位置。美国和欧盟以外的地区是必需的。有关详细信息,请参阅 https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据模拟,或者获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将 Service Account Token Creator IAM 角色授予直接前面的身份,列表中的第一个帐户将此角色授予原始帐户(模板化)。
labels – [可选] BiqQuery 表的标签。
description – [可选] BigQuery 表的描述。仅当新创建目标表时才会使用此项。如果表已存在且提供了与当前描述不同的值,则作业将失败。
deferrable (bool) – 在可延迟模式下运行运算符
force_delete (bool) – 如果目标表已存在,则强制删除目标表。
- template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'schema_object_bucket',...[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶