airflow.models.dagbag

模块内容

FileLoadStat

有关单个文件的信息。

DagBag

dagbag 是从文件夹树中解析出来的 DAG 集合,并具有高级配置设置。

DagPriorityParsingRequest

模型用于存储在解析文件时将优先处理的 dag 解析请求。

函数

generate_md5_hash(context)

class airflow.models.dagbag.FileLoadStat[源代码]

基类: NamedTuple

有关单个文件的信息。

参数
  • file – 已加载的文件。

  • duration – 处理文件所花费的时间。

  • dag_num – 此文件中加载的 DAG 总数。

  • task_num – 此文件中加载的任务总数。

  • dags – 此文件中加载的 DAG 名称。

  • warning_num – 从处理此文件捕获的警告总数。

file: str[源代码]
duration: datetime.timedelta[源代码]
dag_num: int[源代码]
task_num: int[源代码]
dags: str[源代码]
warning_num: int[源代码]
class airflow.models.dagbag.DagBag(dag_folder=None, include_examples=NOTSET, safe_mode=NOTSET, read_dags_from_db=False, store_serialized_dags=None, load_op_links=True, collect_dags=True)[源代码]

基类: airflow.utils.log.logging_mixin.LoggingMixin

dagbag 是从文件夹树中解析出来的 DAG 集合,并具有高级配置设置。

一些可能的设置是作为后端的数据库以及用于触发任务的执行器。这使得为生产和开发、测试或不同的团队或安全配置文件运行不同的环境变得更容易。原本是系统级的设置现在是 dagbag 级别,以便一个系统可以运行多个独立的设置集。

参数
  • dag_folder (str | pathlib.Path | None) – 要扫描以查找 DAG 的文件夹

  • include_examples (bool | airflow.utils.types.ArgNotSet) – 是否包含 Airflow 附带的示例

  • safe_mode (bool | airflow.utils.types.ArgNotSet) – 当 False 时,扫描所有 Python 模块以查找 DAG。当 True 时,使用启发式方法(包含 DAGairflow 字符串的文件)来过滤要扫描 DAG 的 Python 模块。

  • read_dags_from_db (bool) – 如果传递 True,则从数据库读取 DAG。如果 False,则从 Python 文件读取 DAG。

  • store_serialized_dags (bool | None) – 已弃用的参数,与 read_dags_from_db 效果相同

  • load_op_links (bool) – 在反序列化 DAG 时是否应通过插件加载额外的操作符链接?此标志在 Scheduler 中设置为 False,以便不加载额外的操作符链接,从而不在 Scheduler 中运行用户代码。

  • collect_dags (bool) – 当为 True 时,在类初始化期间收集 DAG。

property store_serialized_dags: bool[源代码]

是否从数据库读取 DAG。

property dag_ids: list[str][源代码]

获取 DAG ID。

返回

此包中 DAG ID 的列表

返回类型

list[str]

size()[源代码]
返回

此 dagbag 中包含的 dag 数量

返回类型

int

get_dag(dag_id, session=None)[源代码]

从字典中获取 DAG,并在过期时刷新它。

参数

dag_id – DAG ID

process_file(filepath, only_if_updated=True, safe_mode=True)[source]

给定 Python 模块或 zip 文件的路径,导入模块并查找其中的 dag 对象。

bag_dag(dag, root_dag)[source]

将 DAG 添加到包中,递归处理子 DAG。

引发异常

如果在此 dag 或其子 dag 中检测到循环,则抛出 AirflowDagCycleException。

引发异常

如果此 dag 或其子 dag 已存在于包中,则抛出 AirflowDagDuplicatedIdException。

collect_dags(dag_folder=None, only_if_updated=True, include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'), safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'))[source]

在给定路径中查找 Python 模块,导入它们,并将它们添加到 dagbag 集合中。

请注意,如果在处理目录时找到 .airflowignore 文件,它的行为将非常类似于 .gitignore,忽略与文件中指定的任何模式匹配的文件。

注意.airflowignore 中的模式被解释为非锚定正则表达式或类似 gitignore 的 glob 表达式,具体取决于 DAG_IGNORE_FILE_SYNTAX 配置参数。

collect_dags_from_db()[source]

从数据库收集 DAG。

dagbag_report()[source]

打印有关 DagBag 加载统计信息的报告。

sync_to_db(processor_subdir=None, session=NEW_SESSION)[source]
airflow.models.dagbag.generate_md5_hash(context)[source]
class airflow.models.dagbag.DagPriorityParsingRequest(fileloc)[source]

基类:airflow.models.base.Base

模型用于存储在解析文件时将优先处理的 dag 解析请求。

__tablename__ = 'dag_priority_parsing_request'[source]
id[source]
fileloc[source]
__repr__()[source]

返回 repr(self)。

此条目是否有帮助?