airflow.models.taskinstance
¶
模块内容¶
类¶
任务实例存储任务实例的状态。 |
|
简化的任务实例。 |
|
用于存储有关任务实例的任意注释。 |
函数¶
|
将当前执行上下文设置为提供的上下文对象。 |
|
清除一组任务实例,但确保正在运行的任务实例被终止。 |
属性¶
- airflow.models.taskinstance.set_current_context(context)[源代码]¶
将当前执行上下文设置为提供的上下文对象。
此方法应在每次任务执行时调用一次,在调用 operator.execute 之前。
- airflow.models.taskinstance.clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state=DagRunState.QUEUED)[源代码]¶
清除一组任务实例,但确保正在运行的任务实例被终止。
还将 Dagrun 的 state 设置为 QUEUED,并将 start_date 设置为执行时间。但仅适用于已完成的 DR(SUCCESS 和 FAILED)。不会清除正在运行的 DR(QUEUED 和 RUNNING)的 DR 的 state 和 start_date,因为清除已在运行的 DR 的状态是多余的,并且清除 start_date 会影响 DR 的持续时间。
- 参数
tis (list[TaskInstance]) – 任务实例列表
session (sqlalchemy.orm.session.Session) – 当前会话
dag_run_state (airflow.utils.state.DagRunState | airflow.typing_compat.Literal[False]) – 将已完成的 DagRuns 设置为的状态。如果设置为 False,则不会更改 DagRuns 状态。
dag (airflow.models.dag.DAG | None) – DAG 对象
activate_dag_runs (None) – 已弃用的参数,请勿传递
- class airflow.models.taskinstance.TaskInstance(task, execution_date=None, run_id=None, state=None, map_index=-1)[source]¶
基类:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
任务实例存储任务实例的状态。
此表是关于任务已运行状态的权威且唯一的真实来源。
SqlAlchemy 模型特意没有与任务或 DAG 模型建立 SqlAlchemy 外键,以便更好地控制事务。
此表上的数据库事务应确保防止双重触发,以及防止多个调度程序可能触发任务实例时,对哪些任务实例准备好运行产生任何混淆。
map_index 中的值 -1 表示以下任何一种情况:未映射任务的 TI;具有尚未扩展的映射任务的 TI(状态=pending);具有扩展到空列表的映射任务的 TI(状态=skipped)。
- property key: airflow.models.taskinstancekey.TaskInstanceKey[source]¶
返回一个唯一标识任务实例的元组。
- property previous_ti: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[source]¶
此属性已弃用。
请使用
airflow.models.taskinstance.TaskInstance.get_previous_ti
。
- property previous_ti_success: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[source]¶
此属性已弃用。
请使用
airflow.models.taskinstance.TaskInstance.get_previous_ti
。
- property previous_start_date_success: pendulum.DateTime | None[source]¶
此属性已弃用。
请使用
airflow.models.taskinstance.TaskInstance.get_previous_start_date
。
- dag_model: airflow.models.dag.DagModel[source]¶
- command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]¶
返回一个可以在任何安装了 Airflow 的地方执行的命令。
此命令是编排器发送给执行器的消息的一部分。
- static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None, map_index=-1)[source]¶
生成执行此任务实例所需的 shell 命令。
- 参数
dag_id (str) – DAG ID
task_id (str) – 任务 ID
run_id (str) – 此任务的 DagRun 的 run_id
mark_success (bool) – 是否将任务标记为成功
ignore_all_deps (bool) – 忽略所有可忽略的依赖项。覆盖其他 ignore_* 参数。
ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 参数(例如,对于回填)
wait_for_past_depends_before_skipping (bool) – 在将 ti 标记为跳过之前等待过去的依赖项
ignore_task_deps (bool) – 忽略特定于任务的依赖项,例如 depends_on_past 和触发规则
ignore_ti_state (bool) – 忽略任务实例之前的失败/成功
local (bool) – 是否在本地运行任务
pickle_id (int | None) – 如果 DAG 已序列化到数据库,则为与 pickle DAG 关联的 ID
file_path (pathlib.PurePath | str | None) – 包含 DAG 定义的文件的路径
raw (bool) – 原始模式(需要更多详细信息)
job_id (str | None) – 作业 ID(需要更多详细信息)
pool (str | None) – 任务应在其中运行的 Airflow 池
cfg_path (str | None) – 配置文件的路径
- 返回
可用于运行任务实例的 shell 命令
- 返回类型
list[str]
- current_state(session=NEW_SESSION)[source]¶
从数据库中获取最新的状态。
如果传递了会话,我们将使用它,并且查找状态成为会话的一部分,否则将使用新会话。
这里使用 sqlalchemy.inspect 获取主键,确保如果它们发生变化,它不会退化
- 参数
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
- error(session=NEW_SESSION)[source]¶
强制将任务实例的状态在数据库中设置为 FAILED。
- 参数
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
- classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]¶
- refresh_from_db(session=NEW_SESSION, lock_for_update=False)[source]¶
根据主键从数据库刷新任务实例。
- 参数
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
lock_for_update (bool) – 如果为 True,则表示数据库应锁定 TaskInstance(发出 FOR UPDATE 子句),直到提交会话。
- refresh_from_task(task, pool_override=None)[源代码]¶
从给定的任务复制通用属性。
- 参数
task (airflow.models.operator.Operator) – 要从中复制的任务对象
pool_override (str | None) – 使用 pool_override 代替任务的池
- set_state(state, session=NEW_SESSION)[源代码]¶
设置 TaskInstance 状态。
- 参数
state (str | None) – 要为 TI 设置的状态
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
- 返回
状态是否已更改
- 返回类型
- are_dependents_done(session=NEW_SESSION)[源代码]¶
检查此任务实例的直接依赖项是否已成功或已跳过。
这旨在供 wait_for_downstream 使用。
当您不希望在依赖项完成之前开始处理任务的下一个计划时,这很有用。例如,如果任务删除并重新创建表。
- 参数
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
- get_previous_dagrun(state=None, session=None)[源代码]¶
返回在此任务实例的 DagRun 之前运行的 DagRun。
- 参数
state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM 会话。
- get_previous_ti(state=None, session=NEW_SESSION)[源代码]¶
返回在此任务实例之前运行的任务的任务实例。
- 参数
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。
- get_previous_execution_date(state=None, session=NEW_SESSION)[源代码]¶
从属性 previous_ti_success 返回执行日期。
- 参数
state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
- get_previous_start_date(state=None, session=NEW_SESSION)[源代码]¶
从属性 previous_ti_success 返回开始日期。
- 参数
state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
- are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[源代码]¶
给定依赖项的上下文,是否满足此任务实例运行的所有条件。
(例如,从 UI 强制运行的任务实例将忽略某些依赖项)。
- 参数
dep_context (airflow.ti_deps.dep_context.DepContext | None) – 确定应评估的依赖项的执行上下文。
session (sqlalchemy.orm.session.Session) – 数据库会话
verbose (bool) – 是否在 info 或 debug 日志级别记录有关失败依赖项的详细信息
- get_dagrun(session=NEW_SESSION)[源代码]¶
返回此 TaskInstance 的 DagRun。
- 参数
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
- 返回
DagRun
- 返回类型
- check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, external_executor_id=None, session=NEW_SESSION)[source]¶
- emit_state_change_metric(new_state)[source]¶
发送一个时间指标,表示给定状态转换所花费的时间。
之前的状态和指标名称是从任务所处的状态推断出来的。
- 参数
new_state (airflow.utils.state.TaskInstanceState) – 为此任务刚刚设置的状态。我们不使用 self.state,因为有时状态是直接在数据库中更新的,而不是在本地 TaskInstance 对象中更新的。支持的状态:QUEUED 和 RUNNING
- run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]¶
运行 TaskInstance。
- classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_stop=False)[source]¶
处理 TaskInstance 的失败。
- 参数
fail_stop (bool) – 如果为 true,则停止 DAG 中剩余的任务
- handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]¶
处理任务实例的失败。
- 参数
error (None | str | BaseException) – 如果指定,则记录抛出的特定异常
session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话
test_mode (bool | None) – 如果为 True,则不在数据库中记录成功或失败
context (airflow.utils.context.Context | None) – Jinja2 上下文
force_fail (bool) – 如果为 True,则任务不会重试
- get_template_context(session=None, ignore_param_exceptions=True)[source]¶
返回 TI 上下文。
- 参数
session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM 会话
ignore_param_exceptions (bool) – 在初始化 ParamsDict 时,抑制值异常的标志
- get_rendered_template_fields(session=NEW_SESSION)[source]¶
更新任务,使其包含 UI 中展示的渲染后的模板字段。
如果任务已经运行,将从数据库中获取;否则将进行渲染。
- render_templates(context=None, jinja_env=None)[source]¶
渲染操作符字段中的模板。
如果任务最初是映射的,这可能会将
self.task
替换为未映射的、完全渲染的 BaseOperator。 原始替换前的self.task
会被返回。
- get_email_subject_content(exception, task=None)[source]¶
获取异常的电子邮件主题内容。
- 参数
exception (BaseException) – 电子邮件中发送的异常
task (airflow.models.baseoperator.BaseOperator | None) –
- email_alert(exception, task)[source]¶
发送包含异常信息的警报电子邮件。
- 参数
exception – 异常
task (airflow.models.baseoperator.BaseOperator) – 与异常相关的任务
- xcom_push(key, value, execution_date=None, session=NEW_SESSION)[source]¶
使 XCom 可供任务拉取。
- 参数
key (str) – 存储值的键。
value (Any) – 要存储的值。可能的类型取决于
enable_xcom_pickling
是否为 true。 如果为 true,则可以是任何可 pickling 的对象;否则只能使用 JSON 可序列化的对象。execution_date (datetime.datetime | None) – 已弃用的参数,不起作用。
- xcom_pull(task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False, session=NEW_SESSION, *, map_indexes=None, default=None)[source]¶
拉取可选地满足某些条件的 XCom。
- 参数
key (str) – XCom 的键。如果提供,则只返回具有匹配键的 XCom。默认键为
'return_value'
,也可以作为常量XCOM_RETURN_KEY
使用。 此键会自动赋给由任务返回的 XCom(与手动推送相反)。要移除筛选器,请传递 None。task_ids (str | Iterable[str] | None) – 只会拉取来自具有匹配 ID 的任务的 XCom。传递 None 可移除筛选器。
dag_id (str | None) – 如果提供,则只会从这个 DAG 中拉取 XCom。如果为 None(默认值),则会使用调用任务的 DAG。
map_indexes (int | Iterable[int] | None) – 如果提供,则只拉取具有匹配索引的 XCom。如果为 None(默认值),则会从正在拉取的任务中推断出来(详情见下文)。
include_prior_dates (bool) – 如果为 False,则只返回当前 execution_date 的 XCom。如果为 True,则还会返回之前日期的 XCom。
当拉取一个单独的任务(
task_id
为 None 或字符串)而没有指定map_indexes
时,返回值会根据指定的任务是否被映射而推断出来。 如果未映射,则返回来自一个单独任务实例的值。 如果要拉取的任务是映射的,则会返回一个迭代器(不是列表),该迭代器产生来自映射任务实例的 XCom。 在这两种情况下,如果找不到匹配的 XCom,则会返回default
(如果未指定,则为 None)。当拉取多个任务时(即
task_id
或map_index
为非字符串可迭代对象),会返回一个匹配 XCom 的列表。列表中元素的顺序由task_id
和map_index
中项目的顺序确定。
- get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]¶
推断与此任务实例相关的上游任务的映射索引。
大部分逻辑主要用于解决以下示例描述的问题,其中“val”必须根据引用位置解析为不同的值
@task def this_task(v): # This is self.task. return v * 2 @task_group def tg1(inp): val = upstream(inp) # This is the upstream task. this_task(val) # When inp is 1, val here should resolve to 2. return val # This val is the same object returned by tg1. val = tg1.expand(inp=[1, 2, 3]) @task_group def tg2(inp): another_task(inp, val) # val here should resolve to [2, 4, 6]. tg2.expand(inp=["a", "b"])
检查
upstream
和self.task
周围的映射任务组,以找到一个共同的“祖先”。如果找到这样的祖先,我们需要返回特定的映射索引,以便从上游 XCom 中提取部分值。
- class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None)[source]¶
简化的任务实例。
用于通过队列在进程之间发送数据。