BranchDateTimeOperator

使用 BranchDateTimeOperator 根据时间是否落在两个目标参数给定的范围内,分支到两个执行路径之一,

此运算符有两种模式。第一种模式是使用当前时间(DAG 执行时的机器时钟时间),第二种模式是使用它运行的 DAG 运行的 logical_date

使用当前时间

上述用法在某些情况下可能很有用 - 例如,当 DAG 用于执行清理和维护并且实际上不应用于任何应该回填的 DAG 时,因为“当前时间”使回填非幂等,其结果取决于实际运行 DAG 的时间。即使它是按计划运行的,它也可能略有不确定性。从 DAGRun 的调度到执行之间可能需要一些时间,这可能意味着即使 DAGRun 已正确调度,用于分支决策的实际时间也将与调度时间不同,并且分支决策可能会因这些延迟而异。

airflow/example_dags/example_branch_datetime_operator.py[源代码]

empty_task_11 = EmptyOperator(task_id="date_in_range", dag=dag1)
empty_task_21 = EmptyOperator(task_id="date_outside_range", dag=dag1)

cond1 = BranchDateTimeOperator(
    task_id="datetime_branch",
    follow_task_ids_if_true=["date_in_range"],
    follow_task_ids_if_false=["date_outside_range"],
    target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
    target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
    dag=dag1,
)

# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [empty_task_11, empty_task_21]

目标参数 target_uppertarget_lower 可以接收 datetime.datetimedatetime.timeNone。当使用 datetime.time 对象时,它将与当前日期组合以允许与其进行比较。如果 target_upper 设置为在给定 target_lower 之前发生的 datetime.time,则会将一天添加到 target_upper。这样做是为了允许跨越两个日期的时间段。

airflow/example_dags/example_branch_datetime_operator.py[源代码]

empty_task_12 = EmptyOperator(task_id="date_in_range", dag=dag2)
empty_task_22 = EmptyOperator(task_id="date_outside_range", dag=dag2)

cond2 = BranchDateTimeOperator(
    task_id="datetime_branch",
    follow_task_ids_if_true=["date_in_range"],
    follow_task_ids_if_false=["date_outside_range"],
    target_upper=pendulum.time(0, 0, 0),
    target_lower=pendulum.time(15, 0, 0),
    dag=dag2,
)

# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run empty_task_12 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [empty_task_12, empty_task_22]

如果目标参数设置为 None,则运算符将仅使用非 None 目标执行单边比较。将 target_uppertarget_lower 都设置为 None 将引发异常。

使用逻辑日期

这种用法对“数据范围”更友好。当 DAG 重新运行时,logical_date 不会改变,并且它不受执行延迟的影响,因此这种方法适用于可能回填的幂等 DAG 运行。

airflow/example_dags/example_branch_datetime_operator.py[源代码]

empty_task_13 = EmptyOperator(task_id="date_in_range", dag=dag3)
empty_task_23 = EmptyOperator(task_id="date_outside_range", dag=dag3)

cond3 = BranchDateTimeOperator(
    task_id="datetime_branch",
    use_task_logical_date=True,
    follow_task_ids_if_true=["date_in_range"],
    follow_task_ids_if_false=["date_outside_range"],
    target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
    target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
    dag=dag3,
)

# Run empty_task_13 if cond3 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond3 >> [empty_task_13, empty_task_23]

此条目有帮助吗?