数据感知调度¶
2.4 版新增功能。
快速入门¶
除了根据时间调度 DAG 之外,您还可以根据任务更新数据集的时间来调度 DAG 运行。
from airflow.datasets import Dataset
with DAG(...):
MyOperator(
# this task updates example.csv
outlets=[Dataset("s3://dataset-bucket/example.csv")],
...,
)
with DAG(
# this DAG should be run when example.csv is updated (by dag1)
schedule=[Dataset("s3://dataset-bucket/example.csv")],
...,
):
...
什么是“数据集”?¶
Airflow 数据集是数据的逻辑分组。上游生产者任务可以更新数据集,数据集更新有助于调度下游消费者 DAG。
统一资源标识符 (URI) 定义数据集
from airflow.datasets import Dataset
example_dataset = Dataset("s3://dataset-bucket/example.csv")
Airflow 对 URI 表示的数据的内容或位置不做任何假设,并将 URI 视为字符串。这意味着 Airflow 将任何正则表达式(如 input_\d+.csv
)或文件通配符模式(如 input_2022*.csv
)视为从一个声明创建多个数据集的尝试,并且它们将不起作用。
您必须使用有效的 URI 创建数据集。Airflow 核心和提供程序定义了您可以使用的各种 URI 方案,例如 file
(核心)、postgres
(由 Postgres 提供程序提供)和 s3
(由 Amazon 提供程序提供)。第三方提供程序和插件也可能提供自己的方案。这些预定义的方案具有预期要遵循的个别语义。
什么是有效的 URI?¶
从技术上讲,URI 必须符合 RFC 3986 中的有效字符集,该字符集基本上是 ASCII 字母数字字符,以及 %
、-
、_
、.
和 ~
。要标识无法由 URI 安全字符表示的资源,请使用 百分比编码 对资源名称进行编码。
URI 也区分大小写,因此 s3://example/dataset
和 s3://Example/Dataset
被认为是不同的。请注意,URI 的*主机*部分也区分大小写,这与 RFC 3986 不同。
不要使用 airflow
方案,该方案是为 Airflow 的内部保留的。
Airflow 始终倾向于在方案中使用小写字母,并且在 URI 的主机部分需要区分大小写,以便正确区分资源。
# invalid datasets:
reserved = Dataset("airflow://example_dataset")
not_ascii = Dataset("èxample_datašet")
如果要使用不包含其他语义约束的方案定义数据集,请使用带有前缀 x-
的方案。Airflow 会跳过对具有这些方案的 URI 的任何语义验证。
# valid dataset, treated as a plain string
my_ds = Dataset("x-my-thing://foobarbaz")
标识符不必是绝对的;它可以是无方案的相对 URI,甚至只是一个简单的路径或字符串
# valid datasets:
schemeless = Dataset("//example/dataset")
csv_file = Dataset("example_dataset")
非绝对标识符被视为不包含任何对 Airflow 有语义的普通字符串。
数据集的额外信息¶
如果需要,您可以在数据集中包含额外的字典
example_dataset = Dataset(
"s3://dataset/example.csv",
extra={"team": "trainees"},
)
此额外信息不会影响数据集的身份。这意味着即使额外的字典不同,DAG 也会由具有相同 URI 的数据集触发
with DAG(
dag_id="consumer",
schedule=[Dataset("s3://dataset/example.csv", extra={"different": "extras"})],
):
...
with DAG(dag_id="producer", ...):
MyOperator(
# triggers "consumer" with the given extra!
outlets=[Dataset("s3://dataset/example.csv", extra={"team": "trainees"})],
...,
)
注意
**安全说明:**数据集 URI 和额外字段未加密,它们以明文形式存储在 Airflow 的元数据数据库中。请勿在数据集 URI 或额外键值中存储任何敏感值,尤其是凭据!
如何在 DAG 中使用数据集¶
您可以使用数据集来指定 DAG 中的数据依赖关系。以下示例显示了在 producer
DAG 中的 producer
任务成功完成后,Airflow 如何调度 consumer
DAG。仅当任务成功完成时,Airflow 才会将数据集标记为 updated
。如果任务失败或被跳过,则不会发生更新,并且 Airflow 不会调度 consumer
DAG。
example_dataset = Dataset("s3://dataset/example.csv")
with DAG(dag_id="producer", ...):
BashOperator(task_id="producer", outlets=[example_dataset], ...)
with DAG(dag_id="consumer", schedule=[example_dataset], ...):
...
您可以在 数据集视图 中找到数据集和 DAG 之间关系的列表
多个数据集¶
因为 schedule
参数是一个列表,所以 DAG 可能需要多个数据集。在 DAG 上次运行后,**所有** DAG 使用的数据集都至少更新一次后,Airflow 才会调度该 DAG
with DAG(
dag_id="multiple_datasets_example",
schedule=[
example_dataset_1,
example_dataset_2,
example_dataset_3,
],
...,
):
...
如果在一个数据集多次更新之前所有使用的的数据集都已更新,则下游 DAG 仍然只运行一次,如下图所示
从触发数据集事件中获取信息¶
触发的 DAG 可以使用 triggering_dataset_events
模板或参数从触发它的数据集中获取信息。有关详细信息,请参阅 模板参考。
示例
example_snowflake_dataset = Dataset("snowflake://my_db.my_schema.my_table")
with DAG(dag_id="load_snowflake_data", schedule="@hourly", ...):
SQLExecuteQueryOperator(
task_id="load", conn_id="snowflake_default", outlets=[example_snowflake_dataset], ...
)
with DAG(dag_id="query_snowflake_data", schedule=[example_snowflake_dataset], ...):
SQLExecuteQueryOperator(
task_id="query",
conn_id="snowflake_default",
sql="""
SELECT *
FROM my_db.my_schema.my_table
WHERE "updated_at" >= '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_start }}'
AND "updated_at" < '{{ (triggering_dataset_events.values() | first | first).source_dag_run.data_interval_end }}';
""",
)
@task
def print_triggering_dataset_events(triggering_dataset_events=None):
for dataset, dataset_list in triggering_dataset_events.items():
print(dataset, dataset_list)
print(dataset_list[0].source_dag_run.dag_id)
print_triggering_dataset_events()
请注意,此示例使用 (.values() | first | first) 来获取提供给 DAG 的一个数据集中的第一个,以及该数据集的一个 DatasetEvent 中的第一个。如果您有多个数据集,并且可能有多个 DatasetEvent,则实现可能会非常复杂。
使用条件表达式进行高级数据集调度¶
Apache Airflow 包括使用数据集的条件表达式的高级调度功能。此功能允许您根据数据集更新定义 DAG 执行的复杂依赖关系,使用逻辑运算符可以更好地控制工作流触发器。
数据集的逻辑运算符¶
Airflow 支持两个用于组合数据集条件的逻辑运算符
**AND (``&``)**:指定仅在所有指定数据集都已更新后才应触发 DAG。
**OR (``|``)**:指定在任何指定数据集更新时都应触发 DAG。
这些运算符使您能够将 Airflow 工作流配置为使用更复杂的数据集更新条件,使其更加动态和灵活。
用例示例¶
基于多个数据集更新进行调度
要将 DAG 调度为仅在两个特定数据集都已更新时才运行,请使用 AND 运算符 (&
)
dag1_dataset = Dataset("s3://dag1/output_1.txt")
dag2_dataset = Dataset("s3://dag2/output_1.txt")
with DAG(
# Consume dataset 1 and 2 with dataset expressions
schedule=(dag1_dataset & dag2_dataset),
...,
):
...
基于任何数据集更新进行调度
要在两个数据集中的任何一个更新时触发 DAG 执行,请应用 OR 运算符 (|
)
with DAG(
# Consume dataset 1 or 2 with dataset expressions
schedule=(dag1_dataset | dag2_dataset),
...,
):
...
复杂条件逻辑
对于需要更复杂条件的场景,例如在一个数据集更新时或在另外两个数据集都更新时触发 DAG,请组合使用 OR 和 AND 运算符
dag3_dataset = Dataset("s3://dag3/output_3.txt")
with DAG(
# Consume dataset 1 or both 2 and 3 with dataset expressions
schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
...,
):
...
组合数据集和基于时间的调度¶
DatasetTimetable 集成¶
您可以使用 DatasetOrTimeSchedule
根据数据集事件和基于时间的调度来调度 DAG。这允许您在 DAG 需要由数据更新触发并根据固定时间表定期运行时创建工作流。
有关 DatasetOrTimeSchedule
的更多详细信息,请参阅 DatasetOrTimeSchedule 中的相应部分。