Google Cloud BigQuery 算子

BigQuery 是 Google 的全托管、PB 级规模、低成本分析数据仓库。它是一种无需数据库管理员的无服务器软件即服务 (SaaS)。它允许用户专注于使用熟悉的 SQL 分析数据以找出有意义的见解。

Airflow 提供算子来管理数据集和表格、运行查询和验证数据。

先决任务

要使用这些算子,您必须执行以下操作

管理数据集

创建数据集

要在 BigQuery 数据库中创建空数据集,可以使用 BigQueryCreateEmptyDatasetOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[源代码]

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

获取数据集详细信息

要获取现有数据集的详细信息,可以使用 BigQueryGetDatasetOperator

此运算符返回 数据集资源

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[源代码]

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

列出数据集中的表格

要检索给定数据集中表格的列表,请使用 BigQueryGetDatasetTablesOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)

更新表格

要在 BigQuery 中更新表格,可以使用 BigQueryUpdateTableOperator

更新方法会替换整个 Table 资源,而修补程序方法只会替换提交的 Table 资源中提供的字段。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)

更新数据集

要在 BigQuery 中更新数据集,可以使用 BigQueryUpdateDatasetOperator

更新方法会替换整个数据集资源,而修补程序方法只会替换提交的数据集资源中提供的字段。

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[源代码]

update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)

删除数据集

要从 BigQuery 数据库中删除现有数据集,可以使用 BigQueryDeleteDatasetOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_dataset.py[源代码]

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

管理表格

创建原生表格

要在给定的 BigQuery 数据集中创建一个新的空表格(可选,带有架构),可以使用 BigQueryCreateEmptyTableOperator

用于 BigQuery 表格的架构可以用两种方式指定。你可以直接传入架构字段,也可以将操作员指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含架构字段的 JSON 文件。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

你可以使用此操作员在现有表格上创建视图。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)

你还可以使用此操作员创建物化视图,定期缓存查询结果,以提高性能和效率。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

create_materialized_view = BigQueryCreateEmptyTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    materialized_view={
        "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "enableRefresh": True,
        "refreshIntervalMs": 2000000,
    },
)

创建外部表格

要在数据集中使用 Google Cloud Storage 中的数据创建新的外部表格,可以使用 BigQueryCreateExternalTableOperator

类似于 BigQueryCreateEmptyTableOperator,你可以直接传入架构字段。

tests/system/providers/google/cloud/bigquery/example_bigquery_operations.py[源代码]

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    destination_project_dataset_table=f"{DATASET_NAME}.external_table",
    bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

或者,你可以将操作员指向存储架构的 Google Cloud Storage 对象名称。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

update_table_schema_json = BigQueryCreateEmptyTableOperator(
    task_id="update_table_schema_json",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)

要使用 BigQuery 架构自动检测,请设置 autodetect 标志,而不是提供明确的架构信息。

从表中获取数据

要从 BigQuery 表中获取数据,你可以使用 BigQueryGetDataOperator 。或者,如果你将字段传递给 selected_fields,则可以获取所选列的数据。

此操作员的结果可以根据 as_dict 参数的值以两种不同的格式检索:False(默认) - Python 列表的列表,其中嵌套列表中的元素数量将等于获取的行数。嵌套中的每个元素都将是嵌套列表,其中元素将表示该行的列值。 True - Python 字典列表,其中每个字典都表示一行。在每个字典中,键是列名,值是这些列的相应值。

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)

以下示例展示了如何在异步(可延迟)模式下使用 BigQueryGetDataOperator 。请注意,可延迟任务要求触发器在你的 Airflow 部署上运行。

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[源代码]

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME_1,
    use_legacy_sql=False,
    max_results=10,
    selected_fields="value",
    location=LOCATION,
    deferrable=True,
)

插入或更新表

若要插入或更新表,可以使用 BigQueryUpsertTableOperator

此操作员会更新现有表或在给定数据集中创建一个新的空表。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)

更新表架构

若要更新表的架构,可以使用 BigQueryUpdateTableSchemaOperator

此操作员会更新提供的架构字段值,同时保持其余字段不变。例如,这对于在现有表架构上设置新的字段描述很有用。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)

删除表

若要删除现有表,可以使用 BigQueryDeleteTableOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

您还可以使用此操作员来删除视图。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)

您还可以使用此操作员来删除物化视图。

tests/system/providers/google/cloud/bigquery/example_bigquery_tables.py[源代码]

delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)

执行 BigQuery 作业

