参数

参数使您能够为任务提供运行时配置。您可以在 DAG 代码中配置默认参数,并在触发 DAG 时在运行时提供其他参数或覆盖参数值。Param 值使用 JSON Schema 进行验证。对于计划的 DAG 运行,将使用默认的 Param 值。

定义的参数也用于在手动触发时呈现友好的 UI。当您手动触发 DAG 时,您可以在 dagrun 开始之前修改其参数。如果用户提供的值未通过验证,Airflow 将显示警告而不是创建 dagrun。

DAG 级别的参数

要将参数添加到 DAG,请使用 params kwarg 初始化它。使用一个字典,将参数名称映射到 Param 或指示参数默认值的对象。

 from airflow import DAG
 from airflow.decorators import task
 from airflow.models.param import Param

 with DAG(
     "the_dag",
     params={
         "x": Param(5, type="integer", minimum=3),
         "my_int_param": 6
     },
 ) as dag:

     @task.python
     def example_task(params: dict):
         # This will print the default value, 6:
         dag.log.info(dag.params['my_int_param'])

         # This will print the manually-provided value, 42:
         dag.log.info(params['my_int_param'])

         # This will print the default value, 5, since it wasn't provided manually:
         dag.log.info(params['x'])

     example_task()

 if __name__ == "__main__":
     dag.test(
         run_conf={"my_int_param": 42}
     )

注意

DAG 级别参数是传递给任务的默认值。不应将这些与通过 UI 表单或 CLI 手动提供的值混淆,这些值仅存在于 DagRunTaskInstance 的上下文中。这种区别对于 TaskFlow DAG 至关重要,TaskFlow DAG 可能在 with DAG(...) as dag: 代码块中包含逻辑。在这种情况下,用户可能会尝试使用 dag 对象访问手动提供的参数值,但这只会包含默认值。要确保访问手动提供的值,请在您的任务中使用模板变量,例如 paramsti

任务级别的参数

您还可以将参数添加到单个任务。

def print_my_int_param(params):
  print(params.my_int_param)

PythonOperator(
    task_id="print_my_int_param",
    params={"my_int_param": 10},
    python_callable=print_my_int_param,
)

任务级别参数优先于 DAG 级别参数,并且用户提供的参数(在触发 DAG 时)优先于任务级别参数。

在任务中引用参数

可以在 模板字符串params 下引用参数。例如

 PythonOperator(
     task_id="from_template",
     op_args=[
         "{{ params.my_int_param + 10 }}",
     ],
     python_callable=(
         lambda my_int_param: print(my_int_param)
     ),
 )

即使参数可以使用各种类型,模板的默认行为是为您的任务提供一个字符串。您可以通过在初始化 DAG 时设置 render_template_as_native_obj=True 来更改此行为。

 with DAG(
     "the_dag",
     params={"my_int_param": Param(5, type="integer", minimum=3)},
     render_template_as_native_obj=True
 ):

这样,当 Param 的类型提供给您的任务时,它会受到尊重。

# prints <class 'str'> by default
# prints <class 'int'> if render_template_as_native_obj=True
PythonOperator(
    task_id="template_type",
    op_args=[
        "{{ params.my_int_param }}",
    ],
    python_callable=(
        lambda my_int_param: print(type(my_int_param))
    ),
)

访问参数的另一种方法是通过任务的 context kwarg。

 def print_my_int_param(**context):
     print(context["params"]["my_int_param"])

 PythonOperator(
     task_id="print_my_int_param",
     python_callable=print_my_int_param,
     params={"my_int_param": 12345},
 )

JSON Schema 验证

Param 使用 JSON Schema,因此您可以使用 https://json-schema.fullstack.org.cn/draft/2020-12/json-schema-validation.html 中提到的完整 JSON Schema 规范来定义 Param 对象。

with DAG(
    "my_dag",
    params={
        # an int with a default value
        "my_int_param": Param(10, type="integer", minimum=0, maximum=20),

        # a required param which can be of multiple types
        # a param must have a default value
        "multi_type_param": Param(5, type=["null", "number", "string"]),

        # an enum param, must be one of three values
        "enum_param": Param("foo", enum=["foo", "bar", 42]),

        # a param which uses json-schema formatting
        "email": Param(
            default="[email protected]",
            type="string",
            format="idn-email",
            minLength=5,
            maxLength=255,
        ),
    },
):

注意

如果为 DAG 定义了 schedule,则带有默认值的参数必须有效。这会在 DAG 解析期间进行验证。如果 schedule=None,则在 DAG 解析期间不会验证参数,而是在触发 DAG 之前验证参数。这在 DAG 作者不想提供默认值但希望强制用户在触发时提供有效参数的情况下很有用。

注意

目前,出于安全原因,不能使用从自定义类派生的 Param 对象。我们计划为自定义 Param 类建立一个注册系统,就像我们为操作符 ExtraLinks 所做的那样。

使用参数提供触发 UI 表单

2.6.0 版本中的新功能。

