Google Cloud BigQuery 运算符

BigQuery 是 Google 完全托管的、PB 级、低成本分析数据仓库。它是一种无需数据库管理员的无服务器软件即服务 (SaaS)。它允许用户专注于分析数据,使用熟悉的 SQL 查找有意义的见解。

Airflow 提供运算符来管理数据集和表,运行查询和验证数据。

先决条件任务

要使用这些运算符,您必须执行以下几项操作

管理数据集

创建数据集

要在 BigQuery 数据库中创建空数据集,您可以使用 BigQueryCreateEmptyDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[源]

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

获取数据集详细信息

要获取现有数据集的详细信息,您可以使用 BigQueryGetDatasetOperator

此运算符返回一个 Dataset Resource

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[源]

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

列出数据集中的表

要检索给定数据集中表的列表,请使用 BigQueryGetDatasetTablesOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)

更新表

要在 BigQuery 中更新表,您可以使用 BigQueryUpdateTableOperator

update 方法替换整个 Table 资源,而 patch 方法仅替换提交的 Table 资源中提供的字段。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)

更新数据集

要在 BigQuery 中更新数据集,您可以使用 BigQueryUpdateDatasetOperator

update 方法替换整个数据集资源,而 patch 方法仅替换提交的数据集资源中提供的字段。

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[源]

update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)

删除数据集

要从 BigQuery 数据库中删除现有数据集,您可以使用 BigQueryDeleteDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py[源]

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

管理表

创建原生表

要在给定的 BigQuery 数据集中创建一个新的空表(可以选择使用模式),您可以使用 BigQueryCreateEmptyTableOperator

可采用两种方式之一指定 BigQuery 表的模式。您可以直接传入模式字段,也可以将运算符指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含模式字段的 JSON 文件。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

您可以使用此运算符在现有表之上创建视图。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)

您还可以使用此运算符创建物化视图,该视图定期缓存查询结果,以提高性能和效率。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

create_materialized_view = BigQueryCreateEmptyTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    materialized_view={
        "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "enableRefresh": True,
        "refreshIntervalMs": 2000000,
    },
)

创建外部表

要在数据集中创建具有 Google Cloud Storage 中数据的新外部表,您可以使用 BigQueryCreateExternalTableOperator

BigQueryCreateEmptyTableOperator 类似,您可以直接传入模式字段。

tests/system/google/cloud/bigquery/example_bigquery_operations.py[源]

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    destination_project_dataset_table=f"{DATASET_NAME}.external_table",
    bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

或者,您可以将运算符指向存储模式的 Google Cloud Storage 对象名称。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

update_table_schema_json = BigQueryCreateEmptyTableOperator(
    task_id="update_table_schema_json",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)

要使用 BigQuery 模式自动检测,请设置 autodetect 标志,而不是提供显式模式信息。

从表中提取数据

要从 BigQuery 表中提取数据,您可以使用 BigQueryGetDataOperator。或者,如果将字段传递给 selected_fields,则可以提取选定列的数据。

此运算符的结果可以以两种不同的格式检索,具体取决于 as_dict 参数的值:False(默认)- Python 列表的列表,其中嵌套列表中的元素数量将等于提取的行数。嵌套中的每个元素将是一个嵌套列表,其中元素将表示该行的列值。True - Python 字典的列表,其中每个字典表示一行。在每个字典中,键是列名,值是这些列的相应值。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)

以下示例显示如何在异步(可延迟)模式下使用 BigQueryGetDataOperator。请注意,可延迟的任务需要触发器在您的 Airflow 部署上运行。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[源]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME_1,
    use_legacy_sql=False,
    max_results=10,
    selected_fields="value",
    location=LOCATION,
    deferrable=True,
)

更新插入表

要更新插入表,您可以使用 BigQueryUpsertTableOperator

此运算符要么更新现有表,要么在给定的数据集中创建一个新的空表。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)

更新表模式

要更新表的 schema,您可以使用 BigQueryUpdateTableSchemaOperator

此操作符会更新提供的 schema 字段值,而保留其余部分不变。例如,这对于在现有表 schema 上设置新的字段描述非常有用。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)

删除表

要删除现有表,您可以使用 BigQueryDeleteTableOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

您也可以使用此操作符删除视图。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)

您也可以使用此操作符删除物化视图。

