airflow.models.taskinstance¶
属性¶
类¶
任务实例存储任务实例的状态。 |
|
简化任务实例。 |
|
用于存储与任务实例相关的任意备注。 |
函数¶
|
将当前执行上下文设置为提供的上下文对象。 |
|
清除一组任务实例,但确保正在运行的任务被杀死。 |
|
生成新的 UUID7 字符串。 |
模块内容¶
- airflow.models.taskinstance.set_current_context(context)[source]¶
将当前执行上下文设置为提供的上下文对象。
此方法应在每次任务执行之前调用一次 operator.execute。
- airflow.models.taskinstance.clear_task_instances(tis, session, dag=None, dag_run_state=DagRunState.QUEUED)[source]¶
清除一组任务实例,但确保正在运行的任务被杀死。
同时将 Dagrun 的 state 设置为 QUEUED,start_date 设置为执行时间。但仅针对已完成的 DR(SUCCESS 和 FAILED)。对于正在运行的 DR(QUEUED 和 RUNNING),不会清除 DR 的 state 和 start_date`,因为清除已在运行的 DR 的状态是多余的,并且清除 `start_date` 会影响 DR 的持续时间。
- 参数::
tis (list[TaskInstance]) – 任务实例列表
session (sqlalchemy.orm.session.Session) – 当前会话
dag_run_state (airflow.utils.state.DagRunState | airflow.typing_compat.Literal[False]) – 要设置已完成 DagRuns 的状态。如果设置为 False,则不会更改 DagRuns 的状态。
dag (airflow.sdk.definitions.dag.DAG | None) – DAG 对象
- class airflow.models.taskinstance.TaskInstance(task, run_id=None, state=None, map_index=-1, dag_version_id=None)[source]¶
基类:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
任务实例存储任务实例的状态。
此表是关于哪些任务已运行及其所处状态的权威和单一事实来源。
SqlAlchemy 模型故意没有指向 task 或 dag 模型的 SqlAlchemy 外键,以便更好地控制事务。
对此表的数据库事务应确保不会发生重复触发,并避免在多个调度程序可能正在触发任务实例时对哪些任务实例已准备好运行产生混淆。
map_index 中的值 -1 表示以下任何一种情况:没有映射任务的任务实例;带有映射任务但尚未展开(state=pending)的任务实例;带有映射任务但展开为空列表(state=skipped)的任务实例。
- dag_model: airflow.models.dag.DagModel[source]¶
- command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, raw=False, pool=None, cfg_path=None)[source]¶
返回一个可以在任何安装了 airflow 的地方执行的命令。
此命令是调度器发送给执行器的消息的一部分。
- static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, file_path=None, raw=False, pool=None, cfg_path=None, map_index=-1)[source]¶
生成执行此任务实例所需的 shell 命令。
- 参数::
dag_id (str) – DAG ID
task_id (str) – Task ID
run_id (str) – 此任务的 DagRun 的 run_id
mark_success (bool) – 是否将任务标记为成功
ignore_all_deps (bool) – 忽略所有可忽略的依赖项。覆盖其他 ignore_* 参数。
ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 参数(例如,用于 Backfills)
wait_for_past_depends_before_skipping (bool) – 在将任务实例标记为跳过之前等待过去的依赖项
ignore_task_deps (bool) – 忽略任务特定的依赖项,例如 depends_on_past 和 trigger rule
ignore_ti_state (bool) – 忽略任务实例之前的失败/成功状态
local (bool) – 是否在本地运行任务
file_path (pathlib.PurePath | str | None) – 包含 DAG 定义的文件的路径
raw (bool) – raw 模式(需要更多详细信息)
pool (str | None) – 任务应该运行所在的 Airflow pool
cfg_path (str | None) – 配置文件的路径
- 返回:
可用于运行任务实例的 shell 命令
- 返回类型:
- error(session=NEW_SESSION)[source]¶
在数据库中将任务实例的状态强制设置为 FAILED。
- 参数::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]¶
- refresh_from_db(session=NEW_SESSION, lock_for_update=False, keep_local_changes=False)[source]¶
根据主键从数据库刷新任务实例。
- 参数::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
lock_for_update (bool) – 如果为 True,表示数据库应该锁定任务实例(发出 FOR UPDATE 子句),直到 session 提交。
keep_local_changes (bool) – 如果为 False(默认值),则强制所有属性使用数据库中的值,如果为 True,则不覆盖本地设置的属性
- refresh_from_task(task, pool_override=None)[source]¶
从给定的任务中复制通用属性。
- 参数::
task (airflow.sdk.definitions._internal.abstractoperator.Operator) – 要从中复制的任务对象
pool_override (str | None) – 使用 pool_override 而不是任务的 pool
- property key: airflow.models.taskinstancekey.TaskInstanceKey[source]¶
返回唯一标识任务实例的 tuple。
- set_state(state, session=NEW_SESSION)[source]¶
设置任务实例状态。
- 参数::
state (str | None) – 要为任务实例设置的状态
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- 返回:
状态是否已更改
- 返回类型:
- are_dependents_done(session=NEW_SESSION)[source]¶
检查此任务实例的直接下游任务是否已成功或已被跳过。
这旨在供 wait_for_downstream 使用。
当您不想在下游任务完成之前开始处理任务的下一个调度时,这很有用。例如,如果任务 DROPs 并重新创建表。
- 参数::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- get_previous_dagrun(state=None, session=None)[source]¶
返回在此任务实例的 DagRun 之前运行的 DagRun。
- 参数::
state (airflow.utils.state.DagRunState | None) – 如果传入,则仅考虑特定状态的实例。
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM Session。
- get_previous_ti(state=None, session=NEW_SESSION)[source]¶
返回在此任务实例之前运行的任务的任务实例。
- 参数::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
state (airflow.utils.state.DagRunState | None) – 如果传入,则仅考虑特定状态的实例。
- are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[source]¶
给定依赖关系的上下文,是否满足此任务实例运行的所有条件。
(例如,从 UI 强制运行的任务实例将忽略某些依赖项)。
- 参数::
dep_context (airflow.ti_deps.dep_context.DepContext | None) – 确定应评估的依赖项的执行上下文。
session (sqlalchemy.orm.session.Session) – 数据库 session
verbose (bool) – 是否在 info 或 debug 日志级别记录失败依赖项的详细信息
- get_dagrun(session=NEW_SESSION)[source]¶
返回此任务实例的 DagRun。
- 参数::
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
- 返回:
DagRun
- 返回类型:
- check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, external_executor_id=None, session=NEW_SESSION)[source]¶
- emit_state_change_metric(new_state)[source]¶
发送一个时间指标,表示给定状态转换花费了多少时间。
根据任务所处的状态推断出前一个状态和指标名称。
- 参数::
new_state (airflow.utils.state.TaskInstanceState) – 刚为此任务设置的状态。我们不使用 self.state,因为有时状态会直接在数据库中更新,而不是在本地任务实例对象中更新。支持的状态:QUEUED 和 RUNNING
- defer_task(exception, session=NEW_SESSION)[source]¶
将任务标记为延迟状态,并设置在 TaskDeferred 被引发时恢复它所需的 trigger。
- run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]¶
运行 TaskInstance。
- classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_fast=False)[source]¶
获取处理失败所需的上下文。
- 参数::
ti (TaskInstance) – TaskInstance
error (None | str | BaseException) – 如果指定,记录抛出的特定异常
test_mode (bool | None) – 如果为 True,则不记录数据库中的成功或失败
context (airflow.utils.context.Context | None) – Jinja2 上下文
force_fail (bool) – 如果为 True,任务不会重试
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
fail_fast (bool) – 如果为 True,所有下游任务都将失败
- handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]¶
处理任务实例的失败。
- 参数::
error (None | str | BaseException) – 如果指定,记录抛出的特定异常
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM Session
test_mode (bool | None) – 如果为 True,则不记录数据库中的成功或失败
context (airflow.utils.context.Context | None) – Jinja2 上下文
force_fail (bool) – 如果为 True,任务不会重试
- get_template_context(session=None, ignore_param_exceptions=True)[source]¶
返回 Task Instance 上下文。
- 参数::
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM 会话
ignore_param_exceptions (bool) – 初始化 ParamsDict 时抑制值异常的标志
- get_rendered_template_fields(session=NEW_SESSION)[source]¶
使用渲染后的模板字段更新任务,以便在 UI 中显示。
如果任务已运行,将从数据库中获取;否则将进行渲染。
- render_templates(context=None, jinja_env=None)[source]¶
渲染 Operator 字段中的模板。
如果任务最初是映射的,这可能会将
self.task
替换为未映射的、完全渲染的 BaseOperator。返回替换前的原始self.task
。
- get_email_subject_content(exception, task=None)[source]¶
获取异常电子邮件的主题内容。
- 参数::
exception (BaseException) – 电子邮件中发送的异常
task (airflow.models.baseoperator.BaseOperator | None)
- email_alert(exception, task)[source]¶
发送包含异常信息的警报邮件。
- 参数::
exception – 异常
task (airflow.models.baseoperator.BaseOperator) – 与异常相关的任务
- xcom_push(key, value, session=NEW_SESSION)[source]¶
使 XCom 可供任务拉取。
- 参数::
key (str) – 存储值的键。
value (Any) – 要存储的值。否则只能使用 JSON 可序列化的值。
- get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]¶
推断与此任务实例“相关”的上游的 map indexes。
大部分逻辑主要用于解决以下示例描述的问题,其中 ‘val’ 必须根据引用的使用位置解析为不同的值
@task def this_task(v): # This is self.task. return v * 2 @task_group def tg1(inp): val = upstream(inp) # This is the upstream task. this_task(val) # When inp is 1, val here should resolve to 2. return val # This val is the same object returned by tg1. val = tg1.expand(inp=[1, 2, 3]) @task_group def tg2(inp): another_task(inp, val) # val here should resolve to [2, 4, 6]. tg2.expand(inp=["a", "b"])
检查 upstream 和
self.task
周围的 mapped task groups 以查找共同的“祖先”。如果找到此类祖先,我们需要返回特定的 map indexes 以从上游 XCom 拉取部分值。
- class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, queued_dttm, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None, parent_context_carrier=None, context_carrier=None, span_status=None)[source]¶
简化任务实例。
用于通过队列在进程间发送数据。