Google Cloud BigQuery 传输算子到 Postgres

Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库产品。PostgreSQL 是一个开源关系数据库管理系统。此算子可用于将数据从 BigQuery 表格复制到 PostgreSQL。

先决任务

要使用这些算子,您必须执行以下操作

操作符

使用 BigQueryToPostgresOperator 操作符从 BigQuery 表中复制数据到 Postgres 表中。

使用 Jinja 模板target_table_name, impersonation_chain, dataset_id, table_id 动态定义值。

您可以使用参数 selected_fields 来限制要复制的字段(默认情况下为所有字段),以及参数 replace 来覆盖目标表,而不是追加到目标表。如果使用了 replace 参数,则由于底层 INSERT 命令中 PostgreSQL 的 ON CONFLICT 子句的约束,需要指定 selected_fieldsreplace_index 参数。

有关更多信息,请参阅以上链接。

传输数据

以下操作符从 BigQuery 表中复制数据到 PostgreSQL。

tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py[源代码]

bigquery_to_postgres = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=False,
)

该操作符还可以用来自 BigQuery 表的匹配数据替换 PostgreSQL 表中的数据。

tests/system/providers/google/cloud/bigquery/example_bigquery_to_postgres.py[源代码]

bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres_upsert",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=True,
    selected_fields=["emp_name", "salary"],
    replace_index=["emp_name", "salary"],
)

参考

有关更多信息,请参阅

此条目是否有用?