airflow.example_dags.example_assets¶
演示 Airflow 中 Assets(资产)特性行为的示例 DAG,包括条件调度和基于资产表达式的调度。
使用说明
开启所有 DAG。
asset_produces_1 被安排每天运行。一旦完成,由于其资产更新,会触发多个 DAG。asset_consumes_1 会立即被触发,因为它仅依赖于 asset_produces_1 产生的资产。consume_1_or_2_with_asset_expressions 也将被触发,因为其条件(asset_produces_1 或 asset_produces_2 之一被更新)已由 asset_produces_1 满足。
asset_consumes_1_and_2 在 asset_produces_1 运行后不会被触发,因为它需要 asset_produces_2 的资产,而后者没有调度,必须手动触发。
手动触发 asset_produces_2 后,几个 DAG 将受影响。asset_consumes_1_and_2 应该运行,因为它的两个资产依赖都已满足。consume_1_and_2_with_asset_expressions 将被触发,因为它需要 asset_produces_1 和 asset_produces_2 的资产都已更新。consume_1_or_2_with_asset_expressions 将再次被触发,因为它被设置为当任一资产更新时有条件地运行。
consume_1_or_both_2_and_3_with_asset_expressions 演示了复杂的资产依赖逻辑。如果 asset_produces_1 被更新,或者 asset_produces_2 和 dag3_asset 都被更新,则触发此 DAG。此示例突出了将多个资产的更新与逻辑表达式结合以实现高级调度的能力。
conditional_asset_and_time_based_timetable 说明了基于时间的调度与资产依赖的集成。此 DAG 配置为在 asset_produces_1 和 asset_produces_2 资产都已更新时执行,或根据特定的 cron 计划执行,展示了 Airflow 在处理资产和基于时间的混合触发器方面的多功能性。
DAG asset_consumes_1_never_scheduled 和 asset_consumes_unknown_never_scheduled 不会自动运行,因为它们依赖于未更新或未由任何计划任务产生的资产。