动态任务映射¶
动态任务映射允许工作流在运行时根据当前数据创建多个任务,而无需 DAG 作者事先知道需要多少任务。
这类似于在 for 循环中定义任务,但不是让 DAG 文件获取数据并自行完成,而是调度器可以根据前一个任务的输出完成此操作。在映射任务执行之前,调度器会创建 n 个任务副本,每个输入对应一个副本。
还可以让任务对映射任务的收集输出进行操作,这通常被称为 map 和 reduce。
简单映射¶
最简单的形式是,您可以使用 expand()
函数而不是直接调用任务,来映射 DAG 文件中直接定义的列表。
如果您想看到动态任务映射的简单用法,可以查看下方内容。
src/airflow/example_dags/example_dynamic_task_mapping.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping."""
from __future__ import annotations
from datetime import datetime
from airflow.sdk import DAG, task
with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:
@task
def add_one(x: int):
return x + 1
@task
def sum_it(values):
total = sum(values)
print(f"Total was {total}")
added_values = add_one.expand(x=[1, 2, 3])
sum_it(added_values)
with DAG(
dag_id="example_task_mapping_second_order", schedule=None, catchup=False, start_date=datetime(2022, 3, 4)
) as dag2:
@task
def get_nums():
return [1, 2, 3]
@task
def times_2(num):
return num * 2
@task
def add_10(num):
return num + 10
_get_nums = get_nums()
_times_2 = times_2.expand(num=_get_nums)
add_10.expand(num=_times_2)
执行时,任务日志中将显示 Total was 9
。
这是生成的 DAG 结构。

网格视图还在详细信息面板中提供了映射任务的可见性。

注意
expand()
函数仅允许传入关键字参数。
注意
从映射任务传递的值是惰性代理
在上面的示例中,sum_it
接收到的 values
是 add_one
的每个映射实例返回的所有值的集合。然而,由于无法事先知道我们将有多少个 add_one
实例,values
不是一个普通的列表,而是一个“惰性序列”,只有在请求时才会检索每个单独的值。因此,如果您直接运行 print(values)
,您会得到类似这样的结果:
LazySelectSequence([15 items])
您可以在此对象上使用正常的序列语法(例如 values[0]
),或使用 for
循环正常迭代。list(values)
将为您提供一个“真实”的 list
,但由于这将立即加载所有引用上游映射任务的值,如果映射数量很大,您必须注意潜在的性能影响。
请注意,当您将此代理对象推送到 XCom 时,同样适用。Airflow 会尝试智能地强制转换值,但会为此发出警告,以便您知道这一点。例如:
@task
def forward_values(values):
return values # This is a lazy proxy!
将发出如下警告:
Coercing mapped lazy proxy return value from task forward_values to list, which may degrade
performance. Review resource requirements for this operation, and call list() explicitly to suppress this message. See Dynamic Task Mapping documentation for more information about lazy proxy objects.
可以通过修改任务来抑制此消息,如下所示:
@task
def forward_values(values):
return list(values)
注意
不需要 reduce 任务。
虽然我们在这里展示了一个“reduce”任务(sum_it
)您不一定需要它,即使没有下游任务,映射任务仍将执行。
任务生成的映射¶
上面展示的示例都可以通过在 DAG 文件中使用 for
循环来实现,但动态任务映射的真正强大之处在于能够让任务生成要迭代的列表。
@task
def make_list():
# This can also be from an API call, checking a database, -- almost anything you like, as long as the
# resulting list/dictionary can be stored in the current XCom backend.
return [1, 2, {"a": "b"}, "str"]
@task
def consumer(arg):
print(arg)
with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
consumer.expand(arg=make_list())
make_list
任务作为普通任务运行,必须返回一个列表或字典(参见 可以展开哪些数据类型?),然后 consumer
任务将被调用四次,每次调用都会接收 make_list
返回值中的一个值。
警告
任务生成的映射不能与 TriggerRule.ALWAYS
一起使用。
在任务生成的映射中设置 trigger_rule=TriggerRule.ALWAYS
是不允许的,因为在任务立即执行时,展开的参数是未定义的。这在 DAG 解析时强制执行,对任务和映射任务组都适用,如果您尝试使用它将引发错误。在最近的示例中,在 consumer
任务中设置 trigger_rule=TriggerRule.ALWAYS
将引发错误,因为 make_list
是任务生成的映射。
重复映射¶
一个映射任务的结果也可以用作下一个映射任务的输入。
with DAG(dag_id="repeated_mapping", start_date=datetime(2022, 3, 4)) as dag:
@task
def add_one(x: int):
return x + 1
first = add_one.expand(x=[1, 2, 3])
second = add_one.expand(x=first)
这将导致结果为 [3, 4, 5]
。
添加不展开的参数¶
除了传递在运行时展开的参数外,还可以传递不改变的参数——为了清楚区分这两种参数,我们使用不同的函数:expand()
用于映射参数,partial()
用于未映射参数。
@task
def add(x: int, y: int):
return x + y
added_values = add.partial(y=10).expand(x=[1, 2, 3])
# This results in add function being expanded to
# add(x=1, y=10)
# add(x=2, y=10)
# add(x=3, y=10)
这将导致值为 11、12 和 13。
这对于将连接 ID、数据库表名或 bucket 名称等内容传递给任务也很有用。
映射多个参数¶
除了单个参数外,还可以传递多个参数进行展开。这将产生“笛卡尔积”的效果,用每个参数组合调用映射任务。
@task
def add(x: int, y: int):
return x + y
added_values = add.expand(x=[2, 4, 8], y=[5, 10])
# This results in the add function being called with
# add(x=2, y=5)
# add(x=2, y=10)
# add(x=4, y=5)
# add(x=4, y=10)
# add(x=8, y=5)
# add(x=8, y=10)
这将导致 add 任务被调用 6 次。但请注意,不能保证展开的顺序。
命名映射¶
默认情况下,映射任务被分配一个整数索引。可以通过在 Airflow UI 中为每个映射任务基于任务输入提供一个名称来覆盖整数索引。这是通过为任务提供一个 Jinja 模板来实现的,使用 map_index_template
。通常,当展开看起来像 .expand(<property>=...)
时,模板看起来像 map_index_template="{{ task.<property> }}"
。此模板在每个展开的任务执行后使用任务上下文进行渲染。这意味着您可以像这样引用任务上的属性:
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
# The two expanded task instances will be named "2024-01-01" and "2024-01-02".
SQLExecuteQueryOperator.partial(
...,
sql="SELECT * FROM data WHERE date = %(date)s",
map_index_template="""{{ task.parameters['date'] }}""",
).expand(
parameters=[{"date": "2024-01-01"}, {"date": "2024-01-02"}],
)
在上面的示例中,展开的任务实例将被命名为“2024-01-01”和“2024-01-02”。这些名称会显示在 Airflow UI 中,而不是分别显示“0”和“1”。
由于模板是在主执行块之后渲染的,因此也可以动态注入到渲染上下文中。这在 Jinja 模板语法难以表达所需的名称渲染逻辑时非常有用,尤其是在 taskflow 函数中。例如:
from airflow.sdk import get_current_context
@task(map_index_template="{{ my_variable }}")
def my_task(my_value: str):
context = get_current_context()
context["my_variable"] = my_value * 3
... # Normal execution...
# The task instances will be named "aaa" and "bbb".
my_task.expand(my_value=["a", "b"])
使用非 TaskFlow Operator 进行映射¶
也可以将 partial
和 expand
与经典风格的 Operator 一起使用。某些参数不可映射,必须传递给 partial()
,例如 task_id
、queue
、pool
以及 BaseOperator
的大多数其他参数。
src/airflow/example_dags/example_dynamic_task_mapping_with_no_taskflow_operators.py
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Example DAG demonstrating the usage of dynamic task mapping with non-TaskFlow operators."""
from __future__ import annotations
from datetime import datetime
from airflow.models.baseoperator import BaseOperator
from airflow.sdk import DAG
class AddOneOperator(BaseOperator):
"""A custom operator that adds one to the input."""
def __init__(self, value, **kwargs):
super().__init__(**kwargs)
self.value = value
def execute(self, context):
return self.value + 1
class SumItOperator(BaseOperator):
"""A custom operator that sums the input."""
template_fields = ("values",)
def __init__(self, values, **kwargs):
super().__init__(**kwargs)
self.values = values
def execute(self, context):
total = sum(self.values)
print(f"Total was {total}")
return total
with DAG(
dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
schedule=None,
start_date=datetime(2022, 3, 4),
catchup=False,
):
# map the task to a list of values
add_one_task = AddOneOperator.partial(task_id="add_one").expand(value=[1, 2, 3])
# aggregate (reduce) the mapped tasks results
sum_it_task = SumItOperator(task_id="sum_it", values=add_one_task.output)
注意
partial()
函数仅允许传入关键字参数。
映射经典 Operator 的结果¶
如果您想映射经典 Operator 的结果,您应该明确引用其 *输出*,而不是 Operator 本身。
# Create a list of data inputs.
extract = ExtractOperator(task_id="extract")
# Expand the operator to transform each input.
transform = TransformOperator.partial(task_id="transform").expand(input=extract.output)
# Collect the transformed inputs, expand the operator to load each one of them to the target.
load = LoadOperator.partial(task_id="load").expand(input=transform.output)
混合使用 TaskFlow 和经典 Operator¶
在此示例中,您有一个常规数据传递到 S3 bucket,并且想要对到达的每个文件应用相同的处理,无论每次到达多少文件。
from datetime import datetime
from airflow.sdk import DAG
from airflow.sdk import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.s3 import S3ListOperator
with DAG(dag_id="mapped_s3", start_date=datetime(2020, 4, 7)) as dag:
list_filenames = S3ListOperator(
task_id="get_input",
bucket="example-bucket",
prefix='incoming/provider_a/{{ data_interval_start.strftime("%Y-%m-%d") }}',
)
@task
def count_lines(aws_conn_id, bucket, filename):
hook = S3Hook(aws_conn_id=aws_conn_id)
return len(hook.read_key(filename, bucket).splitlines())
@task
def total(lines):
return sum(lines)
counts = count_lines.partial(aws_conn_id="aws_default", bucket=list_filenames.bucket).expand(
filename=list_filenames.output
)
total(lines=counts)
为非 TaskFlow Operator 分配多个参数¶
有时上游需要为下游 Operator 指定多个参数。为此,您可以使用 expand_kwargs
函数,该函数接受一个映射序列来进行映射。
BashOperator.partial(task_id="bash").expand_kwargs(
[
{"bash_command": "echo $ENV1", "env": {"ENV1": "1"}},
{"bash_command": "printf $ENV2", "env": {"ENV2": "2"}},
],
)
这在运行时产生两个任务实例,分别打印 1
和 2
。
还可以将 expand_kwargs
与大多数 Operator 参数混合使用,例如 PythonOperator 的 op_kwargs
。
def print_args(x, y):
print(x)
print(y)
return x + y
PythonOperator.partial(task_id="task-1", python_callable=print_args).expand_kwargs(
[
{"op_kwargs": {"x": 1, "y": 2}, "show_return_value_in_logs": True},
{"op_kwargs": {"x": 3, "y": 4}, "show_return_value_in_logs": False},
]
)
与 expand
类似,您也可以映射返回字典列表的 XCom,或映射返回字典的 XCom 列表。重用上面的 S3 示例,您可以使用映射任务来执行“分支”并将文件复制到不同的 bucket。
list_filenames = S3ListOperator(...) # Same as the above example.
@task
def create_copy_kwargs(filename):
if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
dest_bucket_name = "my_text_bucket"
else:
dest_bucket_name = "my_other_bucket"
return {
"source_bucket_key": filename,
"dest_bucket_key": filename,
"dest_bucket_name": dest_bucket_name,
}
copy_kwargs = create_copy_kwargs.expand(filename=list_filenames.output)
# Copy files to another bucket, based on the file's extension.
copy_filenames = S3CopyObjectOperator.partial(
task_id="copy_files", source_bucket_name=list_filenames.bucket
).expand_kwargs(copy_kwargs)
映射任务组¶
类似于 TaskFlow 任务,您也可以对 @task_group
装饰的函数调用 expand
或 expand_kwargs
来创建映射任务组。
注意
为简洁起见,本节省略了单个任务的实现。
@task_group
def file_transforms(filename):
return convert_to_yaml(filename)
file_transforms.expand(filename=["data1.json", "data2.json"])
在上面的示例中,任务 convert_to_yaml
在运行时被展开为两个任务实例。第一个展开的实例将接收 "data1.json"
作为输入,第二个接收 "data2.json"
。
任务组函数中的值引用¶
任务函数(@task
)和任务 *组* 函数(@task_group
)之间一个重要的区别是,由于任务组没有关联的工作进程,任务组函数中的代码无法解析传递给它的参数;实际值只有当引用传递给任务时才会被解析。
例如,这段代码将 *无法* 工作:
@task def my_task(value): print(value) @task_group def my_task_group(value): if not value: # DOES NOT work as you'd expect! task_a = EmptyOperator(...) else: task_a = PythonOperator(...) task_a << my_task(value) my_task_group.expand(value=[0, 1, 2])
当 my_task_group
中的代码执行时,value
仍然只会是一个引用,而不是实际值,所以 if not value
分支不会按照您可能期望的方式工作。但是,如果您将该引用传递给一个任务,它将在任务执行时被解析,因此三个 my_task
实例将分别接收到 1、2 和 3。
因此,重要的是要记住,如果您打算对传递给任务组函数的值执行任何逻辑,则必须始终使用任务来运行逻辑,例如用于条件的 @task.branch
(或 BranchPythonOperator
),以及用于循环的任务映射方法。
注意
映射任务组中不允许进行任务映射
目前不允许在映射任务组中嵌套进行任务映射。虽然此功能的技术方面不是特别困难,但我们决定有意省略此功能,因为它会增加相当大的 UI 复杂性,并且可能对一般用例不是必需的。未来可能会根据用户反馈重新考虑此限制。
深度优先执行¶
如果映射任务组包含多个任务,则组中的所有任务将针对相同的输入“一起”展开。例如:
@task_group
def file_transforms(filename):
converted = convert_to_yaml(filename)
return replace_defaults(converted)
file_transforms.expand(filename=["data1.json", "data2.json"])
由于组 file_transforms
被展开为两个,任务 convert_to_yaml
和 replace_defaults
在运行时将各自成为两个实例。
通过如下所示单独展开这两个任务,也可以实现类似的效果:
converted = convert_to_yaml.expand(filename=["data1.json", "data2.json"])
replace_defaults.expand(filename=converted)
然而,区别在于任务组允许其中的每个任务只依赖于其“相关输入”。对于上面的示例,第一个 replace_defaults
将能够在 convert_to_yaml("data2.json")
完成之前运行,并且不需要关心它是否成功。这种策略被称为*深度优先执行*(与简单的无组*广度优先执行*相对),它允许更合乎逻辑的任务分离、更精细的依赖规则以及准确的资源分配——以上面的示例为例,第一个 replace_defaults
将能够在 convert_to_yaml("data2.json")
完成之前运行,并且不需要关心它是否成功。
依赖映射任务组的输出¶
类似于映射任务组,依赖于映射任务组的输出也将自动聚合组的结果。
@task_group
def add_to(value):
value = add_one(value)
return double(value)
results = add_to.expand(value=[1, 2, 3])
consumer(results) # Will receive [4, 6, 8].
也可以像对待普通映射任务的结果一样执行任何操作。
根据映射任务组的输出进行分支¶
虽然不能对映射任务的结果实现分支逻辑(例如使用 @task.branch
),但可以根据任务组的 *输入* 进行分支。以下示例演示了根据映射任务组的输入执行三个任务之一。
inputs = ["a", "b", "c"]
@task_group(group_id="my_task_group")
def my_task_group(input):
@task.branch
def branch(element):
if "a" in element:
return "my_task_group.a"
elif "b" in element:
return "my_task_group.b"
else:
return "my_task_group.c"
@task
def a():
print("a")
@task
def b():
print("b")
@task
def c():
print("c")
branch(input) >> [a(), b(), c()]
my_task_group.expand(input=inputs)
过滤映射任务中的项¶
映射任务可以通过返回 None
来阻止任何元素传递给其下游任务。例如,如果我们只想将 S3 bucket 中具有特定扩展名的文件复制到另一个 bucket,我们可以这样实现 create_copy_kwargs
:
@task
def create_copy_kwargs(filename):
# Skip files not ending with these suffixes.
if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
return None
return {
"source_bucket_key": filename,
"dest_bucket_key": filename,
"dest_bucket_name": "my_other_bucket",
}
# copy_kwargs and copy_files are implemented the same.
这使得 copy_files
只针对 .json
和 .yml
文件进行展开,同时忽略其余文件。
转换展开数据¶
由于通常需要转换任务映射的输出数据格式,尤其是从非 TaskFlow Operator 获取的输出(其输出格式是预定的,且无法轻松转换,例如上面示例中的 create_copy_kwargs
),可以使用特殊的 map()
函数来轻松执行此类转换。因此,上面的示例可以修改如下:
from airflow.exceptions import AirflowSkipException
list_filenames = S3ListOperator(...) # Unchanged.
def create_copy_kwargs(filename):
if filename.rsplit(".", 1)[-1] not in ("json", "yml"):
raise AirflowSkipException(f"skipping {filename!r}; unexpected suffix")
return {
"source_bucket_key": filename,
"dest_bucket_key": filename,
"dest_bucket_name": "my_other_bucket",
}
copy_kwargs = list_filenames.output.map(create_copy_kwargs)
# Unchanged.
copy_filenames = S3CopyObjectOperator.partial(...).expand_kwargs(copy_kwargs)
有几点需要注意:
组合上游数据(又称“zip”)¶
将多个输入源组合成一个任务映射可迭代对象也很常见。这通常称为“zipping”(类似于 Python 内置的 zip()
函数),也作为下游任务的预处理执行。
这对于任务映射中的条件逻辑特别有用。例如,如果您想从 S3 下载文件,但重命名这些文件,就可以实现类似这样的功能:
list_filenames_a = S3ListOperator(
task_id="list_files_in_a",
bucket="bucket",
prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = ["rename_1", "rename_2", "rename_3", ...]
filenames_a_b = list_filenames_a.output.zip(list_filenames_b)
@task
def download_filea_from_a_rename(filenames_a_b):
fn_a, fn_b = filenames_a_b
S3Hook().download_file(fn_a, local_path=fn_b)
download_filea_from_a_rename.expand(filenames_a_b=filenames_a_b)
类似于内置的 zip()
,您可以将任意数量的可迭代对象一起打包,以获得包含位置参数数量的元组的可迭代对象。默认情况下,打包后的可迭代对象的长度与最短的打包可迭代对象相同,多余的项将被丢弃。可以传递可选的关键字参数 default
来切换行为,使其与 Python 的 itertools.zip_longest()
匹配——打包后的可迭代对象将具有与*最长*的打包可迭代对象相同的长度,缺失的项将用 default
提供的值填充。
连接多个上游¶
组合输入源的另一种常见模式是针对多个可迭代对象运行相同的任务。当然,针对每个可迭代对象单独运行相同的代码是完全有效的,例如:
list_filenames_a = S3ListOperator(
task_id="list_files_in_a",
bucket="bucket",
prefix="incoming/provider_a/{{ data_interval_start|ds }}",
)
list_filenames_b = S3ListOperator(
task_id="list_files_in_b",
bucket="bucket",
prefix="incoming/provider_b/{{ data_interval_start|ds }}",
)
@task
def download_file(filename):
S3Hook().download_file(filename)
# process file...
download_file.override(task_id="download_file_a").expand(filename=list_filenames_a.output)
download_file.override(task_id="download_file_b").expand(filename=list_filenames_b.output)
然而,如果任务可以合并成一个,DAG 将更具可扩展性且更易于检查。可以使用 concat
来完成此操作:
# Tasks list_filenames_a and list_filenames_b, and download_file stay unchanged.
list_filenames_concat = list_filenames_a.concat(list_filenames_b)
download_file.expand(filename=list_filenames_concat)
这将创建一个任务,针对两个列表进行展开。您可以将任意数量的可迭代对象进行 concat
(例如 foo.concat(bar, rex)
);或者,由于返回值也是一个 XCom 引用,因此可以将 concat
调用链接起来(例如 foo.concat(bar).concat(rex)
)以达到相同的效果:一个单个可迭代对象,按顺序连接所有可迭代对象,类似于 Python 的 itertools.chain()
。
可以展开哪些数据类型?¶
目前只能映射字典、列表,或者存储在 XCom 中作为任务结果的这些类型之一。
如果上游任务返回不可映射的类型,映射任务将在运行时以 UnmappableXComTypePushed
异常失败。例如,您不能让上游任务返回一个纯字符串——它必须是列表或字典。
模板字段和映射参数如何交互?¶
Operator 的所有参数都可以映射,即使是那些不接受模板参数的参数。
如果字段被标记为模板化并且被映射,它将**不会被模板化**。
例如,这将打印 {{ ds }}
而不是日期戳。
@task
def make_list():
return ["{{ ds }}"]
@task
def printer(val):
print(val)
printer.expand(val=make_list())
如果您想插入值,可以自己调用 task.render_template
,或使用插值。
@task
def make_list(ds=None):
return [ds]
@task
def make_list(**context):
return [context["task"].render_template("{{ ds }}", context)]
对映射任务设置限制¶
您可以对任务设置两个限制:
展开可以创建的映射任务实例数量。
映射任务可以同时运行的数量。
限制映射任务的数量
[core]
max_map_length
配置选项是expand
可以创建的最大任务数量——默认值为 1024。如果源任务(我们前面示例中的
make_list
)返回的列表长度超过此值,将导致*该*任务失败。限制映射任务的并行副本数量
如果您不想让大型映射任务占用所有可用的运行器插槽,您可以使用任务上的
max_active_tis_per_dag
设置来限制同时运行的任务数量。但请注意,这适用于该任务在所有活动 DagRun 中的所有副本,而不仅仅是此特定 DagRun 中的副本。
@task(max_active_tis_per_dag=16) def add_one(x: int): return x + 1 BashOperator.partial(task_id="my_task", max_active_tis_per_dag=16).expand(bash_command=commands)
自动跳过零长度映射¶
如果输入为空(零长度),将不会创建新任务,并且映射任务将被标记为 SKIPPED
。