airflow.models.dagrun¶
属性¶
类¶
DagRun.task_instance_scheduling_decisions 的返回类型。 |
|
DAG 的调用实例。 |
|
用于存储关于 DagRun 实例的任意备注。 |
模块内容¶
- airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|asset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]¶
- class airflow.models.dagrun.TISchedulingDecision[source]¶
基类:
NamedTuple
DagRun.task_instance_scheduling_decisions 的返回类型。
- schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- finished_tis: list[airflow.models.taskinstance.TaskInstance][source]¶
- class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, *, queued_at=NOTSET, logical_date=None, run_after=None, start_date=None, conf=None, state=None, run_type=None, creating_job_id=None, data_interval=None, triggered_by=None, backfill_id=None, bundle_version=None)[source]¶
基类:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
DAG 的调用实例。
DagRun 可以由调度器创建(即调度运行),也可以由外部触发器创建(即手动运行)。
- 属性 dag_versions: list[airflow.models.dag_version.DagVersion][source]¶
返回与此 DagRun 的任务实例 (TI) 关联的 DAG 版本。
- set_state(state)[source]¶
改变 DagRun 的状态。
根据下表实现属性更改(行代表旧状态,列代表新状态)
状态转换矩阵¶ QUEUED
RUNNING
SUCCESS
FAILED
None
queued_at = timezone.utcnow()
if empty: start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
QUEUED
queued_at = timezone.utcnow()
if empty: start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
RUNNING
queued_at = timezone.utcnow() start_date = None end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
SUCCESS
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
FAILED
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
- refresh_from_db(session=NEW_SESSION)[source]¶
从数据库重新加载当前的 dagrun。
- 参数:
session (sqlalchemy.orm.Session) – 数据库会话
- 类方法 find(dag_id=None, run_id=None, logical_date=None, state=None, no_backfills=False, run_type=None, session=NEW_SESSION, logical_start_date=None, logical_end_date=None)[source]¶
返回符合给定搜索条件的 DagRun 集合。
- 参数:
dag_id (str | list[str] | None) – 用于查找 DAG 运行的 dag_id 或 dag_id 列表
run_id (collections.abc.Iterable[str] | None) – 定义此 DAG 运行的运行 ID
run_type (airflow.utils.types.DagRunType | None) – DAG 运行的类型
logical_date (datetime.datetime | collections.abc.Iterable[datetime.datetime] | None) – 逻辑日期
state (airflow.utils.state.DagRunState | None) – DAG 运行的状态
no_backfills (bool) – 不返回回填(True),返回所有(False)。默认为 False
session (sqlalchemy.orm.Session) – 数据库会话
logical_start_date (datetime.datetime | None) – 从此日期开始执行的 DAG 运行
logical_end_date (datetime.datetime | None) – 在此日期之前执行的 DAG 运行
- classmethod find_duplicate(dag_id, run_id, *, session=NEW_SESSION)[source]¶
返回指定 run_id 的 DAG 现有运行。
如果未找到此类 DAG 运行,则返回 None。
- 参数:
dag_id (str) – 用于查找重复项的 dag_id
run_id (str) – 定义此 DAG 运行的运行 ID
session (sqlalchemy.orm.Session) – 数据库会话
- static generate_run_id(*, run_type, logical_date=None, run_after)[source]¶
基于运行类型、run_after 和逻辑日期生成运行 ID。
- 参数:
run_type (airflow.utils.types.DagRunType) – DAG 运行类型
logical_date (datetime.datetime | None) – 逻辑日期
run_after (datetime.datetime) – DAG 运行不会在此日期之前启动的日期。
- static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]¶
返回此 DAG 运行的任务实例。
- get_task_instances(state=None, session=NEW_SESSION)[source]¶
返回此 DAG 运行的任务实例。
重定向到 DagRun.fetch_task_instances 方法。保留此方法是因为它在代码中广泛使用。
- get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]¶
返回此 DAG 运行中由 task_id 指定的任务实例。
- 参数:
task_id (str) – 任务 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]¶
返回此 DAG 运行中由 task_id 指定的任务实例。
- 参数:
dag_id (str) – DAG ID
dag_run_id (str) – DAG 运行 ID
task_id (str) – 任务 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
- static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]¶
返回上一个 DagRun,如果存在。
- 参数:
dag_run (DagRun) – Dag 运行
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
state (airflow.utils.state.DagRunState | None) – Dag 运行状态
- static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]¶
返回上一个 SCHEDULED DagRun,如果存在。
- 参数:
dag_run_id (int) – DAG 运行 ID
session (sqlalchemy.orm.Session) – SQLAlchemy ORM Session
- update_state(session=NEW_SESSION, execute_callbacks=True)[source]¶
根据其任务实例的状态确定 DagRun 的总体状态。
- 参数:
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session
execute_callbacks (bool) – 是否应直接调用 DAG 回调(成功/失败、SLA 等)(默认: true),或将其记录为
returned_callback
属性中的待处理请求
- 返回:
包含可在当前循环中调度的 tis 以及需要执行的 returned_callback 的元组
- 返回类型:
tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]
- verify_integrity(*, session=NEW_SESSION, dag_version_id=None)[source]¶
通过检查已移除的任务或尚未在数据库中的任务来验证 DagRun。
它将把状态设置为 removed 或在需要时添加任务。
- 参数:
dag_version_id (sqlalchemy_utils.UUIDType | None) – DAG 版本 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session