AWS Glue

AWS Glue 是一种无服务器数据集成服务,可轻松发现、准备和组合数据,用于分析、机器学习和应用程序开发。AWS Glue 提供了数据集成所需的所有功能,因此您可以在几分钟而不是几个月内开始分析数据并将其投入使用。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

通用参数

aws_conn_id

引用 Amazon Web Services 连接 ID。如果此参数设置为 None,则使用默认的 boto3 行为,而不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书捆绑文件的文件名。如果要使用与 botocore 使用的不同的 CA 证书捆绑包,则可以指定此参数。

如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None

botocore_config

提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。

示例,有关参数的更多详细信息,请查看 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None

注意

指定一个空字典 {} 将覆盖 botocore.config.Config 的连接配置

操作符

创建 AWS Glue 爬虫

AWS Glue 爬虫允许您轻松地从各种数据源提取数据。要创建新的 AWS Glue 爬虫或运行现有的爬虫,您可以使用 GlueCrawlerOperator

tests/system/amazon/aws/example_glue.py

crawl_s3 = GlueCrawlerOperator(
    task_id="crawl_s3",
    config=glue_crawler_config,
)

注意

config 中包含的 AWS IAM 角色需要访问源数据位置(例如,如果数据存储在 Amazon S3 中,则需要 s3:PutObject 访问权限)以及 AWSGlueServiceRole 策略。有关更多详细信息,请参阅下面的参考部分中的链接。

提交 AWS Glue 作业

要提交新的 AWS Glue 作业,您可以使用 GlueJobOperator

tests/system/amazon/aws/example_glue.py

submit_glue_job = GlueJobOperator(
    task_id="submit_glue_job",
    job_name=glue_job_name,
    script_location=f"s3://{bucket_name}/etl_script.py",
    s3_bucket=bucket_name,
    iam_role_name=role_name,
    create_job_kwargs={"GlueVersion": "3.0", "NumberOfWorkers": 2, "WorkerType": "G.1X"},
)

注意

此处也可以使用与爬虫相同的 AWS IAM 角色,但是它将需要策略来提供对结果数据输出位置的访问权限。

创建 AWS Glue 数据质量

AWS Glue 数据质量允许您衡量和监视数据质量,以便您可以做出良好的业务决策。要创建新的 AWS Glue 数据质量规则集或更新现有的规则集,您可以使用 GlueDataQualityOperator

tests/system/amazon/aws/example_glue_data_quality.py

create_rule_set = GlueDataQualityOperator(
    task_id="create_rule_set",
    name=rule_set_name,
    ruleset=RULE_SET,
    data_quality_ruleset_kwargs={
        "TargetTable": {
            "TableName": athena_table,
            "DatabaseName": athena_database,
        }
    },
)

启动 AWS Glue 数据质量评估运行

要启动 AWS Glue 数据质量规则集评估运行,您可以使用 GlueDataQualityRuleSetEvaluationRunOperator

tests/system/amazon/aws/example_glue_data_quality.py

start_evaluation_run = GlueDataQualityRuleSetEvaluationRunOperator(
    task_id="start_evaluation_run",
    datasource={
        "GlueTable": {
            "TableName": athena_table,
            "DatabaseName": athena_database,
        }
    },
    role=test_context[ROLE_ARN_KEY],
    rule_set_names=[rule_set_name],
)

启动 AWS Glue 数据质量建议运行

要启动 AWS Glue 数据质量规则建议运行,您可以使用 GlueDataQualityRuleRecommendationRunOperator

tests/system/amazon/aws/example_glue_data_quality_with_recommendation.py

recommendation_run = GlueDataQualityRuleRecommendationRunOperator(
    task_id="recommendation_run",
    datasource={
        "GlueTable": {
            "TableName": athena_table,
            "DatabaseName": athena_database,
        }
    },
    role=test_context[ROLE_ARN_KEY],
    recommendation_run_kwargs={"CreatedRulesetName": rule_set_name},
)

传感器

等待 AWS Glue 爬虫状态

要等待 AWS Glue 爬虫执行的状态,直到它达到最终状态,您可以使用 GlueCrawlerSensor

tests/system/amazon/aws/example_glue.py

wait_for_crawl = GlueCrawlerSensor(
    task_id="wait_for_crawl",
    crawler_name=glue_crawler_name,
)

等待 AWS Glue 作业状态

要等待 AWS Glue 作业的状态,直到它达到最终状态,您可以使用 GlueJobSensor

tests/system/amazon/aws/example_glue.py

wait_for_job = GlueJobSensor(
    task_id="wait_for_job",
    job_name=glue_job_name,
    # Job ID extracted from previous Glue Job Operator task
    run_id=submit_glue_job.output,
    verbose=True,  # prints glue job logs in airflow logs
)

等待 AWS Glue 数据质量评估运行

要等待 AWS Glue 数据质量规则集评估运行的状态,直到它达到最终状态,您可以使用 GlueDataQualityRuleSetEvaluationRunSensor

tests/system/amazon/aws/example_glue_data_quality.py

await_evaluation_run_sensor = GlueDataQualityRuleSetEvaluationRunSensor(
    task_id="await_evaluation_run_sensor",
    evaluation_run_id=start_evaluation_run.output,
)

等待 AWS Glue 数据质量建议运行

要等待 AWS Glue 数据质量建议运行的状态,直到它达到最终状态,您可以使用 GlueDataQualityRuleRecommendationRunSensor

tests/system/amazon/aws/example_glue_data_quality_with_recommendation.py

await_recommendation_run_sensor = GlueDataQualityRuleRecommendationRunSensor(
    task_id="await_recommendation_run_sensor",
    recommendation_run_id=recommendation_run.output,
)

等待 AWS Glue 目录分区

要等待分区在 AWS Glue 目录中显示,直到它达到最终状态,您可以使用 GlueCatalogPartitionSensor

tests/system/amazon/aws/example_glue.py

wait_for_catalog_partition = GlueCatalogPartitionSensor(
    task_id="wait_for_catalog_partition",
    table_name="input",
    database_name=glue_db_name,
    expression="category='mixed'",
)

此条目是否有帮助?