Lindorm向量引擎支持向量數據檢索功能,兼容Elasticsearch協議,同時支持標量、向量、全文混合檢索功能。本文介紹基于Go語言,通過OpenSearch Go客戶端連接和使用向量引擎的方法。
前提條件
準備工作
安裝OpenSearch Go客戶端
您需要安裝OpenSearch Go客戶端,支持以下兩種方式:
方式一:
修改
go.mod
文件配置,添加相關依賴,具體如下:
module SearchDemo

go 1.23.0 // 請替換為您的Go版本

require (
	github.com/opensearch-project/opensearch-go/v2 v2.3.0
	github.com/google/uuid v1.6.0 // 非必須,示例代碼中生成uuid需要該項,請根據實際情況配置
)
執行以下命令,更新
go.mod
文件。
go mod tidy
方式二:執行以下命令直接下載。
go get github.com/opensearch-project/opensearch-go/v2@v2.3.0
連接搜索引擎
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/opensearch-project/opensearch-go/v2"
"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
"math/rand"
"net/http"
"strings"
"time"
)
// client is the shared OpenSearch client used by every example in this file.
// It is assigned once, in init.
var client *opensearch.Client

// init builds the package-level client from a static configuration.
//
// Replace the address, user name and password with the values of your own
// search engine instance. A construction failure is only reported on standard
// output.
func init() {
	// Replace with the domain name of your search engine.
	cfg := opensearch.Config{
		Addresses: []string{
			"http://ld-t4n5668xk31ui****-proxy-search-vpc.lindorm.aliyuncs.com:30070",
		},
		Username: "<Username>",
		Password: "<Password>",
	}
	var err error
	client, err = opensearch.NewClient(cfg)
	if err != nil {
		// Printf rather than Println: the message carries a format verb.
		fmt.Printf("Init client error %s\n", err)
	}
}
// handlerCommonError decodes the JSON body of res into a generic map and
// prints it. When the body cannot be decoded as JSON, the decode error is
// printed instead.
func handlerCommonError(res *opensearchapi.Response) {
	var payload map[string]interface{}
	decodeErr := json.NewDecoder(res.Body).Decode(&payload)
	if decodeErr != nil {
		fmt.Printf("Error parsing the error response: %s\n", decodeErr)
		return
	}
	fmt.Printf("ERROR %v\n", payload)
}
// Types used to decode the response of the k-NN search examples.
type (
	// Source mirrors the "_source" object of a hit; only "field1" is mapped.
	Source struct {
		Field1 int `json:"field1"`
	}

	// KnnResponse mirrors the part of a search response the examples read:
	// the list under "hits.hits", each entry carrying "_id", "_score" and
	// "_source".
	KnnResponse struct {
		Hits struct {
			Hits []struct {
				ID     string  `json:"_id"`
				Score  float64 `json:"_score"`
				Source Source  `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
	}
)
其中Addresses、Username和Password分別為搜索引擎的連接地址、默認用戶名和默認密碼,如何獲取,請參見查看搜索引擎連接地址。
創建向量索引
創建向量索引,其中vector1
為向量列、field1
為普通列。向量列及其相關參數必須在創建索引時通過mappings結構顯式指定。
hnsw類型索引
以創建索引vector_test
為例:
// createHnswIndex creates the index "vector_test" with an hnsw vector column
// "vector1" (3 dimensions, l2 distance, lvector engine) and a long column
// "field1". The vector column is excluded from _source. The request runs under
// a 30-second timeout and the outcome is printed to standard output.
func createHnswIndex() {
	const (
		index     = "vector_test"
		vectorCol = "vector1"
	)

	// The vector column name is substituted twice: once in the _source
	// exclusion list and once as the property key.
	definition := fmt.Sprintf(`
{
"settings": {
"index": {
"number_of_shards": 2,
"knn": true
}
},
"mappings": {
"_source": {
"excludes": ["%s"]
},
"properties": {
"%s": {
"type": "knn_vector",
"dimension": 3,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "hnsw",
"space_type": "l2",
"parameters": {
"m": 24,
"ef_construction": 500
}
}
},
"field1": {
"type": "long"
}
}
}
}`, vectorCol, vectorCol)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: index,
		Body:  strings.NewReader(definition),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		fmt.Println("Create index error: ", err)
		return
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		fmt.Println("Create response: ", resp.String())
		return
	}
	handlerCommonError(resp)
}
ivfpq類型索引
以創建索引vector_ivfpq_test
為例:
// createIvfPQIndex creates the index "vector_ivfpq_test" with an ivfpq vector
// column "vector1" (cosinesimil distance, lvector engine) and a long column
// "field1". Offline construction is enabled in the index settings, and the
// vector column is excluded from _source.
//
// The request runs under a 30-second timeout. A transport error, an error
// status returned by the server, or the success response is printed to
// standard output.
func createIvfPQIndex() {
	indexName := "vector_ivfpq_test"
	vectorColumn := "vector1"
	// "m" in parameters is kept equal to the dimension.
	dim := 3
	// About nlist below: 1000 is acceptable for a small data set; use 10000
	// for production workloads.
	indexBody := fmt.Sprintf(`
{
"settings": {
"index": {
"number_of_shards": 4,
"knn": true,
"knn.offline.construction": true
}
},
"mappings": {
"_source": {
"excludes": ["%s"]
},
"properties": {
"%s": {
"type": "knn_vector",
"dimension": %d,
"data_type": "float",
"method": {
"engine": "lvector",
"name": "ivfpq",
"space_type": "cosinesimil",
"parameters": {
"m": %d,
"nlist": 10000,
"centroids_use_hnsw": true,
"centroids_hnsw_m": 48,
"centroids_hnsw_ef_construct": 500,
"centroids_hnsw_ef_search": 200
}
}
},
"field1": {
"type": "long"
}
}
}
}`, vectorColumn, vectorColumn, dim, dim)
	content := strings.NewReader(indexBody)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// Build and send the create-index request.
	req := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  content,
	}
	res, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Create index error ", err)
		return
	}
	defer res.Body.Close()
	// Report a server-side rejection the same way the other create examples do.
	if res.IsError() {
		handlerCommonError(res)
		return
	}
	// Printf rather than Println: the message carries a format verb.
	fmt.Printf("Create response %s\n", res.String())
}
創建ivfpq類型索引,您必須注意以下事項:
ivfpq中knn.offline.construction務必設置為
true
,后續需要寫入一定量的數據才能發起構建索引。使用ivfpq算法請將dimension替換為業務真實向量維度,并將m值設置為與dimension相同的值。
稀疏向量索引
以創建索引vector_sparse_test
為例:
// createSparseVectorIndex creates the index "vector_sparse_test" with a sparse
// vector column "vector1" (data_type sparse_vector, sparse_hnsw method,
// innerproduct distance) and a long column "field1". The vector column is
// excluded from _source. The request runs under a 30-second timeout and the
// outcome is printed to standard output.
func createSparseVectorIndex() {
	const (
		index     = "vector_sparse_test"
		vectorCol = "vector1"
	)

	// The vector column name is substituted twice: once in the _source
	// exclusion list and once as the property key.
	definition := fmt.Sprintf(`
{
"settings": {
"index": {
"number_of_shards": 2,
"knn": true
}
},
"mappings": {
"_source": {
"excludes": ["%s"]
},
"properties": {
"%s": {
"type": "knn_vector",
"data_type": "sparse_vector",
"method": {
"engine": "lvector",
"name": "sparse_hnsw",
"space_type": "innerproduct",
"parameters": {
"m": 24,
"ef_construction": 200
}
}
},
"field1": {
"type": "long"
}
}
}
}`, vectorCol, vectorCol)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	createReq := opensearchapi.IndicesCreateRequest{
		Index: index,
		Body:  strings.NewReader(definition),
	}
	resp, err := createReq.Do(ctx, client)
	if err != nil {
		fmt.Println("Create index error: ", err)
		return
	}
	defer resp.Body.Close()

	if !resp.IsError() {
		fmt.Println("Create response: ", resp.String())
		return
	}
	handlerCommonError(resp)
}
創建稀疏向量時,data_type參數必須填寫sparse_vector
,name參數必須填寫sparse_hnsw
,space_type參數必須填寫innerproduct
。
參數說明
通用參數
參數 | 是否必填 | 說明 |
type | 是 | 索引列的類型。對于向量列,固定為knn_vector。 |
dimension | 是 | 向量數據的維度。取值范圍:[1,32768]。 |
data_type | 否 | 向量數據的類型。目前支持以下類型:
|
method.name | 是 | 向量數據的索引算法。取值如下:
|
method.space_type | 否 | 向量數據的距離算法。取值如下:
|
HNSW算法參數
參數 | 是否必填 | 說明 |
method.parameters.m | 否 | 每一層圖的最大出邊數量。 取值范圍:[1,100]。默認值為 |
method.parameters.ef_construction | 否 | 索引構建時動態列表的長度。 取值范圍:[1,1000]。默認值為 |
IVFPQ算法參數
參數 | 是否必填 | 說明 |
method.parameters.m | 否 | 量化中子空間的數量。取值范圍:[2,32768]。默認值為 重要 創建ivfpq類型索引時,該參數值必須與通用參數dimension的值相同。 |
method.parameters.nlist | 否 | 聚類中心的數量。 取值范圍:[2, 1000000]。默認值為 |
method.parameters.centroids_use_hnsw | 否 | 是否在聚類中心搜索時使用HNSW算法。 取值如下:
|
method.parameters.centroids_hnsw_m | 否 | 若在聚類中心搜索時使用HNSW算法,設定HNSW算法的每一層圖的最大出邊數量。 取值范圍:[1,100]。默認值為 |
method.parameters.centroids_hnsw_ef_construct | 否 | 若在聚類中心搜索時使用HNSW算法,設定HNSW算法在索引構建時動態列表的長度。 取值范圍:[1,1000]。默認值為 |
method.parameters.centroids_hnsw_ef_search | 否 | 若在聚類中心搜索時使用HNSW算法,設定HNSW算法在查詢時動態列表的長度。 取值范圍:[1,1000]。默認值為 |
數據寫入
包含向量列的索引的數據寫入方式與普通索引的數據寫入方式一致。
單條寫入
您可以根據業務需要選擇插入寫或更新寫。
插入寫
若目標數據已存在,則不允許寫入。以寫入索引vector_test
為例:
// writeDataCreate writes document "2" into "vector_test" with a create
// request (insert-only write). The request runs under a 30-second timeout.
//
// A transport error is printed and the function returns before touching the
// response. An error status from the server is decoded and printed through
// handlerCommonError.
func writeDataCreate() {
	// Insert-only write.
	doc := strings.NewReader(`{
"field1": 2,
"vector1": [2.2, 2.3, 2.4]
}`)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	req := opensearchapi.CreateRequest{
		Index:      "vector_test",
		DocumentID: "2",
		Body:       doc,
	}
	res, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Write index error ", err)
		// res must not be used when the request failed.
		return
	}
	defer res.Body.Close()
	if res.IsError() {
		handlerCommonError(res)
	}
}
更新寫
以寫入索引vector_test
為例:
// writeDataIndex writes document "1" into "vector_test" with an index request
// (update write). The request runs under a 30-second timeout.
//
// A transport error is printed and the function returns before touching the
// response. An error status from the server is decoded and printed through
// handlerCommonError.
func writeDataIndex() {
	// Update write.
	doc := strings.NewReader(`{
"field1": 1,
"vector1": [1.2, 1.3, 1.4]
}`)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	req := opensearchapi.IndexRequest{
		Index:      "vector_test",
		DocumentID: "1",
		Body:       doc,
	}
	res, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Write index error ", err)
		// res must not be used when the request failed.
		return
	}
	defer res.Body.Close()
	if res.IsError() {
		handlerCommonError(res)
	}
}
批量寫入
通用寫入
_bulk寫入方式為指定JSON寫入。以寫入索引vector_test
為例,具體如下:
// writeDataBulk sends a hand-written _bulk payload against "vector_test":
// one index, one create, one delete and one update action.
//
// The payload is normalised line by line so that every non-blank line ends
// with a newline; blank lines (such as the one opening the raw string) are
// dropped. The response is printed only after the request is known to have
// succeeded at the transport level.
func writeDataBulk() {
	// Bulk write with an explicitly specified JSON payload.
	doc := strings.NewReader(`
{ "index" : { "_index" : "vector_test", "_id" : "3" } }
{ "field1" : 3, "vector1": [3.2, 3.3, 3.4] }
{ "create" : { "_index" : "vector_test", "_id" : "4" } }
{ "field1" : 4, "vector1": [4.2, 4.3, 4.4] }
{ "delete" : { "_index" : "vector_test", "_id" : "2" } }
{ "update" : {"_id" : "1", "_index" : "vector_test"} }
{ "doc" : {"field1" : 3, "vector1": [2.21, 3.31, 4.41]} }
`)
	// Make sure every line is terminated by a newline.
	var buffer bytes.Buffer
	scanner := bufio.NewScanner(doc)
	for scanner.Scan() {
		line := scanner.Text()
		if strings.TrimSpace(line) == "" {
			continue
		}
		buffer.WriteString(line)
		buffer.WriteString("\n")
	}
	if err := scanner.Err(); err != nil {
		fmt.Println("Bulk write error ", err)
		return
	}
	res, err := client.Bulk(&buffer)
	if err != nil {
		fmt.Println("Bulk write error ", err)
		// res must not be used when the request failed.
		return
	}
	defer res.Body.Close()
	fmt.Println(res.String())
}
您也可以定義數據寫入邏輯,通過_bulk一次性寫入大量數據。以寫入索引vector_ivfpq_test
為例:
// randomDenseVector returns a random vector of dim components rendered as
// text, for example "[0.78620350, 0.93710290, 0.50112325]". Each component is
// a float32 in [0, 1) printed with eight decimals; components are separated
// by ", ". Replace it with real vectors in production code.
//
// A non-positive dim yields "[]". The shared math/rand generator is used
// instead of building a new clock-seeded generator on every call, so calls
// made in a tight loop do not depend on the clock resolution to differ.
func randomDenseVector(dim int) string {
	if dim <= 0 {
		return "[]"
	}
	parts := make([]string, dim)
	for i := range parts {
		parts[i] = fmt.Sprintf("%.8f", rand.Float32())
	}
	return "[" + strings.Join(parts, ", ") + "]"
}
func bulkWriteDenseVector() {
indexName := "vector_ivfpq_test"
var buf bytes.Buffer
// 添加多條數據,bulk寫入也要控制每次寫入的條數
for i := 1; i <= 20; i++ {
// 寫20次,每次寫入100條
for j := (i-1)*100 + 1; j <= i*100; j++ {
// id 可以根據業務自己指定,向量的random請替換為業務自己的。create可以替換為index
meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, j))
data := map[string]interface{}{
"vector1": randomDenseVector(3),
"field1": i,
}
dataJson, err := json.Marshal(data)
if err != nil {
fmt.Println("Error marshaling JSON:", err)
continue
}
// 將操作寫入緩沖區
buf.Grow(len(meta) + len(dataJson) + 2) // 預留空間
buf.Write(meta)
buf.WriteByte('\n') // 換行
buf.Write(dataJson)
buf.WriteByte('\n') // 換行
}
// 執行 Bulk 請求
bulkReq := opensearchapi.BulkRequest{
Body: &buf,
Index: indexName,
}
// 設置超時
ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
defer cancel()
res, err := bulkReq.Do(ctx, client)
if err != nil {
fmt.Println("happend err", err)
}
defer res.Body.Close()
}
}
稀疏向量寫入
以寫入索引vector_sparse_test
為例:
// Data is the JSON shape of a sparse vector: parallel lists of positions and
// their values.
type Data struct {
	Indices []int     `json:"indices"`
	Values  []float32 `json:"values"`
}

// randomSparseVector returns a random sparse vector encoded as JSON, in the
// form {"indices":[91,30,92],"values":[0.7862035,0.9371029,0.50112325]}.
//
// The vector has between 2 and 17 entries. Indices are distinct integers in
// [0, 100) and values are float32 numbers in [0, 1). An empty string is
// returned if encoding fails. Replace it with real vectors in production code.
func randomSparseVector() string {
	// Number of non-zero entries.
	sparsedim := rand.Intn(16) + 2

	// Take a prefix of a random permutation so that no index repeats; drawing
	// each index independently would frequently produce duplicates.
	indices := rand.Perm(100)[:sparsedim]

	// One random value per index.
	values := make([]float32, sparsedim)
	for i := range values {
		values[i] = rand.Float32()
	}

	jsonData, err := json.Marshal(Data{
		Indices: indices,
		Values:  values,
	})
	if err != nil {
		fmt.Println("Error marshaling to JSON:", err)
		return ""
	}
	return string(jsonData)
}
// bulkWriteSparseVector writes documents with ids 5 through 50, each carrying
// a random sparse vector, into "vector_sparse_test" in a single bulk request
// bounded by a 60-second timeout.
//
// A transport error is printed and the function returns before touching the
// response. An error status from the server is decoded and printed through
// handlerCommonError.
func bulkWriteSparseVector() {
	indexName := "vector_sparse_test"
	var buf bytes.Buffer
	// Keep the number of documents per bulk request reasonably small.
	for i := 5; i <= 50; i++ {
		// The id can be chosen by the application and the random vector must
		// be replaced with real data. The "index" action can be swapped for
		// "create".
		meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, i))
		// NOTE(review): randomSparseVector returns text, so "vector1" is
		// marshalled as a JSON string rather than a JSON object — confirm the
		// server accepts this form.
		data := map[string]interface{}{
			"vector1": randomSparseVector(),
			"field1":  i,
		}
		dataJson, err := json.Marshal(data)
		if err != nil {
			fmt.Println("Error marshaling JSON: ", err)
			continue
		}
		// Append the action line and the document line to the buffer.
		buf.Grow(len(meta) + len(dataJson) + 2)
		buf.Write(meta)
		buf.WriteByte('\n')
		buf.Write(dataJson)
		buf.WriteByte('\n')
	}
	if buf.Len() == 0 {
		// Nothing could be encoded; there is nothing to send.
		return
	}
	// Send the bulk request.
	bulkReq := opensearchapi.BulkRequest{
		Body:  &buf,
		Index: indexName,
	}
	// Bound the request with a timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
	defer cancel()
	res, err := bulkReq.Do(ctx, client)
	if err != nil {
		fmt.Println("happend err", err)
		// res must not be used when the request failed.
		return
	}
	defer res.Body.Close()
	if res.IsError() {
		handlerCommonError(res)
	}
}
索引構建
除ivfpq索引,其他類型索引創建時index.knn.offline.construction默認為
false
,即在線索引,無需手動構建。在觸發ivfpq索引構建前需注意:在創建ivfpq索引時,需將index.knn.offline.construction顯式指定為
true
,且在發起構建時務必確保已寫入足夠的數據量,必須大于256條且超過nlist的30倍。手動觸發索引構建完成后,后續可正常寫入和查詢,無需再次構建索引。
觸發構建
構建索引,可以使用client.Transport.Perform
封裝HTTP POST請求。以構建索引vector_ivfpq_test
為例:
// buildIndex triggers an index build for column "vector1" of
// "vector_ivfpq_test" by POSTing to /_plugins/_vector/index/build through
// client.Transport.Perform, then prints the raw response and the first task
// id found in its "payload" list.
//
// Decoding is skipped when the server answers with an error status, and an
// empty payload is reported instead of being indexed.
func buildIndex() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName":      indexName,
		"fieldName":      vectorField,
		"removeOldIndex": "true",
	}
	// NOTE(review): the path is relative; presumably the client transport
	// supplies scheme and host — confirm against the opensearch-go docs.
	url := "/_plugins/_vector/index/build"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Encode error ", err)
		return
	}
	buildIndexRequest, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	buildIndexRequest.Header.Set("Accept", "application/json")
	buildIndexRequest.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(buildIndexRequest)
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()
	// Wrap the raw HTTP response so it can be printed with Response.String.
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	fmt.Println("Response:", response.String())
	if response.IsError() {
		// The error body was printed above; it carries no task id to decode.
		return
	}
	// Local type used to decode the result.
	type BuildResponse struct {
		Payload []string `json:"payload"`
	}
	var taskRes BuildResponse
	if err := json.NewDecoder(response.Body).Decode(&taskRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}
	if len(taskRes.Payload) == 0 {
		fmt.Println("Decode error:", "empty payload in build response")
		return
	}
	fmt.Println("TaskId: ", taskRes.Payload[0])
}
參數說明
參數 | 是否必填 | 說明 |
indexName | 是 | 表名稱,例如 |
fieldName | 是 | 針對哪個字段構建索引,例如 |
removeOldIndex | 是 | 構建索引時,是否刪除舊的索引。取值如下:
|
查看索引狀態
可以使用client.Transport.Perform
封裝HTTP GET請求。以查看索引vector_ivfpq_test
為例:
// getIndexStatus queries the build tasks of column "vector1" of
// "vector_ivfpq_test" with a GET to /_plugins/_vector/index/tasks through
// client.Transport.Perform. It prints the raw response and the first entry of
// its "payload" list, and reports success when that entry contains
// "stage: FINISH".
//
// Every failure path returns immediately, so neither a nil request nor a nil
// response is ever dereferenced, and an empty payload is reported instead of
// being indexed.
func getIndexStatus() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName": indexName,
		"fieldName": vectorField,
		"taskIds":   "[]",
	}
	url := "/_plugins/_vector/index/tasks"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Err ", err)
		return
	}
	req, err := http.NewRequest("GET", url, buf)
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	req.Header.Set("Accept", "application/json")
	req.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(req)
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()
	// Wrap the raw HTTP response so it can be printed with Response.String.
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	responseStr := response.String()
	fmt.Println("Response:", responseStr)
	if response.IsError() {
		// The error body was printed above; it carries no task status.
		return
	}
	// Local type used to decode the result.
	type GetIndexResponse struct {
		Payload []string `json:"payload"`
	}
	var indexRes GetIndexResponse
	if err := json.NewDecoder(response.Body).Decode(&indexRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}
	if len(indexRes.Payload) == 0 {
		fmt.Println("Decode error:", "empty payload in task status response")
		return
	}
	indexResStr := indexRes.Payload[0]
	fmt.Println("GetIndex Build Status response:", indexResStr)
	// If the stage is not FINISH yet, the caller can poll in a loop with a
	// sleep between attempts.
	contains := strings.Contains(indexResStr, "stage: FINISH")
	if contains {
		fmt.Println("Index Build Success:")
	}
}
終止構建
可以使用client.Transport.Perform
封裝HTTP POST請求。以終止索引vector_ivfpq_test
的構建為例:
// abortIndex asks the server to abort the build task
// "default_vector_ivfpq_test_vector1" of column "vector1" of
// "vector_ivfpq_test" by POSTing to /_plugins/_vector/index/tasks/abort
// through client.Transport.Perform, and prints the raw response.
func abortIndex() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName": indexName,
		"fieldName": vectorField,
		"taskIds":   "[\"default_vector_ivfpq_test_vector1\"]",
	}
	url := "/_plugins/_vector/index/tasks/abort"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Encode error ", err)
		return
	}
	abortReq, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Abort index err ", err)
		return
	}
	abortReq.Header.Set("Accept", "application/json")
	abortReq.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(abortReq)
	if err != nil {
		// Report the transport error instead of printing an empty line.
		fmt.Println("Abort index err ", err)
		return
	}
	defer res.Body.Close()
	// Wrap the raw HTTP response so it can be printed with Response.String.
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	fmt.Println("Abort index response:", response.String())
}
數據查詢
純向量數據查詢
純向量數據的查詢可以通過knn
結構實現。以查詢索引vector_test
為例:
// knnSearch runs a pure vector k-NN query (k = 10, size = 10) with a random
// 3-dimensional vector against column "vector1" of "vector_test" and prints
// the id and score of every hit. The request runs under a 30-second timeout.
func knnSearch() {
	const (
		index = "vector_test"
		field = "vector1"
	)
	query := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
}
}`, field, randomDenseVector(3))

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.SearchRequest{
		Index: []string{index},
		Body:  strings.NewReader(query),
	}
	resp, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer resp.Body.Close()

	// To dump the raw response: fmt.Println("knn result ", resp.String())
	if resp.IsError() {
		handlerCommonError(resp)
		return
	}

	var result KnnResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&result); decodeErr != nil {
		fmt.Println("Knn result Decode err ", decodeErr)
		return
	}

	// Print the id and score of each document.
	hits := result.Hits.Hits
	for i := range hits {
		fmt.Printf("ID: %s, Score: %f\n", hits[i].ID, hits[i].Score)
	}
}
返回指定字段
如果需要在查詢時返回指定字段,可以指定 "_source": ["field1", "field2"]
或使用"_source": true
返回非向量的全部字段。以查詢索引vector_test
為例,使用方法如下:
// knnSearchHnswWithSource runs a k-NN query (k = 10, size = 10) with a random
// 3-dimensional vector against column "vector1" of "vector_test", asking for
// "field1" in _source, and prints the id, score and field1 of every hit. The
// request runs under a 30-second timeout.
func knnSearchHnswWithSource() {
	const (
		index = "vector_test"
		field = "vector1"
	)
	query := fmt.Sprintf(`{
"size": 10,
"_source": ["field1"],
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
}
}`, field, randomDenseVector(3))

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.SearchRequest{
		Index: []string{index},
		Body:  strings.NewReader(query),
	}
	resp, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer resp.Body.Close()

	// To dump the raw response: fmt.Println("knn result ", resp.String())
	if resp.IsError() {
		handlerCommonError(resp)
		return
	}

	var result KnnResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&result); decodeErr != nil {
		fmt.Println("Knn result Decode err ", decodeErr)
		return
	}

	hits := result.Hits.Hits
	for i := range hits {
		fmt.Printf("ID: %s, Score: %f, Field1 %d \n", hits[i].ID, hits[i].Score, hits[i].Source.Field1)
	}
}
返回結果如下:
{
"took" : 35,
"timed_out" : false,
"terminated_early" : false,
"num_reduce_phases" : 0,
"_shards" : {
"total" : 2,
"successful" : 2,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 3,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "vector_test",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"field1" : 1
}
},
{
"_index" : "vector_test",
"_id" : "1",
"_score" : 0.25,
"_source" : {
"field1" : 1
}
},
{
"_index" : "vector_test",
"_id" : "3",
"_score" : 0.14285715,
"_source" : {
"field1" : 2
}
}
]
}
}
hnsw算法查詢
以查詢hnsw索引vector_test
為例:
// knnSearchHnsw runs a k-NN query (k = 10, size = 10) with a random
// 3-dimensional vector against column "vector1" of "vector_test", passing
// ef_search = "100" through the "ext.lvector" section, and prints the id and
// score of every hit. The request runs under a 30-second timeout.
func knnSearchHnsw() {
	const (
		index = "vector_test"
		field = "vector1"
	)
	query := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
},
"ext": {"lvector": {"ef_search": "100"}}
}`, field, randomDenseVector(3))

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.SearchRequest{
		Index: []string{index},
		Body:  strings.NewReader(query),
	}
	resp, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer resp.Body.Close()

	// To dump the raw response: fmt.Println("Knn result ", resp.String())
	if resp.IsError() {
		handlerCommonError(resp)
		return
	}

	var result KnnResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&result); decodeErr != nil {
		fmt.Println("Knn result Decode err", decodeErr)
		return
	}

	hits := result.Hits.Hits
	for i := range hits {
		fmt.Printf("ID: %s, Score: %f\n", hits[i].ID, hits[i].Score)
	}
}
ivfpq算法查詢
以查詢ivfpq索引vector_ivfpq_test
為例:
// knnSearchIvfpq runs a k-NN query (k = 10, size = 10) with the fixed vector
// [2.21, 3.31, 4.41] against column "vector1" of the ivfpq index
// "vector_ivfpq_test", passing nprobe = "60" and reorder_factor = "5" through
// the "ext.lvector" section, and prints the id and score of every hit. The
// request runs under a 30-second timeout.
func knnSearchIvfpq() {
	// Target the ivfpq index: the nprobe and reorder_factor parameters below
	// belong to the ivfpq example, not to the hnsw index "vector_test".
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
},
"ext": {"lvector": {"nprobe": "60", "reorder_factor": "5"}}
}`, vectorField, "[2.21, 3.31, 4.41]")
	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err", err)
		return
	}
	defer searchResponse.Body.Close()
	// To dump the raw response: fmt.Println("Knn result", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}
