airflow.models.dag
¶
模块内容¶
类¶
DAG(有向无环图)是具有方向依赖性的任务的集合。 |
|
每个 DAG 的标签名称,允许在 DAG 视图中快速筛选。 |
|
定义不同所有者属性的表。 |
|
包含 DAG 属性的表。 |
|
当 DAG 用作 ContextManager 时,DAG 上下文用于保持当前的 DAG。 |
函数¶
|
从 |
|
返回 DAG 的最后一个 DAG 运行,如果没有则返回 None。 |
|
获取 dag_ids 列表的下一个运行信息。 |
|
Python DAG 装饰器,将函数包装到 Airflow DAG 中。 |
属性¶
- airflow.models.dag.DEFAULT_VIEW_PRESETS = ['grid', 'graph', 'duration', 'gantt', 'landing_times'][source]¶
- exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]¶
基类:
airflow.exceptions.AirflowException
当模型错误地填充数据间隔字段时引发的异常。
数据间隔字段应都为 None (对于在 AIP-39 之前计划的运行) 或都为 datetime (对于在 AIP-39 实现后计划的运行)。如果只有一个字段为 None,则会引发此异常。
- airflow.models.dag.create_timetable(interval, timezone)[source]¶
从
schedule_interval
参数创建 Timetable 实例。
- airflow.models.dag.get_last_dagrun(dag_id, session, include_externally_triggered=False)[source]¶
返回 DAG 的最后一个 DAG 运行,如果没有则返回 None。
最后的 DAG 运行可以是任何类型的运行,例如计划运行或回填运行。被覆盖的 DagRuns 将被忽略。
- airflow.models.dag.get_dataset_triggered_next_run_info(dag_ids, *, session)[source]¶
获取 dag_ids 列表的下一个运行信息。
给定 dag_ids 列表,获取一个字符串,表示任何数据集触发的 DAG 距离下次运行有多近,例如 “1 of 2 datasets updated”。
- class airflow.models.dag.DAG(dag_id, description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[source]¶
基类:
airflow.utils.log.logging_mixin.LoggingMixin
DAG(有向无环图)是具有方向依赖性的任务的集合。
DAG 也有一个计划、一个开始日期和一个结束日期(可选)。对于每个计划(例如每天或每小时),DAG 需要在满足其依赖关系时运行每个单独的任务。某些任务具有依赖于其自身过去执行的属性,这意味着它们必须等到之前的计划(和上游任务)完成后才能运行。
DAG 本质上充当任务的命名空间。一个 task_id 只能添加到 DAG 一次。
请注意,如果您计划使用时区,则所有提供的日期都应为 pendulum 日期。请参阅时区感知 DAG。
2.4 版本新增: schedule 参数用于指定基于时间的调度逻辑 (timetable),或数据集驱动的触发器。
自 2.4 版本起已弃用: schedule_interval 和 timetable 参数。它们的功能已合并到新的 schedule 参数中。
- 参数:
dag_id (str) – DAG 的 ID;必须仅包含字母数字字符、破折号、点号和下划线(所有 ASCII 字符)
description (str | None) – DAG 的描述,例如在 Web 服务器上显示
schedule (ScheduleArg) – 定义 DAG 运行的调度规则。可以接受 cron 字符串、timedelta 对象、Timetable 或 Dataset 对象列表。如果未提供此参数,DAG 将设置为默认调度
timedelta(days=1)
。另请参阅 使用时间表自定义 DAG 调度。start_date (datetime.datetime | None) – 调度器将尝试回填的时间戳
end_date (datetime.datetime | None) – 超过此日期,您的 DAG 将不会运行,将其保留为 None 以进行开放式调度
template_searchpath (str | Iterable[str] | None) – 此文件夹列表(非相对路径)定义 Jinja 将在哪里查找您的模板。顺序很重要。请注意,Jinja/Airflow 默认包含您的 DAG 文件的路径
template_undefined (type[jinja2.StrictUndefined]) – 模板未定义类型。
user_defined_macros (dict | None) – 一个宏字典,将在您的 Jinja 模板中公开。例如,将
dict(foo='bar')
传递给此参数允许您在与此 DAG 相关的所有 Jinja 模板中使用{{ foo }}
。请注意,您可以在此处传递任何类型的对象。user_defined_filters (dict | None) – 一个过滤器字典,将在您的 Jinja 模板中公开。例如,将
dict(hello=lambda name: 'Hello %s' % name)
传递给此参数允许您在与此 DAG 相关的所有 Jinja 模板中使用{{ 'world' | hello }}
。default_args (dict | None) – 一个默认参数字典,在初始化运算符时用作构造函数关键字参数。请注意,运算符具有相同的钩子,并且优先于此处定义的钩子,这意味着如果您的字典包含 ‘depends_on_past’: True,而运算符的 default_args 调用中包含 ‘depends_on_past’: False,则实际值将为 False。
params (collections.abc.MutableMapping | None) – 一个 DAG 级别参数字典,可以在模板中访问,命名空间在 params 下。这些参数可以在任务级别被覆盖。
max_active_tasks (int) – 允许并发运行的任务实例数量
max_active_runs (int) – 最大活动 DAG 运行次数,超出此数量的正在运行状态的 DAG 运行,调度程序将不会创建新的活动 DAG 运行
max_consecutive_failed_dag_runs (int) – (实验性)最大连续失败的 DAG 运行次数,超过此次数,调度程序将禁用 DAG
dagrun_timeout (datetime.timedelta | None) – 指定 DagRun 在超时/失败之前应该运行多长时间,以便可以创建新的 DagRun。
sla_miss_callback (None | SLAMissCallback | list[SLAMissCallback]) – 指定在报告 SLA 超时时调用的函数或函数列表。有关传递给回调的函数签名和参数的更多信息,请参阅 sla_miss_callback。
default_view (str) – 指定 DAG 默认视图(网格、图形、持续时间、甘特图、着陆时间),默认为网格
orientation (str) – 指定图形视图中的 DAG 方向(LR、TB、RL、BT),默认为 LR
catchup (bool) – 执行调度程序追赶(还是只运行最新的)?默认为 True
on_failure_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 当此 dag 的 DagRun 失败时要调用的函数或函数列表。上下文字典作为单个参数传递给此函数。
on_success_callback (None | DagStateChangeCallback | list[DagStateChangeCallback]) – 与
on_failure_callback
非常相似,不同之处在于它在 dag 成功时执行。access_control (dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None) – 指定可选的 DAG 级别操作,例如,“{‘role1’: {‘can_read’}, ‘role2’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”;或者如果存在 DAGs Run 资源,则可以指定资源名称,例如,“{‘role1’: {‘DAG Runs’: {‘can_create’}}, ‘role2’: {‘DAGs’: {‘can_read’, ‘can_edit’, ‘can_delete’}}}”
is_paused_upon_creation (bool | None) – 指定首次创建 DAG 时是否暂停。如果 DAG 已存在,则此标志将被忽略。如果未指定此可选参数,则将使用全局配置设置。
jinja_environment_kwargs (dict | None) –
要传递给 Jinja
Environment
以进行模板渲染的其他配置选项示例:为了避免 Jinja 从模板字符串中删除尾随换行符
DAG( dag_id="my-dag", jinja_environment_kwargs={ "keep_trailing_newline": True, # some other jinja2 Environment options here }, )
请参阅:Jinja 环境文档
render_template_as_native_obj (bool) – 如果为 True,则使用 Jinja
NativeEnvironment
将模板渲染为本机 Python 类型。如果为 False,则使用 JinjaEnvironment
将模板渲染为字符串值。tags (list[str] | None) – 用于在 UI 中帮助过滤 DAG 的标签列表。
owner_links (dict[str, str] | None) – 所有者及其链接的字典,这些链接将在 DAG 视图 UI 上可点击。可以用作 HTTP 链接(例如指向您的 Slack 通道的链接)或 mailto 链接。例如:{“dag_owner”: “https://airflow.apache.org/”}
auto_register (bool) – 当此 DAG 在
with
代码块中使用时自动注册fail_stop (bool) – 当 DAG 中的任务失败时,使当前正在运行的任务失败。警告:失败停止 DAG 只能具有默认触发规则(“all_success”)的任务。如果失败停止 DAG 中的任何任务具有非默认触发规则,则会抛出异常。
dag_display_name (str | None) – DAG 的显示名称,它出现在 UI 上。
- 属性 relative_fileloc: pathlib.Path[源代码]¶
可导入的 dag“文件”相对于配置的 DAG 文件夹的文件位置。
- is_fixed_time_schedule()[source]¶
判断计划是否具有固定时间(例如,每天凌晨 3 点)。
检测是通过“窥视”接下来的两个 cron 触发时间来完成的;如果这两个时间具有相同的分钟和小时值,则该计划是固定的,我们*不需要*执行 DST 修复。
这假设 DST 发生在整分钟更改时(例如 12:59 -> 12:00)。
不要试图理解这实际上意味着什么。这是一个不应在任何地方使用的旧逻辑。
- next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]¶
获取此 DAG 在
date_last_automated_dagrun
之后的下一个 DagRun 的相关信息。这会根据 DAG 的时间表、start_date、end_date 等计算下一个 DagRun 应运行的时间间隔(其执行日期)以及何时可以安排它。这不会检查最大活动运行次数或任何其他“max_active_tasks”类型的限制,而仅基于此 DAG 及其任务的各种日期和间隔字段执行计算。
- 参数:
last_automated_dagrun (None | datetime.datetime | airflow.timetables.base.DataInterval) – 此 DAG 的现有“自动”DagRun(已安排或回填,但不是手动)的
max(execution_date)
。restricted (bool) – 如果设置为 False (默认值为 True),则忽略 DAG 或任务上指定的
start_date
、end_date
和catchup
。
- 返回
下一个 dagrun 的 DagRunInfo,如果不会安排 dagrun 则为 None。
- 返回类型
- iter_dagrun_infos_between(earliest, latest, *, align=True)[source]¶
在给定间隔之间使用此 DAG 的时间表生成 DagRunInfo。
如果 DagRunInfo 实例的
logical_date
不早于earliest
,也不晚于latest
,则生成这些实例。实例按其logical_date
从最早到最晚排序。如果
align
为False
,则第一次运行将立即在earliest
上发生,即使它没有落在逻辑时间表上。默认值为True
,但子 DAG 将忽略此值,并且为了向后兼容,始终表现得好像已将其设置为False
。示例:一个 DAG 计划在每天午夜运行 (
0 0 * * *
)。如果earliest
为2021-06-03 23:00:00
,则如果align=False
,则第一个 DagRunInfo 将为2021-06-03 23:00:00
;如果align=True
,则为2021-06-04 00:00:00
。
- get_run_dates(start_date, end_date=None)[source]¶
使用此 DAG 的计划间隔返回接收为参数的间隔之间的日期列表。
返回的日期可用于执行日期。
- 参数:
start_date – 间隔的开始日期。
end_date – 间隔的结束日期。默认为
timezone.utcnow()
。
- 返回
间隔内遵循 DAG 计划的日期列表。
- 返回类型
- param(name, default=NOTSET)[source]¶
为当前 DAG 返回 DagParam 对象。
- 参数:
name (str) – DAG 参数名称。
default (Any) – DAG 参数的后备值。
- 返回
指定名称和当前 DAG 的 DagParam 实例。
- 返回类型
- static fetch_callback(dag, dag_run_id, success=True, reason=None, *, session=NEW_SESSION)[source]¶
根据成功的值获取适当的回调。
此方法获取此 DagRun 中单个 TaskInstance 的上下文,并将其与回调列表一起返回。
- 参数:
dag (DAG) – DAG 对象
dag_run_id (str) – DAG 运行 ID
success (bool) – 标志,用于指定是应调用失败回调还是成功回调
reason (str | None) – 完成原因
session (sqlalchemy.orm.session.Session) – 数据库会话
- handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]¶
根据需要触发 on_failure_callback 或 on_success_callback。
此方法获取此 DagRun 中单个 TaskInstance 的上下文,并将其与 “reason” 一起传递给可调用对象,主要用于区分 DagRun 失败。
- 参数:
dagrun (airflow.models.dagrun.DagRun) – DagRun 对象
success – 标志,用于指定是应调用失败回调还是成功回调
reason – 完成原因
session – 数据库会话
- classmethod execute_callback(callbacks, context, dag_id)[source]¶
使用给定的上下文触发回调。
- 参数:
callbacks (list[Callable] | None) – 要调用的回调列表
context (airflow.models.taskinstance.Context | None) – 要传递给所有回调的上下文
dag_id (str) – 要查找的 DAG 的 dag_id。
- get_num_active_runs(external_trigger=None, only_running=True, session=NEW_SESSION)[source]¶
返回活动“正在运行”的 DAG 运行数。
- 参数:
external_trigger – 对于外部触发的活动 DAG 运行,为 True
session –
- 返回
活动 DAG 运行的数字大于 0
- static fetch_dagrun(dag_id, execution_date=None, run_id=None, session=NEW_SESSION)[source]¶
如果存在,则返回给定执行日期或 run_id 的 DAG 运行,否则返回 None。
- 参数:
dag_id (str) – 要查找的 DAG 的 dag_id。
execution_date (datetime.datetime | None) – 要查找的 DagRun 的执行日期。
run_id (str | None) – 要查找的 DagRun 的 run_id。
session (sqlalchemy.orm.session.Session) –
- 返回
如果找到 DagRun,则返回 DagRun,否则返回 None。
- 返回类型
airflow.models.dagrun.DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic
- get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]¶
返回 start_date(包含)和 end_date(包含)之间的 dag 运行列表。
- 参数:
start_date – 要查找的 DagRun 的起始执行日期。
end_date – 要查找的 DagRun 的结束执行日期。
session –
- 返回
找到的 DagRun 列表。
- set_dependency(upstream_task_id, downstream_task_id)[source]¶
设置已使用 add_task() 添加到 DAG 的两个任务之间的依赖关系。
- get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]¶
获取
base_date
之前(包括)的num
个任务实例。返回的列表可能包含与任何 DagRunType 对应的正好
num
个任务实例。如果base_date
之前调度的 DAG 运行次数少于num
,则它可能更少。
- set_task_instance_state(*, task_id, map_indexes=None, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
设置 TaskInstance 的状态,并清除处于 failed 或 upstream_failed 状态的下游任务。
- 参数:
task_id (str) – TaskInstance 的任务 ID
map_indexes (Collection[int] | None) – 仅当其 map_index 匹配时才设置 TaskInstance。如果为 None(默认),则设置该任务的所有映射 TaskInstance。
execution_date (datetime.datetime | None) – TaskInstance 的执行日期
run_id (str | None) – TaskInstance 的 run_id
state (airflow.utils.state.TaskInstanceState) – 要将 TaskInstance 设置为的状态
upstream (bool) – 包括给定 task_id 的所有上游任务
downstream (bool) – 包括给定 task_id 的所有下游任务
future (bool) – 包括给定 task_id 的所有未来 TaskInstance
commit (bool) – 提交更改
past (bool) – 包括给定 task_id 的所有过去 TaskInstance
- set_task_group_state(*, group_id, execution_date=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
将 TaskGroup 设置为给定状态,并清除处于 failed 或 upstream_failed 状态的下游任务。
- 参数:
group_id (str) – TaskGroup 的 group_id
execution_date (datetime.datetime | None) – TaskInstance 的执行日期
run_id (str | None) – TaskInstance 的 run_id
state (airflow.utils.state.TaskInstanceState) – 要将 TaskInstance 设置为的状态
upstream (bool) – 包括给定 task_id 的所有上游任务
downstream (bool) – 包括给定 task_id 的所有下游任务
future (bool) – 包括给定 task_id 的所有未来 TaskInstance
commit (bool) – 提交更改
past (bool) – 包括给定 task_id 的所有过去 TaskInstance
session (sqlalchemy.orm.session.Session) – 新会话
- topological_sort(include_subdag_tasks=False)[source]¶
按拓扑顺序对任务进行排序,使任务位于其任何上游依赖项之后。
已弃用,请使用
task_group.topological_sort
代替
- set_dag_runs_state(state=DagRunState.RUNNING, session=NEW_SESSION, start_date=None, end_date=None, dag_ids=None)[source]¶
- clear(task_ids=None, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=True, dag_run_state=DagRunState.QUEUED, dry_run=False, session=NEW_SESSION, get_tis=False, recursion_depth=0, max_recursion_depth=None, dag_bag=None, exclude_task_ids=frozenset())[source]¶
清除与当前 DAG 关联的、指定日期范围内的任务实例集合。
- 参数:
task_ids (Collection[str | tuple[str, int]] | None) – 要清除的任务 ID 列表或 (
task_id
,map_index
) 元组。start_date (datetime.datetime | None) – 要清除的最小 execution_date。
end_date (datetime.datetime | None) – 要清除的最大 execution_date。
only_failed (bool) – 仅清除失败的任务。
only_running (bool) – 仅清除正在运行的任务。
confirm_prompt (bool) – 请求确认。
include_subdags (bool) – 清除子 DAG 中的任务,并清除由 ExternalTaskMarker 指示的外部任务。
include_parentdag (bool) – 清除子 DAG 父 DAG 中的任务。
dag_run_state (airflow.utils.state.DagRunState) – 设置 DagRun 的状态。如果设置为 False,则不会更改 dagrun 状态。
dry_run (bool) – 查找要清除的任务,但不实际清除它们。
session (sqlalchemy.orm.session.Session) – 要使用的 sqlalchemy 会话。
dag_bag (airflow.models.dagbag.DagBag | None) – 用于查找 DAG 子 DAG 的 DagBag (可选)。
exclude_task_ids (frozenset[str] | frozenset[tuple[str, int]] | None) – 不应清除的
task_id
或 (task_id
,map_index
) 元组的集合。
- classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, include_subdags=True, include_parentdag=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]¶
- partial_subset(task_ids_or_regex, include_downstream=False, include_upstream=True, include_direct_upstream=False)[source]¶
根据与一个或多个任务匹配的正则表达式,返回当前 DAG 的子集。
根据应该匹配一个或多个任务的正则表达式,返回当前 DAG 的子集,作为当前 DAG 的深层副本,并根据传递的标志包含上游和下游的相邻任务。
- add_tasks(tasks)[源代码]¶
向 DAG 添加一个任务列表。
- 参数:
tasks (Iterable[airflow.models.operator.Operator]) – 你想要添加的任务列表
- run(start_date=None, end_date=None, mark_success=False, local=False, donot_pickle=airflow_conf.getboolean('core', 'donot_pickle'), ignore_task_deps=False, ignore_first_depends_on_past=True, pool=None, delay_on_limit_secs=1.0, verbose=False, conf=None, rerun_failed_tasks=False, run_backwards=False, run_at_least_once=False, continue_on_failures=False, disable_retry=False)[源代码]¶
运行 DAG。
- 参数:
start_date – 要运行的范围的开始日期
end_date – 要运行的范围的结束日期
mark_success – 设置为 True 时,将作业标记为成功而不运行它们
local – 设置为 True 时,使用 LocalExecutor 运行任务
executor – 用于运行任务的执行器实例
donot_pickle – 设置为 True 时,避免序列化 DAG 对象并将其发送给工作节点
ignore_task_deps – 设置为 True 时,跳过上游任务
ignore_first_depends_on_past – 设置为 True 时,仅对第一组任务忽略 depends_on_past 依赖
pool – 要使用的资源池
delay_on_limit_secs – 当达到 max_active_runs 限制时,在下次尝试运行 DAG 运行之前等待的时间(秒)
verbose – 使日志输出更详细
conf – 从 CLI 传递的用户定义的字典
rerun_failed_tasks –
run_backwards –
run_at_least_once – 如果为 true,即使在时间范围内不存在逻辑运行,也始终至少运行一次 DAG。
- test(execution_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[源代码]¶
为给定的 DAG 和执行日期执行一次 DagRun。
- 参数:
execution_date (datetime.datetime | None) – DAG 运行的执行日期
run_conf (dict[str, Any] | None) – 传递给新创建的 dagrun 的配置
conn_file_path (str | None) – 连接文件的文件路径,格式为 yaml 或 json
variable_file_path (str | None) – 变量文件的文件路径,格式为 yaml 或 json
use_executor (bool) – 如果设置,则使用执行器来测试 DAG
mark_success_pattern (Pattern | str | None) – 要标记为成功而不是运行的 task_ids 的正则表达式
session (sqlalchemy.orm.session.Session) – 数据库连接(可选)
- create_dagrun(state, execution_date=None, run_id=None, start_date=None, external_trigger=False, conf=None, run_type=None, session=NEW_SESSION, dag_hash=None, creating_job_id=None, data_interval=None)[源代码]¶
从此 DAG 创建一个 dag 运行,包括与此 DAG 关联的任务。
返回 dag 运行。
- 参数:
run_id (str | None) – 定义此 dag 运行的运行 id
run_type (airflow.utils.types.DagRunType | None) – DagRun 的类型
execution_date (datetime.datetime | None) – 此 dag 运行的执行日期
state (airflow.utils.state.DagRunState) – dag 运行的状态
start_date (datetime.datetime | None) – 应该评估此 dag 运行的日期
external_trigger (bool | None) – 此 dag 运行是否由外部触发
conf (dict | None) – 包含要传递给 DAG 的配置/参数的字典
creating_job_id (int | None) – 创建此 DagRun 的作业的 id
session (sqlalchemy.orm.session.Session) – 数据库会话
dag_hash (str | None) – 序列化 DAG 的哈希值
data_interval (tuple[datetime.datetime, datetime.datetime] | None) – DagRun 的数据间隔
- classmethod bulk_sync_to_db(dags, session=NEW_SESSION)[source]¶
使用 airflow.models.DAG.bulk_write_to_db,此方法已弃用。
- classmethod bulk_write_to_db(dags, processor_subdir=None, session=NEW_SESSION)[source]¶
确保数据库 dag 表中给定 DAG 的 DagModel 行是最新的。
请注意,此方法可以为 DAG 和 SubDAG 调用。SubDag 实际上是一个 SubDagOperator。
- 参数:
dags (Collection[DAG]) – 要保存到数据库的 DAG 对象
- 返回
无
- sync_to_db(processor_subdir=None, session=NEW_SESSION)[source]¶
将此 DAG 的属性保存到数据库。
请注意,此方法可以为 DAG 和 SubDAG 调用。SubDag 实际上是一个 SubDagOperator。
- 返回
无
- static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]¶
给定已知 DAG 的列表,停用在 ORM 中标记为活动的任何其他 DAG。
- 参数:
active_dag_ids – 活动 DAG ID 的列表
- 返回
无
- static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]¶
停用上次被调度程序触及的时间早于过期日期的任何 DAG。
这些 DAG 很可能已被删除。
- 参数:
expiration_date – 设置在此时间之前被触及的非活动 DAG
- 返回
无
- static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]¶
返回给定 DAG 中任务实例的数量。
- 参数:
session – ORM 会话
dag_id – 要获取任务并发的 DAG 的 ID
run_id – 要获取任务并发的 DAG 运行的 ID
task_ids – 给定 DAG 的有效任务 ID 列表
states – 如果提供,则用于筛选的状态列表
- 返回
正在运行的任务数
- 返回类型
- class airflow.models.dag.DagTag(name, doc)[source]¶
基类:
airflow.models.base.Base
每个 DAG 的标签名称,允许在 DAG 视图中快速筛选。
- 类 airflow.models.dag.DagOwnerAttributes(name, doc)[源代码]¶
基类:
airflow.models.base.Base
定义不同所有者属性的表。
例如,所有者的链接将作为超链接传递到“DAGs”视图。
- 类 airflow.models.dag.DagModel(concurrency=None, **kwargs)[源代码]¶
基类:
airflow.models.base.Base
包含 DAG 属性的表。
- property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[源代码]¶
- property relative_fileloc: pathlib.Path | None[源代码]¶
可导入的 dag“文件”相对于配置的 DAG 文件夹的文件位置。
- set_is_paused(is_paused, including_subdags=True, session=NEW_SESSION)[源代码]¶
暂停/取消暂停 DAG。
- 参数:
is_paused (bool) – DAG 是否已暂停
including_subdags (bool) – 是否包含 DAG 的子 DAG
session – 会话
- classmethod deactivate_deleted_dags(alive_dag_filelocs, processor_subdir, session=NEW_SESSION)[源代码]¶
将 DAG 的
is_active
设置为False
,对于那些 DAG 文件已被删除的 DAG。- 参数:
alive_dag_filelocs (Container[str]) – 活跃 DAG 的文件路径
processor_subdir (str) – DAG 处理器子目录
session (sqlalchemy.orm.session.Session) – ORM 会话
- classmethod dags_needing_dagruns(session)[源代码]¶
返回(并锁定)一个 DAG 对象列表,这些对象由于要创建一个新的 DagRun。
这将返回一个行级锁定的行结果集,使用“SELECT ... FOR UPDATE”查询,您应确保在单个事务中做出任何调度决策 - 一旦事务提交,它将被解锁。
- calculate_dagrun_date_fields(dag, last_automated_dag_run)[源代码]¶
计算
next_dagrun
和 next_dagrun_create_after`。- 参数:
dag (DAG) – DAG 对象
last_automated_dag_run (None | datetime.datetime | airflow.timetables.base.DataInterval) – 此 DAG 最近一次运行的数据区间(或日期时间),如果尚未计划,则为 none。
- airflow.models.dag.dag(dag_id='', description=None, schedule=NOTSET, schedule_interval=NOTSET, timetable=None, start_date=None, end_date=None, full_filepath=None, template_searchpath=None, template_undefined=jinja2.StrictUndefined, user_defined_macros=None, user_defined_filters=None, default_args=None, concurrency=None, max_active_tasks=airflow_conf.getint('core', 'max_active_tasks_per_dag'), max_active_runs=airflow_conf.getint('core', 'max_active_runs_per_dag'), max_consecutive_failed_dag_runs=airflow_conf.getint('core', 'max_consecutive_failed_dag_runs_per_dag'), dagrun_timeout=None, sla_miss_callback=None, default_view=airflow_conf.get_mandatory_value('webserver', 'dag_default_view').lower(), orientation=airflow_conf.get_mandatory_value('webserver', 'dag_orientation'), catchup=airflow_conf.getboolean('scheduler', 'catchup_by_default'), on_success_callback=None, on_failure_callback=None, doc_md=None, params=None, access_control=None, is_paused_upon_creation=None, jinja_environment_kwargs=None, render_template_as_native_obj=False, tags=None, owner_links=None, auto_register=True, fail_stop=False, dag_display_name=None)[source]¶
Python DAG 装饰器,将函数包装到 Airflow DAG 中。
接受操作符 kwarg 的 kwargs。 可以用来参数化 DAG。
- 参数:
dag_args – DAG 对象的参数
dag_kwargs – DAG 对象的 Kwargs。