Google Cloud BigQuery 运算符¶
BigQuery 是 Google 完全托管的、PB 级、低成本分析数据仓库。它是一种无需数据库管理员的无服务器软件即服务 (SaaS)。它允许用户专注于分析数据,使用熟悉的 SQL 查找有意义的见解。
Airflow 提供运算符来管理数据集和表,运行查询和验证数据。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud Console 选择或创建云平台项目。
按照 Google Cloud 文档中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档中的说明启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
管理数据集¶
创建数据集¶
要在 BigQuery 数据库中创建空数据集,您可以使用 BigQueryCreateEmptyDatasetOperator
。
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
获取数据集详细信息¶
要获取现有数据集的详细信息,您可以使用 BigQueryGetDatasetOperator
。
此运算符返回一个 Dataset Resource。
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
列出数据集中的表¶
要检索给定数据集中表的列表,请使用 BigQueryGetDatasetTablesOperator
。
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
更新表¶
要在 BigQuery 中更新表,您可以使用 BigQueryUpdateTableOperator
。
update 方法替换整个 Table 资源,而 patch 方法仅替换提交的 Table 资源中提供的字段。
update_table = BigQueryUpdateTableOperator(
task_id="update_table",
dataset_id=DATASET_NAME,
table_id="test_table",
fields=["friendlyName", "description"],
table_resource={
"friendlyName": "Updated Table",
"description": "Updated Table",
},
)
更新数据集¶
要在 BigQuery 中更新数据集,您可以使用 BigQueryUpdateDatasetOperator
。
update 方法替换整个数据集资源,而 patch 方法仅替换提交的数据集资源中提供的字段。
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"description": "Updated dataset"},
)
删除数据集¶
要从 BigQuery 数据库中删除现有数据集,您可以使用 BigQueryDeleteDatasetOperator
。
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
管理表¶
创建原生表¶
要在给定的 BigQuery 数据集中创建一个新的空表(可以选择使用模式),您可以使用 BigQueryCreateEmptyTableOperator
。
可采用两种方式之一指定 BigQuery 表的模式。您可以直接传入模式字段,也可以将运算符指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含模式字段的 JSON 文件。
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
您可以使用此运算符在现有表之上创建视图。
create_view = BigQueryCreateEmptyTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
view={
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False,
},
)
您还可以使用此运算符创建物化视图,该视图定期缓存查询结果,以提高性能和效率。
create_materialized_view = BigQueryCreateEmptyTableOperator(
task_id="create_materialized_view",
dataset_id=DATASET_NAME,
table_id="test_materialized_view",
materialized_view={
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"enableRefresh": True,
"refreshIntervalMs": 2000000,
},
)
创建外部表¶
要在数据集中创建具有 Google Cloud Storage 中数据的新外部表,您可以使用 BigQueryCreateExternalTableOperator
。
与 BigQueryCreateEmptyTableOperator
类似,您可以直接传入模式字段。
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
destination_project_dataset_table=f"{DATASET_NAME}.external_table",
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
或者,您可以将运算符指向存储模式的 Google Cloud Storage 对象名称。
update_table_schema_json = BigQueryCreateEmptyTableOperator(
task_id="update_table_schema_json",
dataset_id=DATASET_NAME,
table_id="test_table",
gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)
要使用 BigQuery 模式自动检测,请设置 autodetect
标志,而不是提供显式模式信息。
从表中提取数据¶
要从 BigQuery 表中提取数据,您可以使用 BigQueryGetDataOperator
。或者,如果将字段传递给 selected_fields
,则可以提取选定列的数据。
此运算符的结果可以以两种不同的格式检索,具体取决于 as_dict
参数的值:False
(默认)- Python 列表的列表,其中嵌套列表中的元素数量将等于提取的行数。嵌套中的每个元素将是一个嵌套列表,其中元素将表示该行的列值。True
- Python 字典的列表,其中每个字典表示一行。在每个字典中,键是列名,值是这些列的相应值。
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
location=location,
)
以下示例显示如何在异步(可延迟)模式下使用 BigQueryGetDataOperator
。请注意,可延迟的任务需要触发器在您的 Airflow 部署上运行。
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME_1,
use_legacy_sql=False,
max_results=10,
selected_fields="value",
location=LOCATION,
deferrable=True,
)
更新插入表¶
要更新插入表,您可以使用 BigQueryUpsertTableOperator
。
此运算符要么更新现有表,要么在给定的数据集中创建一个新的空表。
upsert_table = BigQueryUpsertTableOperator(
task_id="upsert_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
更新表模式¶
要更新表的 schema,您可以使用 BigQueryUpdateTableSchemaOperator
。
此操作符会更新提供的 schema 字段值,而保留其余部分不变。例如,这对于在现有表 schema 上设置新的字段描述非常有用。
update_table_schema = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields_updates=[
{"name": "emp_name", "description": "Name of employee"},
{"name": "salary", "description": "Monthly salary in USD"},
],
)
删除表¶
要删除现有表,您可以使用 BigQueryDeleteTableOperator
。
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
您也可以使用此操作符删除视图。
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)
您也可以使用此操作符删除物化视图。
delete_materialized_view = BigQueryDeleteTableOperator(
task_id="delete_materialized_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
执行 BigQuery 作业¶
假设您想要执行以下查询。
INSERT_ROWS_QUERY = (
f"INSERT {DATASET}.{TABLE_1} VALUES "
f"(42, 'monty python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
要在特定的 BigQuery 数据库中执行 SQL 查询,您可以使用 BigQueryInsertJobOperator
以及适当的查询作业配置,该配置可以使用 Jinja 模板进行模板化。
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=location,
)
以下示例展示了如何在异步(可延迟)模式下使用 BigQueryInsertJobOperator
。请注意,可延迟的任务需要在您的 Airflow 部署上运行 Triggerer。
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
有关 BigQuery 作业类型的更多信息,请查看文档。
如果您想在配置中包含一些文件,您可以使用 Jinja 模板语言的 include
子句,如下所示
select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
"query": "{% include QUERY_SQL_PATH %}",
"useLegacySql": False,
}
},
location=location,
)
包含的文件也可以使用 Jinja 模板,这在 .sql
文件的情况下非常有用。
此外,您可以使用 BigQueryInsertJobOperator
的 job_id
参数来提高幂等性。如果未传递此参数,则 uuid 将用作 job_id
。如果提供了此参数,则操作符将尝试使用此 job_id`
提交新作业。如果已经存在具有此类 job_id
的作业,则它将重新附加到现有作业。
此外,对于所有这些操作,您可以在可延迟模式下使用操作符
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
验证数据¶
检查查询结果是否有数据¶
要对 BigQuery 执行检查,您可以使用 BigQueryCheckOperator
此操作符期望一个 SQL 查询,该查询将返回单行。该第一行上的每个值都使用 Python bool
转换进行评估。如果任何值返回 False
,则检查失败并报错。
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
use_legacy_sql=False,
location=location,
)
您也可以在此操作符中使用可延迟模式
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
将查询结果与传递值进行比较¶
要使用 SQL 代码执行简单的值检查,您可以使用 BigQueryValueCheckOperator
这些操作符期望一个 SQL 查询,该查询将返回单行。该第一行上的每个值都针对 pass_value
进行评估,该值可以是字符串或数值。如果是数值,您还可以指定 tolerance
。
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
pass_value=4,
use_legacy_sql=False,
location=location,
)
您也可以在此操作符中使用可延迟模式
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
比较一段时间内的指标¶
要检查以 SQL 表达式给出的指标值是否在 days_back
之前的指定容差范围内,您可以使用 BigQueryIntervalCheckOperator
或 BigQueryIntervalCheckAsyncOperator
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=location,
)
您也可以在此操作符中使用可延迟模式
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_NAME_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
使用预定义测试检查列¶
要检查列是否通过用户可配置的测试,您可以使用 BigQueryColumnCheckOperator
column_check = BigQueryColumnCheckOperator(
task_id="column_check",
table=f"{DATASET}.{TABLE_1}",
column_mapping={"value": {"null_check": {"equal_to": 0}}},
)
检查表级别数据质量¶
要检查表是否通过用户定义的测试,您可以使用 BigQueryTableCheckOperator
table_check = BigQueryTableCheckOperator(
task_id="table_check",
table=f"{DATASET}.{TABLE_1}",
checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)
传感器¶
检查表是否存在¶
要检查表是否存在,您可以定义一个传感器操作符。这允许延迟下游操作符的执行,直到表存在。如果表按日期分片,您可以使用 {{ ds_nodash }}
宏作为表名称后缀。
check_table_exists = BigQueryTableExistenceSensor(
task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
如果您想在传感器运行时释放 worker 插槽,您也可以在此操作符中使用可延迟模式。
check_table_exists_def = BigQueryTableExistenceSensor(
task_id="check_table_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
deferrable=True,
)
check_table_exists_async = BigQueryTableExistenceSensor(
task_id="check_table_exists_async",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
检查表分区是否存在¶
要检查表是否存在并且具有分区,您可以使用 BigQueryTablePartitionExistenceSensor
。
check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
)
对于按 DAY 分区的表,partition_id 参数是 “%Y%m%d” 格式的字符串
如果您想在传感器运行时释放 worker 插槽,您也可以在此操作符中使用可延迟模式。
check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
deferrable=True,
)
check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_async",
partition_id=PARTITION_NAME,
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)