Google Cloud BigQuery 传输算子到 Postgres¶
Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库产品。PostgreSQL 是一个开源关系数据库管理系统。此算子可用于将数据从 BigQuery 表格复制到 PostgreSQL。
先决任务¶
要使用这些算子,您必须执行以下操作
使用Cloud 控制台选择或创建一个 Cloud Platform 项目。
为您的项目启用帐单,如Google Cloud 文档中所述。
启用 API,如Cloud 控制台文档中所述。
通过pip安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
操作符¶
使用 BigQueryToPostgresOperator
操作符从 BigQuery 表中复制数据到 Postgres 表中。
使用 Jinja 模板 和 target_table_name
, impersonation_chain
, dataset_id
, table_id
动态定义值。
您可以使用参数 selected_fields
来限制要复制的字段(默认情况下为所有字段),以及参数 replace
来覆盖目标表,而不是追加到目标表。如果使用了 replace
参数,则由于底层 INSERT 命令中 PostgreSQL 的 ON CONFLICT 子句的约束,需要指定 selected_fields
和 replace_index
参数。
有关更多信息,请参阅以上链接。
传输数据¶
以下操作符从 BigQuery 表中复制数据到 PostgreSQL。
bigquery_to_postgres = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=False,
)
该操作符还可以用来自 BigQuery 表的匹配数据替换 PostgreSQL 表中的数据。
bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres_upsert",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=True,
selected_fields=["emp_name", "salary"],
replace_index=["emp_name", "salary"],
)