
在使用opensearch进行大规模数据分析时,一个常见的问题是标准搜索api(client.search)默认或最大只能返回10,000条结果。当需要检索的文档数量远超此限制时,例如进行全面的日志分析或数据导出,传统的from和size参数将不再适用,因为它们无法突破这一硬性上限。此时,opensearch提供的scroll api便成为了解决方案。
Scroll API旨在允许用户检索一个大型查询结果集,其工作原理是创建一个搜索上下文(search context),该上下文会保存查询在特定时间点的快照。通过迭代这个上下文,用户可以分批次地获取所有匹配的文档,而无需担心10,000条结果的限制。
首先,需要正确初始化opensearch-py客户端,以便与OpenSearch集群建立连接。这包括指定主机、端口、认证信息、SSL配置和连接超时等参数。
from opensearchpy import OpenSearch, RequestsHttpConnection
import csv
# 替换为你的OpenSearch集群信息
host = 'your-opensearch-host'
port = 443
auth = ('username', 'password') # 或者使用AWSV4Signer等认证方式
client = OpenSearch(
hosts=[{"host": host, "port": port}],
http_auth=auth,
use_ssl=True,
timeout=300,
verify_certs=True,
connection_class=RequestsHttpConnection,
pool_maxsize=20,
)查询体定义了你想要检索的数据的条件。为了优化性能和减少网络传输,建议只请求你需要的字段,而不是整个_source文档。这可以通过在查询体中设置_source: False并指定fields列表来实现。
query_body = {
"size": 10000, # 每次滚动获取的最大文档数,通常设置为10000
"timeout": "300s",
"query": {
"bool": {
"must": [
{"match": {"type": "req"}},
{"range": {"@timestamp": {"gte": "now-7d/d", "lte": "now/d"}}},
{"wildcard": {"req_h_user_agent": {"value": "*googlebot*"}}},
]
}
},
"fields": [
"@timestamp",
"resp_status",
"resp_bytes",
"req_h_referer",
"req_h_user_agent",
"req_h_host",
"req_uri",
"total_response_time",
],
"_source": False, # 不返回完整的_source,只返回fields中指定的字段
}使用client.search方法发起第一次搜索请求时,需要额外指定scroll参数。这个参数告诉OpenSearch保持一个搜索上下文,并指定该上下文的过期时间(例如'1m'表示1分钟)。响应中会包含一个_scroll_id,这是后续滚动请求的凭证。
index_name = "fastly-*" # 你的索引模式
initial_response = client.search(
scroll='1m', # 滚动上下文的有效期,每次滚动请求都会刷新此有效期
body=query_body,
index=index_name,
)
# 获取初始的滚动ID
scroll_id = initial_response["_scroll_id"]获取到_scroll_id后,可以通过一个循环不断调用client.scroll方法,并传入上一次请求返回的_scroll_id。每次调用都会返回下一批结果,直到hits列表为空,表示所有匹配的文档都已检索完毕。
在循环内部,可以对检索到的数据进行处理,例如写入CSV文件。
all_hits = [] # 用于存储所有检索到的文档
hits = initial_response["hits"]["hits"] # 第一次请求的文档
# 打开CSV文件并写入表头
with open("report.csv", "w", newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(
[
"timestamp",
"url",
"response code",
"bytes",
"response_time",
"referer",
"user agent",
]
)
# 处理第一次请求返回的文档
for hit in hits:
fields = hit["fields"]
writer.writerow(
[
fields["@timestamp"][0] if "@timestamp" in fields else '',
(fields["req_h_host"][0] + fields["req_uri"][0]) if "req_h_host" in fields and "req_uri" in fields else '',
fields["resp_status"][0] if "resp_status" in fields else '',
fields["resp_bytes"][0] if "resp_bytes" in fields else '',
fields["total_response_time"][0] if "total_response_time" in fields else '',
fields["req_h_referer"][0] if "req_h_referer" in fields else '',
fields["req_h_user_agent"][0] if "req_h_user_agent" in fields else '',
]
)
# 循环获取剩余的文档
while len(hits) > 0:
scroll_response = client.scroll(
scroll='1m', # 每次滚动请求都刷新滚动上下文的有效期
scroll_id=scroll_id
)
hits = scroll_response["hits"]["hits"]
if not hits: # 如果没有更多结果,则退出循环
break
# 处理当前批次的文档
for hit in hits:
fields = hit["fields"]
writer.writerow(
[
fields["@timestamp"][0] if "@timestamp" in fields else '',
(fields["req_h_host"][0] + fields["req_uri"][0]) if "req_h_host" in fields and "req_uri" in fields else '',
fields["resp_status"][0] if "resp_status" in fields else '',
fields["resp_bytes"][0] if "resp_bytes" in fields else '',
fields["total_response_time"][0] if "total_response_time" in fields else '',
fields["req_h_referer"][0] if "req_h_referer" in fields else '',
fields["req_h_user_agent"][0] if "req_h_user_agent" in fields else '',
]
)
# 更新滚动ID,以备下一次请求使用
scroll_id = scroll_response["_scroll_id"]
print("所有结果已成功导出到 report.csv")注意事项:
通过opensearch-py结合OpenSearch的Scroll API,可以有效地突破10,000条结果的限制,实现对大规模数据集的完整检索。这种方法特别适用于数据导出、离线分析或需要处理所有匹配文档的场景。理解Scroll API的工作原理和相关注意事项,能够帮助开发者更高效、稳定地处理OpenSearch中的海量数据。
以上就是如何使用opensearchpy获取查询的所有结果的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号