airflow.providers.http.operators.http

HttpOperator

调用 HTTP 系统上的一个端点来执行操作。

模块内容

class airflow.providers.http.operators.http.HttpOperator(*, endpoint=None, method='POST', data=None, headers=None, pagination_function=None, response_check=None, response_filter=None, extra_options=None, request_kwargs=None, http_conn_id='http_default', log_response=False, auth_type=None, tcp_keep_alive=True, tcp_keep_alive_idle=120, tcp_keep_alive_count=20, tcp_keep_alive_interval=30, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), retry_args=None, **kwargs)[source]

基础: airflow.models.BaseOperator

调用 HTTP 系统上的一个端点来执行操作。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南: HttpOperator

参数:
  • http_conn_id (str) – 要运行此运算符所针对的 HTTP 连接

  • endpoint (str | None) – 完整 URL 的相对部分。(模板化)

  • method (str) – 要使用的 HTTP 方法,默认为 “POST”

  • data (dict[str, Any] | str | None) – 要传递的数据。POST/PUT 请求中的 POST 数据,GET 请求中 URL 中的参数。(模板化)

  • headers (dict[str, str] | None) – 要添加到 GET 请求中的 HTTP 头部

  • pagination_function (Callable[Ellipsis, Any] | None) – 一个可调用对象,用于根据之前的响应生成再次调用 API 所使用的参数。通常用于 API 分页并返回游标、"下一页 ID" 或 "下一页 URL" 等情况。提供此参数后,操作符将重复调用 API,直到此可调用对象返回 None。默认情况下,操作符的结果将成为 Response.text 对象列表(而不是单个响应对象)。同样,其他注入函数(如 response_check、response_filter 等)也将接收 Response 对象列表。此函数接收来自上一次调用的 Response 对象,并应返回一个嵌套字典,其中包含以下可选键:endpointdataheadersextra_options。这些键将被合并和/或覆盖 HttpOperator 声明中提供的参数。当它们都是字典时(例如:HttpOperator.headers 将与此函数提供的 `headers` 字典合并),参数会合并。合并时,此函数返回的字典项将覆盖初始项(例如:如果 HttpOperator.headers 和 headers 都有 'cookie' 项,则保留 headers 提供的项)。当其中任何一个为字符串时(例如:HttpOperator.endpoint 被 endpoint 覆盖),参数会被简单地覆盖。

  • response_check (Callable[Ellipsis, bool] | None) – 对“requests”响应对象进行的检查。可调用对象将响应对象作为第一个位置参数,并可选地接受 context 字典中任意数量的关键字参数。如果通过则返回 True,否则返回 False。如果提供了 pagination_function,此函数将接收响应对象列表而不是单个响应对象。

  • response_filter (Callable[Ellipsis, Any] | None) – 一个允许你操作响应文本的函数。例如 response_filter=lambda response: json.loads(response.text)。可调用对象将响应对象作为第一个位置参数,并可选地接受 context 字典中任意数量的关键字参数。如果提供了 pagination_function,此函数将接收响应对象列表而不是单个响应对象。

  • extra_options (dict[str, Any] | None) – “requests”库的额外选项,请参阅“requests”文档(修改超时、ssl 等的选项)

  • log_response (bool) – 记录响应(默认值:False)

  • auth_type (type[requests.auth.AuthBase] | None) – 服务的认证类型

  • tcp_keep_alive (bool) – 为连接启用 TCP Keep Alive。

  • tcp_keep_alive_idle (int) – TCP Keep Alive Idle 参数(对应于 socket.TCP_KEEPIDLE)。

  • tcp_keep_alive_count (int) – TCP Keep Alive count 参数(对应于 socket.TCP_KEEPCNT

  • tcp_keep_alive_interval (int) – TCP Keep Alive interval 参数(对应于 socket.TCP_KEEPINTVL

  • deferrable (bool) – 在可延迟模式下运行运算符

  • retry_args (dict[str, Any] | None) – 定义重试行为的参数。请参阅 Tenacity 文档:https://github.com/jd/tenacity

conn_id_field = 'http_conn_id'[source]
template_fields: collections.abc.Sequence[str] = ('endpoint', 'data', 'headers')[source]
template_fields_renderers[source]
template_ext: collections.abc.Sequence[str] = ()[source]
ui_color = '#f4a460'[source]
http_conn_id = 'http_default'[source]
method = 'POST'[source]
endpoint = None[source]
headers[source]
data[source]
pagination_function = None[source]
response_check = None[source]
response_filter = None[source]
extra_options[source]
log_response = False[source]
auth_type = None[source]
tcp_keep_alive = True[source]
tcp_keep_alive_idle = 120[source]
tcp_keep_alive_count = 20[source]
tcp_keep_alive_interval = 30[source]
deferrable = True[source]
retry_args = None[source]
request_kwargs[source]
property hook: airflow.providers.http.hooks.http.HttpHook[source]

根据连接类型获取 Http Hook。

execute(context)[source]

创建运算符时派生。

Context 是与渲染 jinja 模板时使用的相同的字典。

有关更多 context,请参阅 get_template_context。

execute_sync(context)[source]
paginate_sync(response)[source]
execute_async(context)[source]
process_response(context, response)[source]

处理响应。

execute_complete(context, event, paginated_responses=None)[source]

触发器触发时执行回调;立即返回。

依赖于触发器抛出异常,否则假定执行成功。

paginate_async(context, response, previous_responses=None)[source]

此条目是否有用?