Google API 到 Amazon S3

使用 GoogleApiToS3Operator 传输来向任何支持发现的 Google API 发出请求,并将其响应保存在 Amazon S3 文件中。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

操作符

Google Sheets 到 Amazon S3 传输操作符

此示例从 Google Sheets 加载数据并将其保存到 Amazon S3 文件中。

tests/system/amazon/aws/example_google_api_sheets_to_s3.py

task_google_sheets_values_to_s3 = GoogleApiToS3Operator(
    task_id="google_sheet_data_to_s3",
    google_api_service_name="sheets",
    google_api_service_version="v4",
    google_api_endpoint_path="sheets.spreadsheets.values.get",
    google_api_endpoint_params={"spreadsheetId": GOOGLE_SHEET_ID, "range": GOOGLE_SHEET_RANGE},
    s3_destination_key=f"s3://{s3_bucket}/{s3_key}",
)

您可以在此处找到有关所使用的 Google API 端点的更多信息。

Google Youtube 到 Amazon S3

这是一个更高级的示例 DAG,用于使用 GoogleApiToS3Operator,它使用 xcom 在任务之间传递数据,以检索有关 YouTube 视频的特定信息。

它在给定时间范围 (YOUTUBE_VIDEO_PUBLISHED_AFTER, YOUTUBE_VIDEO_PUBLISHED_BEFORE) 内,在一个 YouTube 频道 (YOUTUBE_CHANNEL_ID) 上搜索最多 50 个视频(由于分页),将响应保存在 Amazon S3 中,并将数据推送到 xcom。

tests/system/amazon/aws/example_google_api_youtube_to_s3.py

video_ids_to_s3 = GoogleApiToS3Operator(
    task_id="video_ids_to_s3",
    google_api_service_name="youtube",
    google_api_service_version="v3",
    google_api_endpoint_path="youtube.search.list",
    gcp_conn_id=conn_id_name,
    google_api_endpoint_params={
        "part": "snippet",
        "channelId": YOUTUBE_CHANNEL_ID,
        "maxResults": 50,
        "publishedAfter": YOUTUBE_VIDEO_PUBLISHED_AFTER,
        "publishedBefore": YOUTUBE_VIDEO_PUBLISHED_BEFORE,
        "type": "video",
        "fields": "items/id/videoId",
    },
    google_api_response_via_xcom="video_ids_response",
    s3_destination_key=f"https://s3.us-west-2.amazonaws.com/{s3_bucket_name}/youtube_search",
    s3_overwrite=True,
)

它将 YouTube ID 传递给下一个请求,然后获取请求的视频的信息 (YOUTUBE_VIDEO_FIELDS),并将它们保存在 Amazon S3 (S3_BUCKET_NAME) 中。

tests/system/amazon/aws/example_google_api_youtube_to_s3.py

video_data_to_s3 = GoogleApiToS3Operator(
    task_id="video_data_to_s3",
    google_api_service_name="youtube",
    google_api_service_version="v3",
    gcp_conn_id=conn_id_name,
    google_api_endpoint_path="youtube.videos.list",
    google_api_endpoint_params={
        "part": YOUTUBE_VIDEO_PARTS,
        "maxResults": 50,
        "fields": YOUTUBE_VIDEO_FIELDS,
    },
    google_api_endpoint_params_via_xcom="video_ids",
    s3_destination_key=f"https://s3.us-west-2.amazonaws.com/{s3_bucket_name}/youtube_videos",
    s3_overwrite=True,
)

此条目是否有帮助?