Amazon AppFlow

Amazon AppFlow 是一项完全托管的集成服务,使您能够安全地在 Salesforce、SAP、Zendesk、Slack 和 ServiceNow 等软件即服务 (SaaS) 应用程序与 Amazon S3 和 Amazon Redshift 等 AWS 服务之间安全地传输数据,只需点击几下即可完成。借助 AppFlow,您可以按照您选择的频率以企业规模运行数据流 - 按计划、响应业务事件或按需运行。您可以配置数据转换功能,例如过滤和验证,以在流本身中生成丰富、随时可用的数据,而无需额外的步骤。AppFlow 会自动加密传输中的数据,并允许用户限制数据通过公共互联网流向与 AWS PrivateLink 集成的 SaaS 应用程序,从而减少安全威胁。

前提任务

要使用这些操作符,您必须执行以下操作

通用参数

aws_conn_id

Amazon Web Services 连接 ID 的引用。如果此参数设置为 None,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果要使用与 botocore 使用的 CA 证书包不同的 CA 证书包,则可以指定此参数。

如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None

botocore_config

提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。

示例,有关参数的更多详细信息,请查看 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None

注意

指定一个空字典 {} 将覆盖 botocore.config.Config 的连接配置

操作符

运行流

要按原样运行 AppFlow 流,请使用:AppflowRunOperator

tests/system/providers/amazon/aws/example_appflow_run.py[源代码]

run_flow = AppflowRunOperator(
    task_id="run_flow",
    flow_name=flow_name,
)

注意

支持的来源:Salesforce、Zendesk

运行完整流

要运行 AppFlow 流并删除所有过滤器,请使用:AppflowRunFullOperator

tests/system/providers/amazon/aws/example_appflow.py[源代码]

campaign_dump_full = AppflowRunFullOperator(
    task_id="campaign_dump_full",
    source=source_name,
    flow_name=flow_name,
)

注意

支持的来源:Salesforce、Zendesk

每日运行流

要运行 AppFlow 流并过滤每日记录,请使用:AppflowRunDailyOperator

tests/system/providers/amazon/aws/example_appflow.py[源代码]

campaign_dump_daily = AppflowRunDailyOperator(
    task_id="campaign_dump_daily",
    source=source_name,
    flow_name=flow_name,
    source_field="LastModifiedDate",
    filter_date="{{ ds }}",
)

注意

支持的来源:Salesforce

在此之前运行流

要运行 AppFlow 流并过滤未来记录并选择过去的记录,请使用:AppflowRunBeforeOperator

tests/system/providers/amazon/aws/example_appflow.py[源代码]

campaign_dump_before = AppflowRunBeforeOperator(
    task_id="campaign_dump_before",
    source=source_name,
    flow_name=flow_name,
    source_field="LastModifiedDate",
    filter_date="{{ ds }}",
)

注意

支持的来源:Salesforce

在此之后运行流

要运行 AppFlow 流并过滤过去记录并选择未来的记录,请使用:AppflowRunAfterOperator

tests/system/providers/amazon/aws/example_appflow.py[源代码]

campaign_dump_after = AppflowRunAfterOperator(
    task_id="campaign_dump_after",
    source=source_name,
    flow_name=flow_name,
    source_field="LastModifiedDate",
    filter_date="3000-01-01",  # Future date, so no records to dump
)

注意

支持的来源:Salesforce、Zendesk

跳过空运行的任务

要在某些 AppFlow 运行返回零记录时跳过任务,请使用:AppflowRecordsShortCircuitOperator

tests/system/providers/amazon/aws/example_appflow.py[源代码]

campaign_dump_short_circuit = AppflowRecordsShortCircuitOperator(
    task_id="campaign_dump_short_circuit",
    flow_name=flow_name,
    appflow_run_task_id="campaign_dump_after",  # Should shortcircuit, no records expected
)

注意

支持的来源:Salesforce、Zendesk

此条目有帮助吗?