Airflow Summit 2025 即将于 10 月 07-09 日举行。立即注册,获取早鸟票!

airflow.models.dagrun

属性

CreatedTasks

RUN_ID_REGEX

TISchedulingDecision

DagRun.task_instance_scheduling_decisions 的返回类型。

DagRun

DAG 的调用实例。

DagRunNote

用于存储关于 DagRun 实例的任意备注。

模块内容

airflow.models.dagrun.CreatedTasks[source]
airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|asset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[source]
class airflow.models.dagrun.TISchedulingDecision[source]

基类: NamedTuple

DagRun.task_instance_scheduling_decisions 的返回类型。

tis: list[airflow.models.taskinstance.TaskInstance][source]
schedulable_tis: list[airflow.models.taskinstance.TaskInstance][source]
changed_tis: bool[source]
unfinished_tis: list[airflow.models.taskinstance.TaskInstance][source]
finished_tis: list[airflow.models.taskinstance.TaskInstance][source]
class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, *, queued_at=NOTSET, logical_date=None, run_after=None, start_date=None, conf=None, state=None, run_type=None, creating_job_id=None, data_interval=None, triggered_by=None, backfill_id=None, bundle_version=None)[source]

基类: airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

DAG 的调用实例。

DagRun 可以由调度器创建(即调度运行),也可以由外部触发器创建(即手动运行)。

active_spans[source]
__tablename__ = 'dag_run'[source]
id[source]
dag_id[source]
queued_at[source]
logical_date[source]
start_date[source]
end_date[source]
run_id[source]
creating_job_id[source]
run_type[source]
triggered_by[source]
conf[source]
data_interval_start[source]
data_interval_end[source]
run_after[source]
last_scheduling_decision[source]
log_template_id[source]
updated_at[source]
clear_number[source]
backfill_id[source]

此 DagRun 当前关联的 backfill。

如果例如 DagRun 被清除以便重新运行,或者可能重新 backfill,则此关联可能会改变。

bundle_version[source]
scheduled_by_job_id[source]
context_carrier[source]
span_status[source]
dag: airflow.models.dag.DAG | None[source]
__table_args__[source]
task_instances[source]
task_instances_histories[source]
dag_model[source]
dag_run_note[source]
backfill[source]
backfill_max_active_runs[source]
max_active_runs[source]
note[source]
DEFAULT_DAGRUNS_TO_EXAMINE[source]
__repr__()[source]
validate_run_id(key, run_id)[source]
属性 dag_versions: list[airflow.models.dag_version.DagVersion][source]

返回与此 DagRun 的任务实例 (TI) 关联的 DAG 版本。

属性 version_number: int | None[source]

返回与此 DagRun 最新任务实例 (TI) 关联的 DAG 版本号。

check_version_id_exists_in_dr(dag_version_id, session=NEW_SESSION)[source]
属性 stats_tags: dict[str, str][source]
类方法 set_active_spans(active_spans)[source]
get_state()[source]
set_state(state)[source]

改变 DagRun 的状态。

根据下表实现属性更改(行代表旧状态,列代表新状态)

状态转换矩阵

QUEUED

RUNNING

SUCCESS

FAILED

None

queued_at = timezone.utcnow()

if empty: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

QUEUED

queued_at = timezone.utcnow()

if empty: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

RUNNING

queued_at = timezone.utcnow() start_date = None end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

SUCCESS

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

FAILED

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

属性 state[source]
refresh_from_db(session=NEW_SESSION)[source]

从数据库重新加载当前的 dagrun。

参数:

session (sqlalchemy.orm.Session) – 数据库会话

类方法 find(dag_id=None, run_id=None, logical_date=None, state=None, no_backfills=False, run_type=None, session=NEW_SESSION, logical_start_date=None, logical_end_date=None)[source]

返回符合给定搜索条件的 DagRun 集合。

参数:
classmethod find_duplicate(dag_id, run_id, *, session=NEW_SESSION)[source]

返回指定 run_id 的 DAG 现有运行。

如果未找到此类 DAG 运行,则返回 None

参数:
  • dag_id (str) – 用于查找重复项的 dag_id

  • run_id (str) – 定义此 DAG 运行的运行 ID

  • session (sqlalchemy.orm.Session) – 数据库会话

static generate_run_id(*, run_type, logical_date=None, run_after)[source]

基于运行类型、run_after 和逻辑日期生成运行 ID。

参数:
  • run_type (airflow.utils.types.DagRunType) – DAG 运行类型

  • logical_date (datetime.datetime | None) – 逻辑日期

  • run_after (datetime.datetime) – DAG 运行不会在此日期之前启动的日期。

static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]

返回此 DAG 运行的任务实例。

get_task_instances(state=None, session=NEW_SESSION)[source]

返回此 DAG 运行的任务实例。

重定向到 DagRun.fetch_task_instances 方法。保留此方法是因为它在代码中广泛使用。

get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]

返回此 DAG 运行中由 task_id 指定的任务实例。

参数:
static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]

返回此 DAG 运行中由 task_id 指定的任务实例。

参数:
get_dag()[source]

返回与此 DagRun 相关联的 Dag。

返回:

DAG

返回类型:

airflow.models.dag.DAG

static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]

返回上一个 DagRun,如果存在。

参数:
static get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[source]

返回上一个 SCHEDULED DagRun,如果存在。

参数:
set_dagrun_span_attrs(span)[source]
start_dr_spans_if_needed(tis)[source]
end_dr_span_if_needed()[source]
update_state(session=NEW_SESSION, execute_callbacks=True)[source]

根据其任务实例的状态确定 DagRun 的总体状态。

参数:
  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

  • execute_callbacks (bool) – 是否应直接调用 DAG 回调(成功/失败、SLA 等)(默认: true),或将其记录为 returned_callback 属性中的待处理请求

返回:

包含可在当前循环中调度的 tis 以及需要执行的 returned_callback 的元组

返回类型:

tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]

task_instance_scheduling_decisions(session=NEW_SESSION)[source]
notify_dagrun_state_changed(msg='')[source]
verify_integrity(*, session=NEW_SESSION, dag_version_id=None)[source]

通过检查已移除的任务或尚未在数据库中的任务来验证 DagRun。

它将把状态设置为 removed 或在需要时添加任务。

参数:
  • dag_version_id (sqlalchemy_utils.UUIDType | None) – DAG 版本 ID

  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM Session

classmethod get_latest_runs(session=NEW_SESSION)[source]

返回每个 DAG 的最新 DagRun。

schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[source]

将给定的任务实例设置为 scheduled 状态。

schedulable_tis 的每个元素都应该已经设置了其 task 属性。

任何没有回调或输出端口的 EmptyOperator 将直接设置为 success 状态。

所有 TIs 都应属于此 DagRun,但这段代码位于热路径中,不会对此进行检查——调用者有责任只使用来自单个 DAG 运行的 TIs 调用此函数。

get_log_template(*, session=NEW_SESSION)[source]
class airflow.models.dagrun.DagRunNote(content, user_id=None)[source]

基类: airflow.models.base.Base

用于存储关于 DagRun 实例的任意备注。

__tablename__ = 'dag_run_note'[source]
user_id[source]
dag_run_id[source]
content[source]
created_at[source]
updated_at[source]
dag_run[source]
__table_args__[source]
__repr__()[source]

此条目有帮助吗?