HTTP 操作器¶
以下代码示例使用 http_default
连接,这意味着请求是针对 httpbin 网站发送的,以执行基本的 HTTP 操作。
HttpSensor¶
使用 HttpSensor
不断探测,直到 response_check
可调用对象的值为 true
。
这里我们不断探测,直到 httpbin 给我们一个包含 httpbin
的响应文本。
task_http_sensor_check = HttpSensor(
task_id="http_sensor_check",
http_conn_id="http_default",
endpoint="",
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
此传感器也可以在可延迟模式下使用
task_http_sensor_check_async = HttpSensor(
task_id="http_sensor_check_async",
http_conn_id="http_default",
endpoint="",
deferrable=True,
request_params={},
poke_interval=5,
dag=dag,
)
HttpOperator¶
使用 HttpOperator
调用 HTTP 请求并获取响应文本。
警告
通过 HttpOperator 配置 https
是反直觉的
出于历史原因,通过 HTTP 操作器配置 HTTPS
连接非常困难且反直觉。操作器默认为 http
协议,您可以通过 scheme
连接属性更改操作器使用的方案。但是,此字段最初是为数据库类型的 URI 添加到连接的,其中数据库方案传统上设置为 URI path
的第一个组件。因此,如果您想通过 URI 配置为 https
连接,则需要将 https
方案传递给 HttpOperator。尽管看起来很愚蠢,但您的连接 URI 将如下所示:http://your_host:443/https
。然后,如果您想在 HttpOperator 中使用不同的 URL 路径,则应在运行任务时将路径作为 endpoint
参数传递。例如,要运行对 https://your_host:443/my_endpoint
的查询,您需要将 endpoint 参数设置为 my_endpoint
。或者,如果您愿意,您也可以对主机进行百分比编码,包括 https://
前缀,并且只要它包含 ://
(百分比编码为 %3a%2f%2f
),路径的第一个组件就不会用作方案。然后,您的 URI 定义可能如下所示:http://https%3a%2f%2fyour_host:443/
。但是,在这种情况下,path
将根本不会被使用 - 如果您希望使用特定路径发出请求,您仍然需要在任务中使用 endpoint
参数。尽管这很不直观,但这是操作器/钩子工作方式的历史原因,并且在不破坏向后兼容性的情况下更改它并不容易,因为还有其他操作器构建在 HttpOperator
之上,它们依赖于该功能,并且已经有许多用户在使用它。
在第一个示例中,我们使用 json 数据调用 POST
,并在我们取回相同的 json 数据时成功,否则任务将失败。
task_post_op = HttpOperator(
task_id="post_op",
endpoint="post",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()["json"]["priority"] == 5,
dag=dag,
)
这里我们正在调用 GET
请求并向其传递参数。无论响应文本如何,任务都将成功。
task_get_op = HttpOperator(
task_id="get_op",
method="GET",
endpoint="get",
data={"param1": "value1", "param2": "value2"},
headers={},
dag=dag,
)
默认情况下,HttpOperator 以文本形式返回响应正文。如果您想在将响应传递到下游的下一个任务之前对其进行修改,请使用 response_filter
。这在以下情况下很有用
您正在使用的 API 返回一个大型 JSON 有效负载,而您只对其中一部分数据感兴趣
API 以 xml 或 csv 格式返回数据,而您想将其转换为 JSON
您对响应的标头而不是正文感兴趣
下面是一个从 REST API 检索数据并仅返回嵌套属性而不是完整响应正文的示例。
task_get_op_response_filter = HttpOperator(
task_id="get_op_response_filter",
method="GET",
endpoint="get",
response_filter=lambda response: response.json()["nested"]["property"],
dag=dag,
)
在第三个示例中,我们正在执行 PUT
操作,以根据提供给请求的数据放置/设置数据。
task_put_op = HttpOperator(
task_id="put_op",
method="PUT",
endpoint="put",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
dag=dag,
)
在本例中,我们对 delete
端点调用 DELETE
操作。这次我们将表单数据传递给请求。
task_del_op = HttpOperator(
task_id="del_op",
method="DELETE",
endpoint="delete",
data="some=data",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
这里我们将表单数据传递给 POST
操作,这等同于通常的表单提交。
task_post_op_formenc = HttpOperator(
task_id="post_op_formenc",
endpoint="post",
data="name=Joe",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
HttpOperator
还允许重复调用 API 端点,通常是为了循环遍历其页面。所有 API 响应都由操作器存储在内存中,并以一个结果返回。因此,与非分页调用相比,它可能占用更多内存和 CPU 资源。
默认情况下,HttpOperator 的结果将成为 Response.text 列表(而不是单个 Response.text 对象)。
示例 - 假设您的 API 返回一个包含游标的 JSON 正文:您可以编写一个 pagination_function
,它将接收您的请求的原始 request.Response
对象,并根据此游标生成新的请求参数(作为 dict
)。HttpOperator 将重复调用 API,直到函数停止返回任何内容。
def get_next_page_cursor(response) -> dict | None:
"""
Take the raw `request.Response` object, and check for a cursor.
If a cursor exists, this function creates and return parameters to call
the next page of result.
"""
next_cursor = response.json().get("cursor")
if next_cursor:
return dict(data={"cursor": next_cursor})
return None
task_get_paginated = HttpOperator(
task_id="get_paginated",
method="GET",
endpoint="get",
data={"cursor": ""},
pagination_function=get_next_page_cursor,
dag=dag,
)