OpenSearch¶
操作符¶
在 OpenSearch 中创建索引¶
使用 OpenSearchCreateIndexOperator
在 OpenSearch 域中创建新索引。
create_index = OpenSearchCreateIndexOperator(
task_id="create_index",
index_name=INDEX_NAME,
index_body={"settings": {"index": {"number_of_shards": 1}}},
)
将文档添加到 OpenSearch 上的索引¶
使用 OpenSearchAddDocumentOperator
将单个文档添加到 OpenSearch 索引
add_document_by_args = OpenSearchAddDocumentOperator(
task_id="add_document_with_args",
index_name=INDEX_NAME,
doc_id=1,
document={"log_group_id": 1, "logger": "python", "message": "hello world"},
)
add_document_by_class = OpenSearchAddDocumentOperator(
task_id="add_document_by_class",
doc_class=LogDocument(log_group_id=2, logger="airflow", message="hello airflow"),
)
针对 OpenSearch 索引运行查询¶
使用 OpenSearchQueryOperator
针对 OpenSearch 索引运行查询。
search_low_level = OpenSearchQueryOperator(
task_id="low_level_query",
index_name="system_test",
query={"query": {"bool": {"must": {"match": {"message": "hello world"}}}}},
)
search = Search()
search._index = [INDEX_NAME]
search_object = search.filter("term", logger="airflow").query("match", message="hello airflow")
search_high_level = OpenSearchQueryOperator(task_id="high_level_query", search_object=search_object)