Airflow Summit 2025 将于 10 月 07 日至 09 日举行。立即注册获取早鸟票!

airflow.models.taskinstance

属性

TR

log

PAST_DEPENDS_MET

TaskInstanceStateType

TaskInstance

任务实例存储任务实例的状态。

SimpleTaskInstance

简化任务实例。

TaskInstanceNote

用于存储与任务实例相关的任意备注。

函数

set_current_context(context)

将当前执行上下文设置为提供的上下文对象。

clear_task_instances(tis, session[, dag, dag_run_state])

清除一组任务实例,但确保正在运行的任务被杀死。

uuid7()

生成新的 UUID7 字符串。

模块内容

airflow.models.taskinstance.TR[source]
airflow.models.taskinstance.log[source]
airflow.models.taskinstance.PAST_DEPENDS_MET = 'past_depends_met'[source]
airflow.models.taskinstance.set_current_context(context)[source]

将当前执行上下文设置为提供的上下文对象。

此方法应在每次任务执行之前调用一次 operator.execute。

airflow.models.taskinstance.clear_task_instances(tis, session, dag=None, dag_run_state=DagRunState.QUEUED)[source]

清除一组任务实例,但确保正在运行的任务被杀死。

同时将 Dagrun 的 state 设置为 QUEUED,start_date 设置为执行时间。但仅针对已完成的 DR(SUCCESS 和 FAILED)。对于正在运行的 DR(QUEUED 和 RUNNING),不会清除 DR 的 state 和 start_date`,因为清除已在运行的 DR 的状态是多余的,并且清除 `start_date` 会影响 DR 的持续时间。

参数::
airflow.models.taskinstance.uuid7()[source]

生成新的 UUID7 字符串。

class airflow.models.taskinstance.TaskInstance(task, run_id=None, state=None, map_index=-1, dag_version_id=None)[source]

基类:airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

任务实例存储任务实例的状态。

此表是关于哪些任务已运行及其所处状态的权威和单一事实来源。

SqlAlchemy 模型故意没有指向 task 或 dag 模型的 SqlAlchemy 外键,以便更好地控制事务。

对此表的数据库事务应确保不会发生重复触发,并避免在多个调度程序可能正在触发任务实例时对哪些任务实例已准备好运行产生混淆。

map_index 中的值 -1 表示以下任何一种情况:没有映射任务的任务实例;带有映射任务但尚未展开(state=pending)的任务实例;带有映射任务但展开为空列表(state=skipped)的任务实例。

__tablename__ = 'task_instance'[source]
id[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
start_date[source]
end_date[source]
duration[source]
state[source]
try_number[source]
max_tries[source]
hostname[source]
unixname[source]
pool[source]
pool_slots[source]
queue[source]
priority_weight[source]
operator[source]
custom_operator_name[source]
queued_dttm[source]
scheduled_dttm[source]
queued_by_job_id[source]
last_heartbeat_at[source]
pid[source]
executor[source]
executor_config[source]
updated_at[source]
context_carrier[source]
span_status[source]
external_executor_id[source]
trigger_id[source]
trigger_timeout[source]
next_method[source]
next_kwargs[source]
dag_version_id[source]
dag_version[source]
__table_args__[source]
dag_model: airflow.models.dag.DagModel[source]
trigger[source]
triggerer_job[source]
dag_run[source]
rendered_task_instance_fields[source]
run_after[source]
logical_date[source]
task_instance_note[source]
note[source]
task: airflow.sdk.definitions._internal.abstractoperator.Operator | None = None[source]
test_mode: bool = False[source]
is_trigger_log_context: bool = False[source]
run_as_user: str | None = None[source]
__hash__()[source]
property stats_tags: dict[str, str][source]

返回任务实例标签。

init_on_load()[source]

初始化未存储在数据库中的属性。

property operator_name: str | None[source]

@property: 如果设置,为 operator 使用一个更友好的显示名称。

task_display_name()[source]
rendered_map_index()[source]
classmethod from_runtime_ti(runtime_ti)[source]
to_runtime_ti(context_from_server)[source]
command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, raw=False, pool=None, cfg_path=None)[source]

返回一个可以在任何安装了 airflow 的地方执行的命令。

此命令是调度器发送给执行器的消息的一部分。

static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, file_path=None, raw=False, pool=None, cfg_path=None, map_index=-1)[source]

生成执行此任务实例所需的 shell 命令。

参数::
  • dag_id (str) – DAG ID

  • task_id (str) – Task ID

  • run_id (str) – 此任务的 DagRun 的 run_id

  • mark_success (bool) – 是否将任务标记为成功

  • ignore_all_deps (bool) – 忽略所有可忽略的依赖项。覆盖其他 ignore_* 参数。

  • ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 参数(例如,用于 Backfills)

  • wait_for_past_depends_before_skipping (bool) – 在将任务实例标记为跳过之前等待过去的依赖项

  • ignore_task_deps (bool) – 忽略任务特定的依赖项,例如 depends_on_past 和 trigger rule

  • ignore_ti_state (bool) – 忽略任务实例之前的失败/成功状态

  • local (bool) – 是否在本地运行任务

  • file_path (pathlib.PurePath | str | None) – 包含 DAG 定义的文件的路径

  • raw (bool) – raw 模式(需要更多详细信息)

  • pool (str | None) – 任务应该运行所在的 Airflow pool

  • cfg_path (str | None) – 配置文件的路径

返回:

可用于运行任务实例的 shell 命令

返回类型:

list[str]

property log_url: str[source]

任务实例的日志 URL。

property mark_success_url: str[source]

用于将任务实例标记为成功的 URL。

error(session=NEW_SESSION)[source]

在数据库中将任务实例的状态强制设置为 FAILED。

参数::

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]
refresh_from_db(session=NEW_SESSION, lock_for_update=False, keep_local_changes=False)[source]

根据主键从数据库刷新任务实例。

参数::
  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • lock_for_update (bool) – 如果为 True,表示数据库应该锁定任务实例(发出 FOR UPDATE 子句),直到 session 提交。

  • keep_local_changes (bool) – 如果为 False(默认值),则强制所有属性使用数据库中的值,如果为 True,则不覆盖本地设置的属性

refresh_from_task(task, pool_override=None)[source]

从给定的任务中复制通用属性。

参数::
  • task (airflow.sdk.definitions._internal.abstractoperator.Operator) – 要从中复制的任务对象

  • pool_override (str | None) – 使用 pool_override 而不是任务的 pool

clear_xcom_data(session=NEW_SESSION)[source]
property key: airflow.models.taskinstancekey.TaskInstanceKey[source]

返回唯一标识任务实例的 tuple。

set_state(state, session=NEW_SESSION)[source]

设置任务实例状态。

参数::
返回:

状态是否已更改

返回类型:

bool

property is_premature: bool[source]

返回任务是否处于 UP_FOR_RETRY 状态且其重试间隔已过去。

prepare_db_for_next_try(session)[source]

使用将此任务实例排队等待下一次重试所需的所有记录更新元数据。

are_dependents_done(session=NEW_SESSION)[source]

检查此任务实例的直接下游任务是否已成功或已被跳过。

这旨在供 wait_for_downstream 使用。

当您不想在下游任务完成之前开始处理任务的下一个调度时,这很有用。例如,如果任务 DROPs 并重新创建表。

参数::

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

get_previous_dagrun(state=None, session=None)[source]

返回在此任务实例的 DagRun 之前运行的 DagRun。

参数::
get_previous_ti(state=None, session=NEW_SESSION)[source]

返回在此任务实例之前运行的任务的任务实例。

参数::
are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]

给定依赖关系的上下文,是否满足此任务实例运行的所有条件。

(例如,从 UI 强制运行的任务实例将忽略某些依赖项)。

参数::
  • dep_context (airflow.ti_deps.dep_context.DepContext | None) – 确定应评估的依赖项的执行上下文。

  • session (sqlalchemy.orm.session.Session) – 数据库 session

  • verbose (bool) – 是否在 info 或 debug 日志级别记录失败依赖项的详细信息

get_failed_dep_statuses(dep_context=None, session=NEW_SESSION)[source]

获取失败的依赖项。

__repr__()[source]
next_retry_datetime()[source]

如果任务实例失败,获取下次重试的日期时间。

对于指数退避,retry_delay 用作基数,并将转换为秒。

ready_for_retry()[source]

检查任务实例是否处于正确的状态和时间范围以进行重试。

get_dagrun(session=NEW_SESSION)[source]

返回此任务实例的 DagRun。

参数::

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

返回:

DagRun

返回类型:

airflow.models.dagrun.DagRun

check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, external_executor_id=None, session=NEW_SESSION)[source]
emit_state_change_metric(new_state)[source]

发送一个时间指标,表示给定状态转换花费了多少时间。

根据任务所处的状态推断出前一个状态和指标名称。

参数::

new_state (airflow.utils.state.TaskInstanceState) – 刚为此任务设置的状态。我们不使用 self.state,因为有时状态会直接在数据库中更新,而不是在本地任务实例对象中更新。支持的状态:QUEUED 和 RUNNING

clear_next_method_args()[source]

确保我们取消设置 next_method 和 next_kwargs,以确保任何重试都不会重复使用它们。

static register_asset_changes_in_db(ti, task_outlets, outlet_events, session=NEW_SESSION)[source]
update_rtif(rendered_fields, session=NEW_SESSION)[source]
update_heartbeat()[source]
defer_task(exception, session=NEW_SESSION)[source]

将任务标记为延迟状态,并设置在 TaskDeferred 被引发时恢复它所需的 trigger。

run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]

运行 TaskInstance。

dry_run()[source]

仅为 TaskInstance 渲染模板。

classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_fast=False)[source]

获取处理失败所需的上下文。

参数::
  • ti (TaskInstance) – TaskInstance

  • error (None | str | BaseException) – 如果指定,记录抛出的特定异常

  • test_mode (bool | None) – 如果为 True,则不记录数据库中的成功或失败

  • context (airflow.utils.context.Context | None) – Jinja2 上下文

  • force_fail (bool) – 如果为 True,任务不会重试

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • fail_fast (bool) – 如果为 True,所有下游任务都将失败

static save_to_db(ti, session=NEW_SESSION)[source]
handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]

处理任务实例的失败。

参数::
  • error (None | str | BaseException) – 如果指定,记录抛出的特定异常

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session

  • test_mode (bool | None) – 如果为 True,则不记录数据库中的成功或失败

  • context (airflow.utils.context.Context | None) – Jinja2 上下文

  • force_fail (bool) – 如果为 True,任务不会重试

is_eligible_to_retry()[source]

任务实例是否符合重试条件。

get_template_context(session=None, ignore_param_exceptions=True)[source]

返回 Task Instance 上下文。

参数::
get_rendered_template_fields(session=NEW_SESSION)[source]

使用渲染后的模板字段更新任务,以便在 UI 中显示。

如果任务已运行,将从数据库中获取;否则将进行渲染。

overwrite_params_with_dag_run_conf(params, dag_run)[source]

使用 DagRun.conf 覆盖任务参数 (Task Params)。

render_templates(context=None, jinja_env=None)[source]

渲染 Operator 字段中的模板。

如果任务最初是映射的,这可能会将 self.task 替换为未映射的、完全渲染的 BaseOperator。返回替换前的原始 self.task

get_email_subject_content(exception, task=None)[source]

获取异常电子邮件的主题内容。

参数::
  • exception (BaseException) – 电子邮件中发送的异常

  • task (airflow.models.baseoperator.BaseOperator | None)

email_alert(exception, task)[source]

发送包含异常信息的警报邮件。

参数::
  • exception – 异常

  • task (airflow.models.baseoperator.BaseOperator) – 与异常相关的任务

set_duration()[source]

设置任务实例时长。

xcom_push(key, value, session=NEW_SESSION)[source]

使 XCom 可供任务拉取。

参数::
  • key (str) – 存储值的键。

  • value (Any) – 要存储的值。否则只能使用 JSON 可序列化的值。

get_num_running_task_instances(session, same_dagrun=False)[source]

从数据库返回正在运行的任务实例数量。

static filter_for_tis(tis)[source]

返回 SQLAlchemy 过滤器,用于查询选定的任务实例。

get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]

推断与此任务实例“相关”的上游的 map indexes。

大部分逻辑主要用于解决以下示例描述的问题,其中 ‘val’ 必须根据引用的使用位置解析为不同的值

@task
def this_task(v):  # This is self.task.
    return v * 2


@task_group
def tg1(inp):
    val = upstream(inp)  # This is the upstream task.
    this_task(val)  # When inp is 1, val here should resolve to 2.
    return val


# This val is the same object returned by tg1.
val = tg1.expand(inp=[1, 2, 3])


@task_group
def tg2(inp):
    another_task(inp, val)  # val here should resolve to [2, 4, 6].


tg2.expand(inp=["a", "b"])

检查 upstream 和 self.task 周围的 mapped task groups 以查找共同的“祖先”。如果找到此类祖先,我们需要返回特定的 map indexes 以从上游 XCom 拉取部分值。

参数::
  • upstream (airflow.sdk.definitions._internal.abstractoperator.Operator) – 引用的上游任务。

  • ti_count (int | None) – 调度器扩展此任务的任务实例总数,即模板上下文中的 expanded_ti_count

返回:

要拉取的特定 map index 或 map indexes,如果需要“完整”返回值(即不涉及 mapped task groups),则为 None。

返回类型:

int | range | None

classmethod duration_expression_update(end_date, query, bind)[source]

返回一个 SQL 表达式,用于根据开始和结束日期列计算此任务实例的时长。

static validate_inlet_outlet_assets_activeness(inlets, outlets, session)[source]
get_first_reschedule_date(context)[source]

获取任务实例的第一次重新调度日期。

airflow.models.taskinstance.TaskInstanceStateType[source]
class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, queued_dttm, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None, parent_context_carrier=None, context_carrier=None, span_status=None)[source]

简化任务实例。

用于通过队列在进程间发送数据。

dag_id[source]
task_id[source]
run_id[source]
map_index[source]
queued_dttm[source]
start_date[source]
end_date[source]
try_number[source]
state[source]
executor[source]
executor_config[source]
run_as_user = None[source]
pool[source]
priority_weight = None[source]
queue[source]
key[source]
parent_context_carrier = None[source]
context_carrier = None[source]
span_status = None[source]
__repr__()[source]
__eq__(other)[source]
classmethod from_ti(ti)[source]
class airflow.models.taskinstance.TaskInstanceNote(content, user_id=None)[source]

Bases: airflow.models.base.Base

用于存储与任务实例相关的任意备注。

__tablename__ = 'task_instance_note'[source]
ti_id[source]
user_id[source]
content[source]
created_at[source]
updated_at[source]
task_instance[source]
__table_args__[source]
__repr__()[source]

此条目有帮助吗?