Google Cloud BigQuery 数据传输服务操作符¶
BigQuery 数据传输服务自动将数据从 SaaS 应用程序移至 Google BigQuery,并按计划进行管理。您的分析团队无需编写任何代码,即可为数据仓库奠定基础。BigQuery 数据传输服务最初支持 Google 应用程序源,例如 Google Ads、Campaign Manager、Google Ad Manager 和 YouTube。通过 BigQuery 数据传输服务,用户还可以访问数据连接器,以便轻松地将数据从 Teradata 和 Amazon S3 传输到 BigQuery。
先决任务¶
要使用这些操作符,您必须执行以下操作
使用Cloud 控制台选择或创建 Cloud Platform 项目。
为您的项目启用帐单,如Google Cloud 文档中所述。
启用 API,如Cloud 控制台文档中所述。
通过pip安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息,请访问此处。
创建传输配置¶
要创建 DTS 传输配置,可以使用 BigQueryCreateDataTransferOperator
。
对于 Airflow,客户需要创建一个传输配置,其中自动调度处于禁用状态,然后使用专门的 Airflow 运算符触发传输运行,该运算符将调用 StartManualTransferRuns API,例如 BigQueryDataTransferServiceStartTransferRunsOperator
。 BigQueryCreateDataTransferOperator
检查传递的配置中是否存在自动调度选项。如果存在,则不执行任何操作,否则将其值设置为 True
。
# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
TRANSFER_CONFIG = {
"destination_dataset_id": DATASET_NAME,
"display_name": "test data transfer",
"data_source_id": "google_cloud_storage",
"schedule_options": {"disable_auto_scheduling": True},
"params": {
"field_delimiter": ",",
"max_bad_records": "0",
"skip_leading_rows": "1",
"data_path_template": BUCKET_URI,
"destination_table_name_template": DTS_BQ_TABLE,
"file_format": "CSV",
},
}
您可以使用或不使用项目 ID 来创建运算符。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。运算符的基本用法
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT_ID,
task_id="gcp_bigquery_create_transfer",
)
transfer_config_id = cast(str, XComArg(gcp_bigquery_create_transfer, key="transfer_config_id"))
您可以将 Jinja 模板 与 transfer_config
、project_id
、authorization_code
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。结果将保存到 XCom,这允许其他运算符使用该结果。此外,新配置的 ID 在 XCom 中可通过 transfer_config_id
键访问。
删除传输配置¶
要删除 DTS 传输配置,您可以使用 BigQueryDeleteDataTransferConfigOperator
。
运算符的基本用法
gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)
您可以将 Jinja 模板 与 transfer_config
、project_id
、authorization_code
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许您动态确定值。
手动启动传输运行¶
启动手动传输运行,现在执行,计划时间等于当前时间。 BigQueryDataTransferServiceStartTransferRunsOperator
。
运算符的基本用法
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="gcp_bigquery_start_transfer",
project_id=PROJECT_ID,
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
你可以将 Jinja 模板 与 transfer_config_id
、project_id
、requested_time_range
、requested_run_time
、gcp_conn_id
、impersonation_chain
参数结合使用,这些参数允许你动态确定值。
要检查操作是否成功,你可以使用 BigQueryDataTransferServiceTransferRunSensor
。
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
task_id="gcp_run_sensor",
transfer_config_id=transfer_config_id,
run_id=cast(str, XComArg(gcp_bigquery_start_transfer, key="run_id")),
expected_statuses={"SUCCEEDED"},
)
你可以将 Jinja 模板 与 run_id
、transfer_config_id
、expected_statuses
、project_id
、impersonation_chain
参数结合使用,这些参数允许你动态确定值。