数据感知调度

2.4 版新增功能。

快速入门

除了根据时间调度 DAG 之外,您还可以根据任务更新数据集的时间来调度 DAG 运行。

from airflow.datasets import Dataset

with DAG(...):
    MyOperator(
        # this task updates example.csv
        outlets=[Dataset("s3://dataset-bucket/example.csv")],
        ...,
    )


with DAG(
    # this DAG should be run when example.csv is updated (by dag1)
    schedule=[Dataset("s3://dataset-bucket/example.csv")],
    ...,
):
    ...
../_images/dataset-scheduled-dags.png

什么是“数据集”?

Airflow 数据集是数据的逻辑分组。上游生产者任务可以更新数据集,数据集更新有助于调度下游消费者 DAG。

统一资源标识符 (URI) 定义数据集

from airflow.datasets import Dataset

example_dataset = Dataset("s3://dataset-bucket/example.csv")

Airflow 对 URI 表示的数据的内容或位置不做任何假设,并将 URI 视为字符串。这意味着 Airflow 将任何正则表达式(如 input_\d+.csv)或文件通配符模式(如 input_2022*.csv)视为从一个声明创建多个数据集的尝试,并且它们将不起作用。

您必须使用有效的 URI 创建数据集。Airflow 核心和提供程序定义了您可以使用的各种 URI 方案,例如 file(核心)、postgres(由 Postgres 提供程序提供)和 s3(由 Amazon 提供程序提供)。第三方提供程序和插件也可能提供自己的方案。这些预定义的方案具有预期要遵循的个别语义。

什么是有效的 URI?

从技术上讲,URI 必须符合 RFC 3986 中的有效字符集,该字符集基本上是 ASCII 字母数字字符,以及 %-_.~。要标识无法由 URI 安全字符表示的资源,请使用 百分比编码 对资源名称进行编码。

URI 也区分大小写,因此 s3://example/datasets3://Example/Dataset 被认为是不同的。请注意,URI 的*主机*部分也区分大小写,这与 RFC 3986 不同。

不要使用 airflow 方案,该方案是为 Airflow 的内部保留的。

Airflow 始终倾向于在方案中使用小写字母,并且在 URI 的主机部分需要区分大小写,以便正确区分资源。

# invalid datasets:
reserved = Dataset("airflow://example_dataset")
not_ascii = Dataset("èxample_datašet")

如果要使用不包含其他语义约束的方案定义数据集,请使用带有前缀 x- 的方案。Airflow 会跳过对具有这些方案的 URI 的任何语义验证。

# valid dataset, treated as a plain string
my_ds = Dataset("x-my-thing://foobarbaz")

标识符不必是绝对的;它可以是无方案的相对 URI,甚至只是一个简单的路径或字符串

# valid datasets:
schemeless = Dataset("//example/dataset")
csv_file = Dataset("example_dataset")

非绝对标识符被视为不包含任何对 Airflow 有语义的普通字符串。

数据集的额外信息

如果需要,您可以在数据集中包含额外的字典

example_dataset = Dataset(
    "s3://dataset/example.csv",
    extra={"team": "trainees"},
)

此额外信息不会影响数据集的身份。这意味着即使额外的字典不同,DAG 也会由具有相同 URI 的数据集触发

with DAG(
    dag_id="consumer",
    schedule=[Dataset("s3://dataset/example.csv", extra={"different": "extras"})],
):
    ...

with DAG(dag_id="producer", ...):
    MyOperator(
        # triggers "consumer" with the given extra!
        outlets=[Dataset("s3://dataset/example.csv", extra={"team": "trainees"})],
        ...,
    )

注意

**安全说明:**数据集 URI 和额外字段未加密,它们以明文形式存储在 Airflow 的元数据数据库中。请勿在数据集 URI 或额外键值中存储任何敏感值,尤其是凭据!

如何在 DAG 中使用数据集

您可以使用数据集来指定 DAG 中的数据依赖关系。以下示例显示了在 producer DAG 中的 producer 任务成功完成后,Airflow 如何调度 consumer DAG。仅当任务成功完成时,Airflow 才会将数据集标记为 updated。如果任务失败或被跳过,则不会发生更新,并且 Airflow 不会调度 consumer DAG。

example_dataset = Dataset("s3://dataset/example.csv")

with DAG(dag_id="producer", ...):
    BashOperator(task_id="producer", outlets=[example_dataset], ...)

with DAG(dag_id="consumer", schedule=[example_dataset], ...):
    ...

您可以在 数据集视图 中找到数据集和 DAG 之间关系的列表

多个数据集

因为 schedule 参数是一个列表,所以 DAG 可能需要多个数据集。在 DAG 上次运行后,**所有** DAG 使用的数据集都至少更新一次后,Airflow 才会调度该 DAG

with DAG(
    dag_id="multiple_datasets_example",
    schedule=[
        example_dataset_1,
        example_dataset_2,
        example_dataset_3,
    ],
    ...,
):
    ...

如果在一个数据集多次更新之前所有使用的的数据集都已更新,则下游 DAG 仍然只运行一次,如下图所示