假设您要执行以下查询。

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

    INSERT_ROWS_QUERY = (
        f"INSERT {DATASET}.{TABLE_1} VALUES "
        f"(42, 'monty python', '{INSERT_DATE}'), "
        f"(42, 'fishy fish', '{INSERT_DATE}');"
    )

若要在特定 BigQuery 数据库中执行 SQL 查询,可以使用 BigQueryInsertJobOperator,其中包含可以进行 Jinja 模板化的正确查询作业配置。

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=location,
)

以下示例展示了如何在异步(可延迟)模式下使用 BigQueryInsertJobOperator。请注意,可延迟任务要求在你的 Airflow 部署上运行触发器。

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[源代码]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

有关 BigQuery 作业类型的更多信息,请查看 文档

如果你想在你的配置中包含一些文件,你可以使用 Jinja 模板语言的 include 子句,如下所示

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include QUERY_SQL_PATH %}",
            "useLegacySql": False,
        }
    },
    location=location,
)

包含的文件也可以使用 Jinja 模板,这在 .sql 文件的情况下可能很有用。

此外,你可以使用 BigQueryInsertJobOperatorjob_id 参数来提高幂等性。如果未传递此参数,则会使用 uuid 作为 job_id。如果提供了,则运算符将尝试使用此 job_id` 提交一个新作业。如果已经存在具有此类 job_id 的作业,则它将重新附加到现有作业。

此外,对于所有这些操作,你可以在可延迟模式下使用运算符

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[源代码]

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

验证数据

检查查询结果是否包含数据

要针对 BigQuery 执行检查,你可以使用 BigQueryCheckOperator

此运算符需要一个将返回单行的 SQL 查询。该第一行上的每个值都使用 Python bool 转换进行评估。如果任何值返回 False,则检查失败并出错。

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)

你也可以在此运算符中使用可延迟模式

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[源代码]

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

将查询结果与通过值进行比较

要使用 SQL 代码执行简单值检查,你可以使用 BigQueryValueCheckOperator

这些运算符期望返回单行的 SQL 查询。第一行中的每个值都会针对 pass_value 进行评估,后者可以是字符串或数字值。如果是数字,还可以指定 tolerance

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)

你也可以在此运算符中使用可延迟模式

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[源代码]

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    pass_value=2,
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

比较一段时间内的指标

要检查作为 SQL 表达式给出的指标的值是否在 days_back 之前的某个容差范围内,可以使用 BigQueryIntervalCheckOperatorBigQueryIntervalCheckAsyncOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=location,
)

你也可以在此运算符中使用可延迟模式

tests/system/providers/google/cloud/bigquery/example_bigquery_queries_async.py[源代码]

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_NAME_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

使用预定义测试检查列

要检查列是否通过用户可配置的测试,可以使用 BigQueryColumnCheckOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

    column_check = BigQueryColumnCheckOperator(
        task_id="column_check",
        table=f"{DATASET}.{TABLE_1}",
        column_mapping={"value": {"null_check": {"equal_to": 0}}},
    )

检查表级别的数据质量

要检查表是否通过用户定义的测试,可以使用 BigQueryTableCheckOperator

tests/system/providers/google/cloud/bigquery/example_bigquery_queries.py[源代码]

    table_check = BigQueryTableCheckOperator(
        task_id="table_check",
        table=f"{DATASET}.{TABLE_1}",
        checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
    )

传感器

检查表是否存在

要检查表是否存在,可以定义传感器运算符。这允许延迟执行下游运算符,直到表存在。如果表按日期分片,则可以将 {{ ds_nodash }} 宏用作表名后缀。

BigQueryTableExistenceSensor.

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_exists = BigQueryTableExistenceSensor(
    task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)

如果您希望在传感器运行时释放工作槽,也可以在此运算符中使用可延迟模式。

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_exists_def = BigQueryTableExistenceSensor(
    task_id="check_table_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    deferrable=True,
)

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_exists_async = BigQueryTableExistenceSensor(
    task_id="check_table_exists_async",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

检查表分区是否存在

要检查表是否存在并具有分区,可以使用 BigQueryTablePartitionExistenceSensor

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)

对于按天分区的表,partition_id 参数是“%Y%m%d”格式的字符串

如果您希望在传感器运行时释放工作槽,也可以在此运算符中使用可延迟模式。

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
    deferrable=True,
)

tests/system/providers/google/cloud/bigquery/example_bigquery_sensors.py[源代码]

check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_async",
    partition_id=PARTITION_NAME,
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

参考

有关更多信息,请查看

此条目是否有用?