DAG 级别参数用于呈现用户友好的触发表单。当用户单击“触发 DAG”按钮时,将提供此表单。

触发 UI 表单是基于预定义的 DAG 参数呈现的。如果 DAG 没有定义任何参数,则会跳过触发表单。可以使用 Param 类定义表单元素,并且属性定义表单字段的显示方式。

触发 UI 表单支持以下功能

  • 来自顶级 DAG 参数的直接标量值(布尔值、整数、字符串、列表、字典)会自动装箱到 Param 对象中。从本机 Python 数据类型自动检测 type 属性。因此,这些简单类型会呈现为相应的字段类型。参数名称用作标签,并且不进行进一步验证,所有值都被视为可选。

  • 如果您使用 Param 类作为参数值的定义,则可以添加以下属性

    • Param 属性 title 用于呈现输入框的表单字段标签。如果未定义 title,则改为使用参数名称/键。

    • Param 属性 description 在输入字段下方呈现为灰色帮助文本。如果要提供特殊的格式或链接,则需要使用 Param 属性 description_md。有关示例,请参阅教程 DAG 参数 UI 示例 DAG

    • Param 属性 type 会影响字段的呈现方式。支持以下类型

      参数类型

      表单元素类型

      其他支持的属性

      示例

      string

      生成单行文本框或文本区域以编辑文本。

      • minLength:最小文本长度

      • maxLength:最大文本长度

      • format="date":生成日期选择器
        带有日历弹出窗口
      • format="date-time":生成日期和
        带有日历弹出窗口的时间选择器
      • format="time":生成时间选择器

      • format="multiline":生成多行文本区域

      • enum=["a", "b", "c"]:生成一个
        用于标量值的下拉选择列表。
        按照 JSON 验证,必须
        选择一个值,或者必须将该字段标记为
        显式可选。另请参阅内部的详细信息
      • values_display={"a": "Alpha", "b": "Beta"}:
        对于通过以下方式生成的选择下拉列表
        enum,您可以添加属性
        values_display,其中包含 dict 和映射数据
        值以显示标签。
      • examples=["One", "Two", "Three"]:如果您
        想要为值提供建议
        (不将用户限制为固定的 enum
        如上所述),您可以使用 examples
        ,它是一个项目列表。
      另请参阅
      在后端触发 DAG 之前会进行检查。

      Param("default", type="string", maxLength=10)

      Param(f"{datetime.date.today()}", type="string", format="date")

      number

      整数

      生成一个字段,限制只能添加
      数值。HTML 浏览器
      通常也会在
      右侧添加一个微调器来增加或减少
      值。integer 只允许整数,
      number 也允许
      小数值。
      • minimum:最小值

      • maximum:最大值

      另请参阅
      在后端触发 DAG 之前会进行检查。

      Param(42, type="integer", minimum=14, multipleOf=7)

      布尔值

      生成一个切换按钮,用作
      TrueFalse

      无。

      Param(True, type="boolean")

      数组

      生成一个 HTML 多行文本字段,
      编辑的每一行都会被转换为一个
      字符串数组作为值。
      • 如果添加了 examples 属性
        和一个列表,则会生成一个多值选择选项
        而不是自由文本字段。
      • values_display={"a": "Alpha", "b": "Beta"}:
        对于多值选择,examples 可以添加
        values_display 属性和一个字典,
        将数据值映射到显示标签。
      • 如果添加了 items 属性和一个
        包含字段 type 的字典,
        其值不是 “string”,则会为更多数组类型生成一个 JSON 条目字段,
        并且会进行额外的类型验证,如
        JSON Schema 数组项 中所述。

      Param(["a", "b", "c"], type="array")

      Param(["two", "three"], type="array", examples=["one", "two", "three", "four", "five"])

      Param(["[email protected]", "[email protected]"], type="array", items={"type": "string", "format": "idn-email"})

      对象

      生成一个具有
      文本验证的 JSON 条目字段。
      HTML 表单仅验证 JSON 输入的语法。为了验证
      特定结构的内容,请查看
      JSON Schema 对象详细信息

      Param({"key": "value"}, type=["object", "null"])

      空值

      指定不期望任何内容。
      单独使用没有太大意义,
      但对于类型组合很有用,
      例如 type=["null", "string"],因为
      type 属性也接受一个
      类型列表。
      默认情况下,如果指定类型,则会
      使该字段成为必填字段,并进行
      输入 - 这是因为 JSON 验证。
      如果希望字段值只是
      可选添加,则必须允许
      JSON schema 验证允许空值。
      JSON schema validation allowing null

      Param(None, type=["null", "string"])

  • 如果表单字段留空,则会将其作为 None 值传递到 params 字典中。

  • 表单字段按照 DAG 中 params 的定义顺序呈现。

  • 如果想向表单添加节,请将 section 属性添加到每个字段。文本将用作节标签。没有 section 的字段将在默认区域中呈现。其他节默认情况下会折叠。

  • 如果要使参数不显示,请使用 const 属性。这些参数将被提交,但在表单中隐藏。const 值必须与默认值匹配才能通过 JSON Schema 验证

  • 在表单底部,可以展开生成的 JSON 配置。如果要手动更改值,可以调整 JSON 配置。当表单字段更改时,更改会被覆盖。

  • 要在发布到触发表单的链接时预先填充表单中的值,您可以调用触发 URL /dags/<dag_name>/trigger 并在 URL 中添加 name=value 形式的查询参数,例如 /dags/example_params_ui_tutorial/trigger?required_field=some%20text。要预定义 DAG 运行的运行 ID,请使用 URL 参数 run_id

  • 字段可以是必填或可选的。为了确保通过 JSON schema 验证,默认情况下类型字段是必填的。要使类型字段可选,必须允许 “null” 类型。

  • 没有 “section” 的字段将在默认区域中呈现。其他节默认情况下会折叠。

