HTTP 操作符

以下代码示例使用 http_default 连接,这意味着请求将发送到 httpbin 站点以执行基本的 HTTP 操作。

HttpSensor

使用 HttpSensor 进行轮询,直到 response_check 可调用对象评估为 true

这里我们进行轮询,直到 httpbin 给我们一个包含 httpbin 的响应文本。

tests/system/http/example_http.py[源代码]

task_http_sensor_check = HttpSensor(
    task_id="http_sensor_check",
    http_conn_id="http_default",
    endpoint="",
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=5,
    dag=dag,
)

此传感器也可以在可延迟模式下使用

tests/system/http/example_http.py[源代码]

task_http_sensor_check_async = HttpSensor(
    task_id="http_sensor_check_async",
    http_conn_id="http_default",
    endpoint="",
    deferrable=True,
    request_params={},
    poke_interval=5,
    dag=dag,
)

HttpOperator

使用 HttpOperator 来调用 HTTP 请求并获取响应文本。

警告

通过 HttpOperator 配置 https 是违反直觉的

由于历史原因,通过 HTTP 操作符配置 HTTPS 连接是困难且违反直觉的。操作符默认为 http 协议,您可以通过 scheme 连接属性更改操作符使用的方案。但是,此字段最初是为了数据库类型的 URI 而添加到连接中的,其中数据库方案传统上设置为 URI path 的第一个组成部分。因此,如果您想通过 URI 配置为 https 连接,您需要将 https 方案传递给 HttpOperator。尽管这看起来很愚蠢,但您的连接 URI 将如下所示:http://your_host:443/https。然后,如果您想在 HttpOperator 中使用不同的 URL 路径,您应该在运行任务时将您的路径作为 endpoint 参数传递。例如,要向 https://your_host:443/my_endpoint 运行查询,您需要将 endpoint 参数设置为 my_endpoint。或者,如果您愿意,您也可以对主机(包括 https:// 前缀)进行百分比编码,只要它包含 :// (百分比编码的 %3a%2f%2f),路径的第一个组成部分将不会被用作方案。您的 URI 定义可能如下所示:http://https%3a%2f%2fyour_host:443/。但是,在这种情况下,path 将不会被使用 - 如果您希望使用特定路径发出请求,仍然需要在任务中使用 endpoint 参数。尽管这违反直觉,但这在历史上是操作符/钩子的工作方式,并且在不破坏向后兼容性的情况下不容易更改,因为还有其他操作符构建在 HttpOperator 之上,这些操作符依赖于该功能,并且已经有很多用户在使用它。

在第一个示例中,我们使用 json 数据调用 POST,并在我们获得相同的 json 数据时成功,否则任务将失败。

tests/system/http/example_http.py[源代码]

task_post_op = HttpOperator(
    task_id="post_op",
    endpoint="post",
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    response_check=lambda response: response.json()["json"]["priority"] == 5,
    dag=dag,
)

这里我们调用一个 GET 请求并向其传递参数。无论响应文本如何,任务都将成功。

tests/system/http/example_http.py[源代码]

task_get_op = HttpOperator(
    task_id="get_op",
    method="GET",
    endpoint="get",
    data={"param1": "value1", "param2": "value2"},
    headers={},
    dag=dag,
)

HttpOperator 默认返回响应正文作为文本。如果您想在将其传递到下一个下游任务之前修改响应,请使用 response_filter。这在以下情况下很有用

  • 您正在使用的 API 返回一个大型 JSON 有效负载,并且您只对数据的一个子集感兴趣

  • API 以 xml 或 csv 格式返回数据,您想将其转换为 JSON

  • 您对响应的标头而不是正文感兴趣

下面是一个从 REST API 检索数据并仅返回嵌套属性而不是完整响应正文的示例。

tests/system/http/example_http.py[源代码]

task_get_op_response_filter = HttpOperator(
    task_id="get_op_response_filter",
    method="GET",
    endpoint="get",
    response_filter=lambda response: response.json()["nested"]["property"],
    dag=dag,
)

在第三个示例中,我们根据提供给请求的数据执行 PUT 操作来放置/设置数据。

tests/system/http/example_http.py[源代码]

task_put_op = HttpOperator(
    task_id="put_op",
    method="PUT",
    endpoint="put",
    data=json.dumps({"priority": 5}),
    headers={"Content-Type": "application/json"},
    dag=dag,
)

在此示例中,我们对 delete 端点调用 DELETE 操作。这次我们将表单数据传递给请求。

tests/system/http/example_http.py[源代码]

task_del_op = HttpOperator(
    task_id="del_op",
    method="DELETE",
    endpoint="delete",
    data="some=data",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag,
)

在这里,我们将表单数据传递给 POST 操作,这等同于通常的表单提交。

tests/system/http/example_http.py[源代码]

task_post_op_formenc = HttpOperator(
    task_id="post_op_formenc",
    endpoint="post",
    data="name=Joe",
    headers={"Content-Type": "application/x-www-form-urlencoded"},
    dag=dag,
)

HttpOperator 还允许重复调用 API 端点,通常是为了循环访问其页面。所有 API 响应都由操作符存储在内存中,并在一个结果中返回。因此,与非分页调用相比,它可能会占用更多的内存和 CPU。

默认情况下,HttpOperator 的结果将成为 Response.text 的列表(而不是单个 Response.text 对象)。

示例 - 假设您的 API 返回一个包含光标的 JSON 正文:您可以编写一个 pagination_function,它将接收您请求的原始 request.Response 对象,并根据此光标生成新的请求参数(作为 dict)。HttpOperator 将重复调用 API,直到该函数停止返回任何内容。

tests/system/http/example_http.py[源代码]



def get_next_page_cursor(response) -> dict | None:
    """
    Take the raw `request.Response` object, and check for a cursor.
    If a cursor exists, this function creates and return parameters to call
    the next page of result.
    """
    next_cursor = response.json().get("cursor")
    if next_cursor:
        return dict(data={"cursor": next_cursor})
    return None


task_get_paginated = HttpOperator(
    task_id="get_paginated",
    method="GET",
    endpoint="get",
    data={"cursor": ""},
    pagination_function=get_next_page_cursor,
    dag=dag,
)

此条目是否有帮助?