Google Cloud BigQuery 算子¶
BigQuery 是 Google 的全托管、PB 级规模、低成本分析数据仓库。它是一种无需数据库管理员的无服务器软件即服务 (SaaS)。它允许用户专注于使用熟悉的 SQL 分析数据以找出有意义的见解。
Airflow 提供算子来管理数据集和表格、运行查询和验证数据。
先决任务¶
要使用这些算子,您必须执行以下操作
使用 Cloud 控制台 选择或创建 Cloud Platform 项目。
为您的项目启用结算,如 Google Cloud 文档 中所述。
启用 API,如 Cloud 控制台文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'详细的信息可用于 安装。
管理数据集¶
创建数据集¶
要在 BigQuery 数据库中创建空数据集,可以使用 BigQueryCreateEmptyDatasetOperator
。
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
获取数据集详细信息¶
要获取现有数据集的详细信息,可以使用 BigQueryGetDatasetOperator
。
此运算符返回 数据集资源。
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
列出数据集中的表格¶
要检索给定数据集中表格的列表,请使用 BigQueryGetDatasetTablesOperator
。
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
更新表格¶
要在 BigQuery 中更新表格,可以使用 BigQueryUpdateTableOperator
。
更新方法会替换整个 Table 资源,而修补程序方法只会替换提交的 Table 资源中提供的字段。
update_table = BigQueryUpdateTableOperator(
task_id="update_table",
dataset_id=DATASET_NAME,
table_id="test_table",
fields=["friendlyName", "description"],
table_resource={
"friendlyName": "Updated Table",
"description": "Updated Table",
},
)
更新数据集¶
要在 BigQuery 中更新数据集,可以使用 BigQueryUpdateDatasetOperator
。
更新方法会替换整个数据集资源,而修补程序方法只会替换提交的数据集资源中提供的字段。
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"description": "Updated dataset"},
)
删除数据集¶
要从 BigQuery 数据库中删除现有数据集,可以使用 BigQueryDeleteDatasetOperator
。
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
管理表格¶
创建原生表格¶
要在给定的 BigQuery 数据集中创建一个新的空表格(可选,带有架构),可以使用 BigQueryCreateEmptyTableOperator
。
用于 BigQuery 表格的架构可以用两种方式指定。你可以直接传入架构字段,也可以将操作员指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含架构字段的 JSON 文件。
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
你可以使用此操作员在现有表格上创建视图。
create_view = BigQueryCreateEmptyTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
view={
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False,
},
)
你还可以使用此操作员创建物化视图,定期缓存查询结果,以提高性能和效率。
create_materialized_view = BigQueryCreateEmptyTableOperator(
task_id="create_materialized_view",
dataset_id=DATASET_NAME,
table_id="test_materialized_view",
materialized_view={
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"enableRefresh": True,
"refreshIntervalMs": 2000000,
},
)
创建外部表格¶
要在数据集中使用 Google Cloud Storage 中的数据创建新的外部表格,可以使用 BigQueryCreateExternalTableOperator
。
类似于 BigQueryCreateEmptyTableOperator
,你可以直接传入架构字段。
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
destination_project_dataset_table=f"{DATASET_NAME}.external_table",
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
或者,你可以将操作员指向存储架构的 Google Cloud Storage 对象名称。
update_table_schema_json = BigQueryCreateEmptyTableOperator(
task_id="update_table_schema_json",
dataset_id=DATASET_NAME,
table_id="test_table",
gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
)
要使用 BigQuery 架构自动检测,请设置 autodetect
标志,而不是提供明确的架构信息。
从表中获取数据¶
要从 BigQuery 表中获取数据,你可以使用 BigQueryGetDataOperator
。或者,如果你将字段传递给 selected_fields
,则可以获取所选列的数据。
此操作员的结果可以根据 as_dict
参数的值以两种不同的格式检索:False
(默认) - Python 列表的列表,其中嵌套列表中的元素数量将等于获取的行数。嵌套中的每个元素都将是嵌套列表,其中元素将表示该行的列值。 True
- Python 字典列表,其中每个字典都表示一行。在每个字典中,键是列名,值是这些列的相应值。
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
location=location,
)
以下示例展示了如何在异步(可延迟)模式下使用 BigQueryGetDataOperator
。请注意,可延迟任务要求触发器在你的 Airflow 部署上运行。
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME_1,
use_legacy_sql=False,
max_results=10,
selected_fields="value",
location=LOCATION,
deferrable=True,
)
插入或更新表¶
若要插入或更新表,可以使用 BigQueryUpsertTableOperator
。
此操作员会更新现有表或在给定数据集中创建一个新的空表。
upsert_table = BigQueryUpsertTableOperator(
task_id="upsert_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
更新表架构¶
若要更新表的架构,可以使用 BigQueryUpdateTableSchemaOperator
。
此操作员会更新提供的架构字段值,同时保持其余字段不变。例如,这对于在现有表架构上设置新的字段描述很有用。
update_table_schema = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields_updates=[
{"name": "emp_name", "description": "Name of employee"},
{"name": "salary", "description": "Monthly salary in USD"},
],
)
删除表¶
若要删除现有表,可以使用 BigQueryDeleteTableOperator
。
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
您还可以使用此操作员来删除视图。
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)
您还可以使用此操作员来删除物化视图。
delete_materialized_view = BigQueryDeleteTableOperator(
task_id="delete_materialized_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
执行 BigQuery 作业¶
假设您要执行以下查询。
INSERT_ROWS_QUERY = (
f"INSERT {DATASET}.{TABLE_1} VALUES "
f"(42, 'monty python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
若要在特定 BigQuery 数据库中执行 SQL 查询,可以使用 BigQueryInsertJobOperator
,其中包含可以进行 Jinja 模板化的正确查询作业配置。
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=location,
)
以下示例展示了如何在异步(可延迟)模式下使用 BigQueryInsertJobOperator
。请注意,可延迟任务要求在你的 Airflow 部署上运行触发器。
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
有关 BigQuery 作业类型的更多信息,请查看 文档。
如果你想在你的配置中包含一些文件,你可以使用 Jinja 模板语言的 include
子句,如下所示
select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
"query": "{% include QUERY_SQL_PATH %}",
"useLegacySql": False,
}
},
location=location,
)
包含的文件也可以使用 Jinja 模板,这在 .sql
文件的情况下可能很有用。
此外,你可以使用 BigQueryInsertJobOperator
的 job_id
参数来提高幂等性。如果未传递此参数,则会使用 uuid 作为 job_id
。如果提供了,则运算符将尝试使用此 job_id`
提交一个新作业。如果已经存在具有此类 job_id
的作业,则它将重新附加到现有作业。
此外,对于所有这些操作,你可以在可延迟模式下使用运算符
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
验证数据¶
检查查询结果是否包含数据¶
要针对 BigQuery 执行检查,你可以使用 BigQueryCheckOperator
此运算符需要一个将返回单行的 SQL 查询。该第一行上的每个值都使用 Python bool
转换进行评估。如果任何值返回 False
,则检查失败并出错。
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
use_legacy_sql=False,
location=location,
)
你也可以在此运算符中使用可延迟模式
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
将查询结果与通过值进行比较¶
要使用 SQL 代码执行简单值检查,你可以使用 BigQueryValueCheckOperator
这些运算符期望返回单行的 SQL 查询。第一行中的每个值都会针对 pass_value
进行评估,后者可以是字符串或数字值。如果是数字,还可以指定 tolerance
。
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET}.{TABLE_1}",
pass_value=4,
use_legacy_sql=False,
location=location,
)
你也可以在此运算符中使用可延迟模式
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
比较一段时间内的指标¶
要检查作为 SQL 表达式给出的指标的值是否在 days_back
之前的某个容差范围内,可以使用 BigQueryIntervalCheckOperator
或 BigQueryIntervalCheckAsyncOperator
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=location,
)
你也可以在此运算符中使用可延迟模式
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_NAME_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
使用预定义测试检查列¶
要检查列是否通过用户可配置的测试,可以使用 BigQueryColumnCheckOperator
column_check = BigQueryColumnCheckOperator(
task_id="column_check",
table=f"{DATASET}.{TABLE_1}",
column_mapping={"value": {"null_check": {"equal_to": 0}}},
)
检查表级别的数据质量¶
要检查表是否通过用户定义的测试,可以使用 BigQueryTableCheckOperator
table_check = BigQueryTableCheckOperator(
task_id="table_check",
table=f"{DATASET}.{TABLE_1}",
checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)
传感器¶
检查表是否存在¶
要检查表是否存在,可以定义传感器运算符。这允许延迟执行下游运算符,直到表存在。如果表按日期分片,则可以将 {{ ds_nodash }}
宏用作表名后缀。
check_table_exists = BigQueryTableExistenceSensor(
task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
如果您希望在传感器运行时释放工作槽,也可以在此运算符中使用可延迟模式。
check_table_exists_def = BigQueryTableExistenceSensor(
task_id="check_table_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
deferrable=True,
)
check_table_exists_async = BigQueryTableExistenceSensor(
task_id="check_table_exists_async",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
检查表分区是否存在¶
要检查表是否存在并具有分区,可以使用 BigQueryTablePartitionExistenceSensor
。
check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
)
对于按天分区的表,partition_id 参数是“%Y%m%d”格式的字符串
如果您希望在传感器运行时释放工作槽,也可以在此运算符中使用可延迟模式。
check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
deferrable=True,
)
check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_async",
partition_id=PARTITION_NAME,
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)