graph dataset_event_timeline { graph [layout=neato] { node [margin=0 fontcolor=blue width=0.1 shape=point label=""] e1 [pos="1,2.5!"] e2 [pos="2,2.5!"] e3 [pos="2.5,2!"] e4 [pos="4,2.5!"] e5 [pos="5,2!"] e6 [pos="6,2.5!"] e7 [pos="7,1.5!"] r7 [pos="7,1!" shape=star width=0.25 height=0.25 fixedsize=shape] e8 [pos="8,2!"] e9 [pos="9,1.5!"] e10 [pos="10,2!"] e11 [pos="11,1.5!"] e12 [pos="12,2!"] e13 [pos="13,2.5!"] r13 [pos="13,1!" shape=star width=0.25 height=0.25 fixedsize=shape] } { node [shape=none label="" width=0] end_ds1 [pos="14,2.5!"] end_ds2 [pos="14,2!"] end_ds3 [pos="14,1.5!"] } { node [shape=none margin=0.25 fontname="roboto,sans-serif"] example_dataset_1 [ pos="-0.5,2.5!"] example_dataset_2 [ pos="-0.5,2!"] example_dataset_3 [ pos="-0.5,1.5!"] dag_runs [label="DagRuns created" pos="-0.5,1!"] } edge [color=lightgrey] example_dataset_1 -- e1 -- e2 -- e4 -- e6 -- e13 -- end_ds1 example_dataset_2 -- e3 -- e5 -- e8 -- e10 -- e12 -- end_ds2 example_dataset_3 -- e7 -- e9 -- e11 -- end_ds3 }

从触发数据集事件中获取信息

触发的 DAG 可以使用 triggering_dataset_events 模板或参数从触发它的数据集中获取信息。有关详细信息,请参阅 模板参考

示例

example_snowflake_dataset = Dataset("snowflake://my_db.my_schema.my_table")

with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
    SQLExecuteQueryOperator(
        task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_dataset], ...
    )

with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_dataset], ...):
    SQLExecuteQueryOperator(
        task_id="query",
        conn_id="snowflake_default",
        sql="""
          SELECT *
          FROM my_db.my_schema.my_table
          WHERE "updated_at" >= '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_start }}'
          AND "updated_at" < '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_end }}';
        """,
    )

    @task
    def print_triggering_dataset_events(triggering_dataset_events=None):
        for dataset, dataset_list in triggering_dataset_events.items():
            print(dataset, dataset_list)
            print(dataset_list[0].source_dag_run.dag_id)

    print_triggering_dataset_events()

请注意,此示例使用 (.values() | first | first) 来获取提供给 DAG 的一个数据集中的第一个,以及该数据集的一个 DatasetEvent 中的第一个。如果您有多个数据集,并且可能有多个 DatasetEvent,则实现可能会非常复杂。

使用条件表达式进行高级数据集调度

Apache Airflow 包括使用数据集的条件表达式的高级调度功能。此功能允许您根据数据集更新定义 DAG 执行的复杂依赖关系,使用逻辑运算符可以更好地控制工作流触发器。

数据集的逻辑运算符

Airflow 支持两个用于组合数据集条件的逻辑运算符

  • **AND (``&``)**:指定仅在所有指定数据集都已更新后才应触发 DAG。

  • **OR (``|``)**:指定在任何指定数据集更新时都应触发 DAG。

这些运算符使您能够将 Airflow 工作流配置为使用更复杂的数据集更新条件,使其更加动态和灵活。

用例示例

基于多个数据集更新进行调度

要将 DAG 调度为仅在两个特定数据集都已更新时才运行,请使用 AND 运算符 (&)

dag1_dataset = Dataset("s3://dag1/output_1.txt")
dag2_dataset = Dataset("s3://dag2/output_1.txt")

with DAG(
    # Consume dataset 1 and 2 with dataset expressions
    schedule=(dag1_dataset & dag2_dataset),
    ...,
):
    ...

基于任何数据集更新进行调度

要在两个数据集中的任何一个更新时触发 DAG 执行,请应用 OR 运算符 (|)

with DAG(
    # Consume dataset 1 or 2 with dataset expressions
    schedule=(dag1_dataset | dag2_dataset),
    ...,
):
    ...

复杂条件逻辑

对于需要更复杂条件的场景,例如在一个数据集更新时或在另外两个数据集都更新时触发 DAG,请组合使用 OR 和 AND 运算符

dag3_dataset = Dataset("s3://dag3/output_3.txt")

with DAG(
    # Consume dataset 1 or both 2 and 3 with dataset expressions
    schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
    ...,
):
    ...

组合数据集和基于时间的调度

DatasetTimetable 集成

您可以使用 DatasetOrTimeSchedule 根据数据集事件和基于时间的调度来调度 DAG。这允许您在 DAG 需要由数据更新触发并根据固定时间表定期运行时创建工作流。

有关 DatasetOrTimeSchedule 的更多详细信息,请参阅 DatasetOrTimeSchedule 中的相应部分。

此条目有帮助吗?