AWS Glue¶
AWS Glue 是一种无服务器数据集成服务,可以轻松地发现、准备和组合数据,用于分析、机器学习和应用程序开发。 AWS Glue 提供了数据集成所需的所有功能,因此您可以在几分钟内(而不是几个月)开始分析数据并将其投入使用。
先决条件任务¶
要使用这些操作器,您需要执行以下操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow™ 的安装
设置连接.
操作器¶
创建 AWS Glue 爬网程序¶
AWS Glue 爬网程序允许您轻松地从各种数据源中提取数据。 要创建新的 AWS Glue 爬网程序或运行现有的爬网程序,您可以使用 GlueCrawlerOperator
。
crawl_s3 = GlueCrawlerOperator(
task_id="crawl_s3",
config=glue_crawler_config,
)
注意
config
中包含的 AWS IAM 角色需要访问源数据位置(例如,如果数据存储在 Amazon S3 中,则需要 s3:PutObject 访问权限)以及 AWSGlueServiceRole
策略。 有关更多详细信息的链接,请参阅下面的“参考”部分。
提交 AWS Glue 作业¶
要提交新的 AWS Glue 作业,您可以使用 GlueJobOperator
。
submit_glue_job = GlueJobOperator(
task_id="submit_glue_job",
job_name=glue_job_name,
script_location=f"s3://{bucket_name}/etl_script.py",
s3_bucket=bucket_name,
iam_role_name=role_name,
create_job_kwargs={"GlueVersion": "3.0", "NumberOfWorkers": 2, "WorkerType": "G.1X"},
)
注意
用于爬网程序的相同 AWS IAM 角色也可以在此处使用,但它需要策略来提供对结果数据输出位置的访问权限。
创建 AWS Glue 数据质量¶
AWS Glue 数据质量允许您测量和监控数据的质量,以便您做出良好的业务决策。 要创建新的 AWS Glue 数据质量规则集或更新现有的规则集,您可以使用 GlueDataQualityOperator
。
create_rule_set = GlueDataQualityOperator(
task_id="create_rule_set",
name=rule_set_name,
ruleset=RULE_SET,
data_quality_ruleset_kwargs={
"TargetTable": {
"TableName": athena_table,
"DatabaseName": athena_database,
}
},
)
启动 AWS Glue 数据质量评估运行¶
要启动 AWS Glue 数据质量规则集评估运行,您可以使用 GlueDataQualityRuleSetEvaluationRunOperator
。
start_evaluation_run = GlueDataQualityRuleSetEvaluationRunOperator(
task_id="start_evaluation_run",
datasource={
"GlueTable": {
"TableName": athena_table,
"DatabaseName": athena_database,
}
},
role=test_context[ROLE_ARN_KEY],
rule_set_names=[rule_set_name],
)
启动 AWS Glue 数据质量建议运行¶
要启动 AWS Glue 数据质量规则建议运行,您可以使用 GlueDataQualityRuleRecommendationRunOperator
。
recommendation_run = GlueDataQualityRuleRecommendationRunOperator(
task_id="recommendation_run",
datasource={
"GlueTable": {
"TableName": athena_table,
"DatabaseName": athena_database,
}
},
role=test_context[ROLE_ARN_KEY],
recommendation_run_kwargs={"CreatedRulesetName": rule_set_name},
)
传感器¶
等待 AWS Glue 爬网程序状态¶
要等待 AWS Glue 爬网程序执行的状态,直到它达到终端状态,您可以使用 GlueCrawlerSensor
。
wait_for_crawl = GlueCrawlerSensor(
task_id="wait_for_crawl",
crawler_name=glue_crawler_name,
)
等待 AWS Glue 作业状态¶
要等待 AWS Glue 作业的状态,直到它达到终端状态,您可以使用 GlueJobSensor
wait_for_job = GlueJobSensor(
task_id="wait_for_job",
job_name=glue_job_name,
# Job ID extracted from previous Glue Job Operator task
run_id=submit_glue_job.output,
verbose=True, # prints glue job logs in airflow logs
)
等待 AWS Glue 数据质量评估运行¶
要等待 AWS Glue 数据质量规则集评估运行的状态,直到它达到终端状态,您可以使用 GlueDataQualityRuleSetEvaluationRunSensor
await_evaluation_run_sensor = GlueDataQualityRuleSetEvaluationRunSensor(
task_id="await_evaluation_run_sensor",
evaluation_run_id=start_evaluation_run.output,
)
等待 AWS Glue 数据质量建议运行¶
要等待 AWS Glue 数据质量建议运行的状态,直到它达到终端状态,您可以使用 GlueDataQualityRuleRecommendationRunSensor
await_recommendation_run_sensor = GlueDataQualityRuleRecommendationRunSensor(
task_id="await_recommendation_run_sensor",
recommendation_run_id=recommendation_run.output,
)