tests/system/google/cloud/bigquery/example_bigquery_tables.py[源]

delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)

执行 BigQuery 作业

假设您想要执行以下查询。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

    INSERT_ROWS_QUERY = (
        f"INSERT {DATASET}.{TABLE_1} VALUES "
        f"(42, 'monty python', '{INSERT_DATE}'), "
        f"(42, 'fishy fish', '{INSERT_DATE}');"
    )

要在特定的 BigQuery 数据库中执行 SQL 查询,您可以使用 BigQueryInsertJobOperator 以及适当的查询作业配置,该配置可以使用 Jinja 模板进行模板化。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=location,
)

以下示例展示了如何在异步(可延迟)模式下使用 BigQueryInsertJobOperator。请注意,可延迟的任务需要在您的 Airflow 部署上运行 Triggerer。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[源]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

有关 BigQuery 作业类型的更多信息,请查看文档

如果您想在配置中包含一些文件,您可以使用 Jinja 模板语言的 include 子句,如下所示

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include QUERY_SQL_PATH %}",
            "useLegacySql": False,
        }
    },
    location=location,
)

包含的文件也可以使用 Jinja 模板,这在 .sql 文件的情况下非常有用。

此外,您可以使用 BigQueryInsertJobOperatorjob_id 参数来提高幂等性。如果未传递此参数,则 uuid 将用作 job_id。如果提供了此参数,则操作符将尝试使用此 job_id` 提交新作业。如果已经存在具有此类 job_id 的作业,则它将重新附加到现有作业。

此外,对于所有这些操作,您可以在可延迟模式下使用操作符

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[源]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

验证数据

检查查询结果是否有数据

要对 BigQuery 执行检查,您可以使用 BigQueryCheckOperator

此操作符期望一个 SQL 查询,该查询将返回单行。该第一行上的每个值都使用 Python bool 转换进行评估。如果任何值返回 False,则检查失败并报错。

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)

您也可以在此操作符中使用可延迟模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[源]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

将查询结果与传递值进行比较

要使用 SQL 代码执行简单的值检查,您可以使用 BigQueryValueCheckOperator

这些操作符期望一个 SQL 查询,该查询将返回单行。该第一行上的每个值都针对 pass_value 进行评估,该值可以是字符串或数值。如果是数值,您还可以指定 tolerance

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)

您也可以在此操作符中使用可延迟模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[源]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    pass_value=2,
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

比较一段时间内的指标

要检查以 SQL 表达式给出的指标值是否在 days_back 之前的指定容差范围内,您可以使用 BigQueryIntervalCheckOperatorBigQueryIntervalCheckAsyncOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=location,
)

您也可以在此操作符中使用可延迟模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py[源]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_NAME_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

使用预定义测试检查列

要检查列是否通过用户可配置的测试,您可以使用 BigQueryColumnCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

    column_check = BigQueryColumnCheckOperator(
        task_id="column_check",
        table=f"{DATASET}.{TABLE_1}",
        column_mapping={"value": {"null_check": {"equal_to": 0}}},
    )

检查表级别数据质量

要检查表是否通过用户定义的测试,您可以使用 BigQueryTableCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py[源]

    table_check = BigQueryTableCheckOperator(
        task_id="table_check",
        table=f"{DATASET}.{TABLE_1}",
        checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
    )

传感器

检查表是否存在

要检查表是否存在,您可以定义一个传感器操作符。这允许延迟下游操作符的执行,直到表存在。如果表按日期分片,您可以使用 {{ ds_nodash }} 宏作为表名称后缀。

BigQueryTableExistenceSensor.

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_exists = BigQueryTableExistenceSensor(
    task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)

如果您想在传感器运行时释放 worker 插槽,您也可以在此操作符中使用可延迟模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_exists_def = BigQueryTableExistenceSensor(
    task_id="check_table_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_exists_async = BigQueryTableExistenceSensor(
    task_id="check_table_exists_async",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

检查表分区是否存在

要检查表是否存在并且具有分区,您可以使用 BigQueryTablePartitionExistenceSensor

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)

对于按 DAY 分区的表,partition_id 参数是 “%Y%m%d” 格式的字符串

如果您想在传感器运行时释放 worker 插槽,您也可以在此操作符中使用可延迟模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_async",
    partition_id=PARTITION_NAME,
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

参考

有关更多信息,请查看

此条目是否有帮助?