Amazon Managed Service for Apache Flink¶
Amazon Managed Service for Apache Flink 是一个全托管服务,可用于使用 Java、Python、SQL 或 Scala 处理和分析流数据。该服务使您能够针对流数据源快速编写和运行 Java、SQL 或 Scala 代码,以执行时间序列分析、为实时仪表盘提供数据以及创建实时指标。
先决条件任务¶
要使用这些操作符,您必须完成以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
- 引用 Amazon Web Services 连接 ID。如果此参数设置为 - None,则使用默认的 boto3 行为,不进行连接查找。否则,使用连接中存储的凭据。默认值:- aws_default
- region_name
- AWS 区域名称。如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接中的值。默认值:- None
- verify
- 是否验证 SSL 证书。 - False- 不验证 SSL 证书。
- path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的不同的 CA 证书捆绑包,可以指定此参数。 
 - 如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接中的值。默认值:- None
- botocore_config
- 提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。 示例:有关参数的更多详细信息,请参阅 botocore.config.Config¶- { "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, } - 如果此参数设置为 - None或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接中的值。默认值:- None- 注意 - 指定一个空字典, - {},将覆盖 botocore.config.Config 的连接配置
操作符¶
创建 Amazon Managed Service for Apache Flink 应用程序¶
要创建 Amazon Managed Service for Apache Flink 应用程序,可以使用 KinesisAnalyticsV2CreateApplicationOperator。
tests/system/amazon/aws/example_kinesis_analytics.py
create_application = KinesisAnalyticsV2CreateApplicationOperator(
    task_id="create_application",
    application_name=application_name,
    runtime_environment="FLINK-1_18",
    service_execution_role=test_context[ROLE_ARN_KEY],
    create_application_kwargs={
        "ApplicationConfiguration": {
            "FlinkApplicationConfiguration": {
                "ParallelismConfiguration": {
                    "ConfigurationType": "CUSTOM",
                    "Parallelism": 2,
                    "ParallelismPerKPU": 1,
                    "AutoScalingEnabled": False,
                }
            },
            "EnvironmentProperties": {
                "PropertyGroups": [
                    {
                        "PropertyGroupId": "BlueprintMetadata",
                        "PropertyMap": {
                            "AWSRegion": region_name,
                            "BlueprintName": "KDS_FLINK-DATASTREAM-JAVA_S3",
                            "BucketName": f"s3://{bucket_name}/",
                            "PartitionFormat": "yyyy-MM-dd-HH",
                            "StreamInitialPosition": "TRIM_HORIZON",
                            "StreamName": stream_name,
                        },
                    },
                ]
            },
            "ApplicationCodeConfiguration": {
                "CodeContent": {
                    "S3ContentLocation": {
                        "BucketARN": f"arn:aws:s3:::{bucket_name}",
                        "FileKey": "code/kds-to-s3-datastream-java-1.0.1.jar",
                    },
                },
                "CodeContentType": "ZIPFILE",
            },
        }
    },
)
启动 Amazon Managed Service for Apache Flink 应用程序¶
要启动 Amazon Managed Service for Apache Flink 应用程序,可以使用 KinesisAnalyticsV2StartApplicationOperator。
tests/system/amazon/aws/example_kinesis_analytics.py
start_application = KinesisAnalyticsV2StartApplicationOperator(
    task_id="start_application",
    application_name=application_name,
)
停止 Amazon Managed Service for Apache Flink 应用程序¶
要停止 Amazon Managed Service for Apache Flink 应用程序,可以使用 KinesisAnalyticsV2StopApplicationOperator。
tests/system/amazon/aws/example_kinesis_analytics.py
stop_application = KinesisAnalyticsV2StopApplicationOperator(
    task_id="stop_application",
    application_name=application_name,
)
传感器¶
等待 Amazon Managed Service for Apache Flink 应用程序启动¶
要等待 Amazon Managed Service for Apache Flink 应用程序启动的状态,可以使用 KinesisAnalyticsV2StartApplicationCompletedSensor。
tests/system/amazon/aws/example_kinesis_analytics.py
await_start_application = KinesisAnalyticsV2StartApplicationCompletedSensor(
    task_id="await_start_application",
    application_name=application_name,
)
等待 Amazon Managed Service for Apache Flink 应用程序停止¶
要等待 Amazon Managed Service for Apache Flink 应用程序停止的状态,可以使用 KinesisAnalyticsV2StopApplicationCompletedSensor。
tests/system/amazon/aws/example_kinesis_analytics.py
await_stop_application = KinesisAnalyticsV2StopApplicationCompletedSensor(
    task_id="await_stop_application",
    application_name=application_name,
)