稀疏向量查詢
以查詢稀疏向量索引vector_sparse_test
為例:
// knnSearchSparseVector runs a k-NN query (k = 10, size = 10) with a random
// sparse vector against column "vector1" of "vector_sparse_test" and prints
// the id and score of every hit. The request runs under a 30-second timeout.
func knnSearchSparseVector() {
	const (
		index = "vector_sparse_test"
		field = "vector1"
	)
	query := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10
}
}
}
}`, field, randomSparseVector())

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.SearchRequest{
		Index: []string{index},
		Body:  strings.NewReader(query),
	}
	resp, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer resp.Body.Close()

	// To dump the raw response: fmt.Println("Knn result ", resp.String())
	if resp.IsError() {
		handlerCommonError(resp)
		return
	}

	var result KnnResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&result); decodeErr != nil {
		fmt.Println("Knn result Decode err ", decodeErr)
		return
	}

	hits := result.Hits.Hits
	for i := range hits {
		fmt.Printf("ID: %s, Score: %f\n", hits[i].ID, hits[i].Score)
	}
}
融合查詢
向量列的查詢可與普通列的查詢條件結合,并返回綜合的查詢結果。在實際業務使用時,Post-Filter近似查詢通常能獲取更相似的檢索結果。
Pre-Filter近似查詢
通過在knn查詢結構內部添加過濾器filter,并指定filter_type參數的值為pre_filter
,可實現先過濾結構化數據,再查詢向量數據。
目前結構化過濾數據的上限為10,000條。
以查詢索引vector_test
為例:
// preFilterKnnSearch runs a k-NN query (k = 10, size = 10) with a random
// 3-dimensional vector against column "vector1" of "vector_test", restricted
// by the range filter field1 >= 1 and with filter_type set to "pre_filter" in
// the "ext.lvector" section. It prints the id and score of every hit. The
// request runs under a 30-second timeout.
func preFilterKnnSearch() {
	const (
		index = "vector_test"
		field = "vector1"
	)
	query := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": 10,
"filter": {
"range": {
"field1": {
"gte": 1
}
}
}
}
}
},
"ext": {"lvector": {"filter_type": "pre_filter"}}
}`, field, randomDenseVector(3))

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.SearchRequest{
		Index: []string{index},
		Body:  strings.NewReader(query),
	}
	resp, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err", err)
		return
	}
	defer resp.Body.Close()

	// To dump the raw response: fmt.Println("Knn result", resp.String())
	if resp.IsError() {
		handlerCommonError(resp)
		return
	}

	var result KnnResponse
	if decodeErr := json.NewDecoder(resp.Body).Decode(&result); decodeErr != nil {
		fmt.Println("Knn result Decode err", decodeErr)
		return
	}

	hits := result.Hits.Hits
	for i := range hits {
		fmt.Printf("ID: %s, Score: %f\n", hits[i].ID, hits[i].Score)
	}
}
Post-Filter近似查詢
通過在knn查詢結構內部添加過濾器filter,并指定filter_type參數的值為post_filter
,可實現先查詢向量數據,再過濾結構化數據。
在使用Post-Filter近似查詢時,可以適當將k的值設置大一些,以便獲取更多的向量數據再進行過濾。
以查詢索引vector_ivfpq_test
為例:
// postFilterKnnSearch runs a k-NN query (k = 1000, size = 10) with the fixed
// vector [2.21, 3.31, 4.41] against column "vector1" of "vector_ivfpq_test",
// restricted by the range filter field1 >= 0 and with filter_type set to
// "post_filter" (plus nprobe and reorder_factor) in the "ext.lvector"
// section. It prints the id and score of every hit. The request runs under a
// 30-second timeout.
func postFilterKnnSearch() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	// A larger k gives the filter more candidates to choose from.
	knnTopk := 1000
	knnString := fmt.Sprintf(`{
"size": 10,
"query": {
"knn": {
"%s": {
"vector": %s,
"k": %d,
"filter": {
"range": {
"field1": {
"gte": 0
}
}
}
}
}
},
"ext": {"lvector": {"filter_type": "post_filter", "nprobe": "100", "reorder_factor": "1"}}
}`, vectorField, "[2.21, 3.31, 4.41]", knnTopk)
	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message carries a format verb.
		fmt.Printf("Knn err %s\n", err)
		return
	}
	defer searchResponse.Body.Close()
	// To dump the raw response: fmt.Printf("Knn result %s\n", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Printf("Knn result Decode err %s\n", err)
		return
	}
	// Print the id and score of each document.
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}
常規用法
常規用法提供索引基礎的查詢、刪除等使用方法。
查詢所有索引及其數據量。
func catIndices() { req := opensearchapi.CatIndicesRequest{} ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() res, err := req.Do(ctx, client) if err != nil { fmt.Println("CatIndices err ", err) return } defer res.Body.Close() fmt.Println("CatIndices response:", res.String()) }
返回結果:
health status index       uuid        pri rep docs.count docs.deleted store.size pri.store.size
green  open   vector_test vector_test 2   0   2          0            6.8kb      6.8kb
查詢指定索引的數據量。以查詢索引
vector_ivfpq_test
為例:func getIndexCount() { indexName := "vector_ivfpq_test" ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() req := opensearchapi.CountRequest{ Index: []string{indexName}, } res, err := req.Do(ctx, client) if err != nil { fmt.Println("Count err %s", err) return } defer res.Body.Close() fmt.Println("Count result %s", res.String()) type CountResponse struct { Count int `json:"count"` } var countResponse CountResponse if err := json.NewDecoder(res.Body).Decode(&countResponse); err != nil { fmt.Printf("Error decoding response: %s\n", err) return } fmt.Printf("Count result: %d\n", countResponse.Count) }
返回結果:
{ "count" : 2, "_shards" : { "total" : 2, "successful" : 2, "skipped" : 0, "failed" : 0 } }
查看索引創建信息。以查詢索引
vector_test
為例:func getIndex() { indexName := "vector_test" req := opensearchapi.IndicesGetRequest{ Index: []string{indexName}, } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() res, err := req.Do(ctx, client) if err != nil { fmt.Println("DeleteByQuery err ", err) return } defer res.Body.Close() if res.IsError() { fmt.Printf("Error: %s\n", res.String()) return } fmt.Println("GetIndex response:", res.String()) var indexData map[string]interface{} if err := json.NewDecoder(res.Body).Decode(&indexData); err != nil { fmt.Println("Error parsing the response body:", err) return } // 如獲取 space_type if properties, ok := indexData[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})["vector1"].(map[string]interface{})["method"].(map[string]interface{})["space_type"]; ok { if spaceType, ok := properties.(string); ok { fmt.Printf("space_type: %s\n", spaceType) } } }
返回結果:
{ "vector_test" : { "aliases" : { }, "mappings" : { "_source" : { "excludes" : [ "vector1" ] }, "properties" : { "field1" : { "type" : "long" }, "vector1" : { "type" : "knn_vector", "dimension" : 3, "data_type" : "float", "method" : { "engine" : "lvector", "space_type" : "l2", "name" : "hnsw", "parameters" : { "ef_construction" : 200, "m" : 24 } } } } }, "settings" : { "index" : { "search" : { "slowlog" : { "level" : "DEBUG", "threshold" : { "fetch" : { "warn" : "1s", "trace" : "200ms", "debug" : "500ms", "info" : "800ms" }, "query" : { "warn" : "10s", "trace" : "500ms", "debug" : "1s", "info" : "5s" } } } }, "indexing" : { "slowlog" : { "level" : "DEBUG", "threshold" : { "index" : { "warn" : "10s", "trace" : "500ms", "debug" : "2s", "info" : "5s" } } } }, "number_of_shards" : "2", "provided_name" : "vector_test", "knn" : "true", "creation_date" : "1727169417350", "number_of_replicas" : "0", "uuid" : "vector_test", "version" : { "created" : "136287927" } } } } }
刪除整個索引。以刪除索引
vector_ivfpq_test
為例:
// deleteIndex deletes the index "vector_ivfpq_test" and prints the response.
// The request runs under a 240-second timeout.
//
// A transport error is printed and the function returns before touching the
// response.
func deleteIndex() {
	indexName := "vector_ivfpq_test"
	req := opensearchapi.IndicesDeleteRequest{
		Index: []string{indexName},
	}
	// Deleting an index needs a longer timeout than the other requests.
	ctx, cancel := context.WithTimeout(context.Background(), 120*2*time.Second)
	defer cancel()
	rep, err := req.Do(ctx, client)
	if err != nil {
		// Printf rather than Println: the message carries a format verb.
		fmt.Printf("Delete err %s\n", err)
		// rep must not be used when the request failed.
		return
	}
	defer rep.Body.Close()
	fmt.Println("deleteIndex response:", rep.String())
}
通過查詢刪除符合查詢條件的指定數據。
func deleteByQuery() { indexName := "vector_test" key := "field1" value := 1 queryStr := fmt.Sprintf(`{ "query": { "term": { "%s": %d } } }`, key, value) content := strings.NewReader(queryStr) req := opensearchapi.DeleteByQueryRequest{ Index: []string{indexName}, Body: content, } ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() res, err := req.Do(ctx, client) if err != nil { fmt.Println("DeleteByQuery err %v", err) return } defer res.Body.Close() // fmt.Println("DeleteByQuery response:", res.String()) }