Amazon AppFlow¶
Amazon AppFlow 是一项完全托管的集成服务,让您只需点击几下即可安全地在 Salesforce、SAP、Zendesk、Slack 和 ServiceNow 等软件即服务 (SaaS) 应用程序与 Amazon S3 和 Amazon Redshift 等 AWS 服务之间传输数据。使用 AppFlow,您可以按照您选择的频率(按计划、响应业务事件或按需)以企业级规模运行数据流。您可以配置数据转换功能,如过滤和验证,以在流本身中生成丰富、即时可用的数据,无需额外步骤。AppFlow 自动对传输中的数据进行加密,并允许用户限制与 AWS PrivateLink 集成的 SaaS 应用程序数据通过公共互联网流动,从而减少安全威胁风险。
先决条件任务¶
要使用这些 Operators,您需要做一些准备工作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS 连接 Extra Parameter 中的 region_name。否则,使用指定的值而不是连接中的值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包文件名。如果要使用与 botocore 使用的 CA 证书包不同的证书包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS 连接 Extra Parameter 中的 verify。否则,使用指定的值而不是连接中的值。默认值:None
- botocore_config
提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。
示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS 连接 Extra Parameter 中的 config_kwargs。否则,使用指定的值而不是连接中的值。默认值:None
注意
指定一个空字典
{}
将覆盖 botocore.config.Config 的连接配置
Operators¶
运行流¶
要按原样运行 AppFlow 流,请使用:AppflowRunOperator
。
tests/system/amazon/aws/example_appflow_run.py
run_flow = AppflowRunOperator(
task_id="run_flow",
flow_name=flow_name,
)
注意
支持的来源:Salesforce, Zendesk
完整运行流¶
要运行一个移除所有过滤器的 AppFlow 流,请使用:AppflowRunFullOperator
。
tests/system/amazon/aws/example_appflow.py
campaign_dump_full = AppflowRunFullOperator(
task_id="campaign_dump_full",
source=source_name,
flow_name=flow_name,
)
注意
支持的来源:Salesforce, Zendesk
每日运行流¶
要运行一个过滤每日记录的 AppFlow 流,请使用:AppflowRunDailyOperator
。
tests/system/amazon/aws/example_appflow.py
campaign_dump_daily = AppflowRunDailyOperator(
task_id="campaign_dump_daily",
source=source_name,
flow_name=flow_name,
source_field="LastModifiedDate",
filter_date="{{ ds }}",
)
注意
支持的来源:Salesforce
运行指定时间点之前的流¶
要运行一个过滤掉未来记录并选择过去记录的 AppFlow 流,请使用:AppflowRunBeforeOperator
。
tests/system/amazon/aws/example_appflow.py
campaign_dump_before = AppflowRunBeforeOperator(
task_id="campaign_dump_before",
source=source_name,
flow_name=flow_name,
source_field="LastModifiedDate",
filter_date="{{ ds }}",
)
注意
支持的来源:Salesforce
运行指定时间点之后的流¶
要运行一个过滤掉过去记录并选择未来记录的 AppFlow 流,请使用:AppflowRunAfterOperator
。
tests/system/amazon/aws/example_appflow.py
campaign_dump_after = AppflowRunAfterOperator(
task_id="campaign_dump_after",
source=source_name,
flow_name=flow_name,
source_field="LastModifiedDate",
filter_date="3000-01-01", # Future date, so no records to dump
)
注意
支持的来源:Salesforce, Zendesk
跳过空运行的任务¶
当某些 AppFlow 运行返回零记录时跳过任务,请使用:AppflowRecordsShortCircuitOperator
。
tests/system/amazon/aws/example_appflow.py
campaign_dump_short_circuit = AppflowRecordsShortCircuitOperator(
task_id="campaign_dump_short_circuit",
flow_name=flow_name,
appflow_run_task_id="campaign_dump_after", # Should shortcircuit, no records expected
)
注意
支持的来源:Salesforce, Zendesk