注意

如果字段是必填的,则默认值也必须根据 schema 有效。如果 DAG 定义为 schedule=None,则参数值验证将在触发时进行。

有关示例,请查看提供的两个示例 DAG:参数触发示例 DAG参数 UI 示例 DAG

airflow/example_dags/example_params_trigger_ui.py[源]

with DAG(
    dag_id=Path(__file__).stem,
    dag_display_name="Params Trigger UI",
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example", "params"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", task_display_name="Get names")
    def get_names(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        if "names" not in params:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return params["names"]

    @task.branch(task_id="select_languages", task_display_name="Select languages")
    def select_languages(**kwargs) -> list[str]:
        params: ParamsDict = kwargs["params"]
        selected_languages = []
        for lang in ["english", "german", "french"]:
            if params[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
    def generate_english_greeting(name: str) -> str:
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
    def generate_german_greeting(name: str) -> str:
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
    def generate_french_greeting(name: str) -> str:
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)

airflow/example_dags/example_params_ui_tutorial.py[源]

    params={
        # Let's start simple: Standard dict values are detected from type and offered as entry form fields.
        # Detected types are numbers, text, boolean, lists and dicts.
        # Note that such auto-detected parameters are treated as optional (not required to contain a value)
        "x": 3,
        "text": "Hello World!",
        "flag": False,
        "a_simple_list": ["one", "two", "three", "actually one value is made per line"],
        # But of course you might want to have it nicer! Let's add some description to parameters.
        # Note if you can add any Markdown formatting to the description, you need to use the description_md
        # attribute.
        "most_loved_number": Param(
            42,
            type="integer",
            title="Your favorite number",
            description_md="Everybody should have a **favorite** number. Not only _math teachers_. "
            "If you can not think of any at the moment please think of the 42 which is very famous because"
            "of the book [The Hitchhiker's Guide to the Galaxy]"
            "(https://en.wikipedia.org/wiki/Phrases_from_The_Hitchhiker%27s_Guide_to_the_Galaxy#"
            "The_Answer_to_the_Ultimate_Question_of_Life,_the_Universe,_and_Everything_is_42).",
        ),
        # If you want to have a selection list box then you can use the enum feature of JSON schema
        "pick_one": Param(
            "value 42",
            type="string",
            title="Select one Value",
            description="You can use JSON schema enum's to generate drop down selection boxes.",
            enum=[f"value {i}" for i in range(16, 64)],
        ),

airflow/example_dags/example_params_ui_tutorial.py[源]

        "required_field": Param(
            # In this example we have no default value
            # Form will enforce a value supplied by users to be able to trigger
            type="string",
            title="Required text field",
            description="This field is required. You can not submit without having text in here.",
        ),
        "optional_field": Param(
            "optional text, you can trigger also w/o text",
            type=["null", "string"],
            title="Optional text field",
            description_md="This field is optional. As field content is JSON schema validated you must "
            "allow the `null` type.",
        ),

airflow/example_dags/example_params_ui_tutorial.py[源]

    @task(task_display_name="Show used parameters")
    def show_params(**kwargs) -> None:
        params: ParamsDict = kwargs["params"]
        print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")

    show_params()
../_images/trigger-dag-tutorial-form.png

2.7.0 版本新增: 即使没有定义参数,也可以使用配置开关 webserver.show_trigger_form_if_no_params 强制显示触发表单。

2.8.0 版本变更: 默认情况下不允许使用自定义 HTML,以防止注入脚本或其他恶意 HTML 代码。如果您信任 DAG 作者,可以通过将配置条目 webserver.allow_raw_html_descriptions 设置为 True 来更改参数描述的信任级别,从而允许原始 HTML。使用默认设置时,所有 HTML 都将显示为纯文本。这与之前使用 description_html 属性启用富格式化的功能有关,该属性现在被 description_md 属性取代。使用 custom_html_form 属性的自定义表单元素允许 DAG 作者指定原始 HTML 表单模板。这些自定义 HTML 表单元素自 2.8.0 版本起已弃用。

禁用运行时参数修改

在触发 DAG 时更新参数的能力取决于标志 core.dag_run_conf_overrides_params。将此配置设置为 False 将有效地将默认参数变为常量。

此条目是否有帮助?