airflow.providers.http.sensors.http

HttpSensor

执行 HTTP GET 语句;在遇到 404 Not Found 错误或 response_check 返回 False 时返回 False。

模块内容

class airflow.providers.http.sensors.http.HttpSensor(*, endpoint, http_conn_id='http_default', method='GET', request_params=None, request_kwargs=None, headers=None, response_error_codes_allowlist=None, response_check=None, extra_options=None, tcp_keep_alive=True, tcp_keep_alive_idle=120, tcp_keep_alive_count=20, tcp_keep_alive_interval=30, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

继承自: airflow.sensors.base.BaseSensorOperator

执行 HTTP GET 语句;在遇到 404 Not Found 错误或 response_check 返回 False 时返回 False。

除 404(例如 403)之外的 HTTP 错误代码或连接拒绝错误会直接引发异常并导致 Sensor 本身失败(不再进行探测)。为了避免因 404 之外的其他代码导致任务失败,可以通过参数 response_error_codes_allowlist 传递一个包含所有允许的错误状态代码的列表,例如 ["404", "503"]。要完全跳过错误状态代码检查,可以通过参数 extra_option 传递值 {'check_response': False}。这将使得 response_check 对任何 HTTP 状态代码都执行。

响应检查可以访问 Operator 的模板上下文。

def response_check(response, task_instance):
    # The task_instance is injected, so you can pull data form xcom
    # Other context variables such as dag, ds, logical_date are also available.
    xcom_data = task_instance.xcom_pull(task_ids="pushing_task")
    # In practice you would do something more sensible with this data..
    print(xcom_data)
    return True


HttpSensor(task_id="my_http_sensor", ..., response_check=response_check)

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:HttpSensor

参数:
  • http_conn_id (str) – 用于运行 Sensor 的 HTTP 连接

  • method (str) – 要使用的 HTTP 请求方法

  • endpoint (str) – 完整 URL 的相对路径部分

  • request_params (dict[str, Any] | None) – 要添加到 GET URL 的参数

  • headers (dict[str, Any] | None) – 要添加到 GET 请求的 HTTP 头

  • response_error_codes_allowlist (list[str] | None) – 一个允许列表,用于在 poke() 时返回 False 而不是引发异常。如果传入的值为 None,为了向后兼容,默认指定为 ["404"]。如果您也希望 404 Not Found 抛出错误,请明确传递空列表 []

  • response_check (Callable[Ellipsis, bool | airflow.sensors.base.PokeReturnValue] | None) – 对“requests”响应对象进行的检查。该可调用对象将响应对象作为第一个位置参数,并可选地接受上下文字典中任意数量的关键字参数。通过时应返回 True,否则返回 False。

  • extra_options (dict[str, Any] | None) – “requests”库的额外选项,请参阅“requests”文档(修改超时、ssl 等的选项)。

  • tcp_keep_alive (bool) – 为连接启用 TCP Keep Alive。

  • tcp_keep_alive_idle (int) – TCP Keep Alive 空闲参数(对应于 socket.TCP_KEEPIDLE)。

  • tcp_keep_alive_count (int) – TCP Keep Alive 计数参数(对应于 socket.TCP_KEEPCNT

  • tcp_keep_alive_interval (int) – TCP Keep Alive 间隔参数(对应于 socket.TCP_KEEPINTVL

  • deferrable (bool) – 如果等待完成,是否延迟任务直到完成,默认为 False

template_fields: collections.abc.Sequence[str] = ('endpoint', 'request_params', 'headers')[source]
endpoint[source]
http_conn_id = 'http_default'[source]
method = 'GET'[source]
response_error_codes_allowlist = ('404',)[source]
request_params[source]
headers[source]
extra_options[source]
response_check = None[source]
tcp_keep_alive = True[source]
tcp_keep_alive_idle = 120[source]
tcp_keep_alive_count = 20[source]
tcp_keep_alive_interval = 30[source]
deferrable = True[source]
request_kwargs[source]
poke(context)[source]

派生此类时覆盖此方法。

execute(context)[source]

创建 Operator 时派生此方法。

执行任务的主要方法。Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event=None)[source]

此条目是否有帮助?