日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Go(OpenSearch Go)訪問向量引擎

Lindorm向量引擎支持向量數據檢索功能,兼容Elasticsearch協議,同時支持標量、向量、全文混合檢索功能。本文介紹基于Go語言,通過OpenSearch Go客戶端連接和使用向量引擎的方法。

前提條件

  • 已安裝Go環境,建議安裝Go 1.17及以上版本。

  • 已開通Lindorm向量引擎。如何開通,請參見開通向量引擎

  • 已開通Lindorm搜索引擎。具體操作,請參見開通指南

  • 已將客戶端的IP地址加入到Lindorm實例的白名單中。具體操作,請參見設置白名單

準備工作

安裝OpenSearch Go客戶端

您需要安裝OpenSearch Go客戶端,支持以下兩種方式:

  • 方式一:

    1. 修改go.mod文件配置,添加相關依賴,具體如下:

      module SearchDemo 
      
      go 1.23.0 //請替換為您的Go版本
      
      require (
      	github.com/opensearch-project/opensearch-go/v2 v2.3.0
      	github.com/google/uuid v1.6.0 // 非必須,示例代碼中生成uuid需要該項,請根據實際情況配置
      )
    2. 執行以下命令,更新go.mod文件。

      go mod tidy
  • 方式二:執行以下命令直接下載。

    go get github.com/opensearch-project/opensearch-go/v2@v2.3.0

連接搜索引擎

package main

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"github.com/opensearch-project/opensearch-go/v2"
	"github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	"math/rand"
	"net/http"
	"strings"
	"time"
)

var client *opensearch.Client

func init() {
    // 替換為搜索引擎域名
	cfg := opensearch.Config{
		Addresses: []string{
			"http://ld-t4n5668xk31ui****-proxy-search-vpc.lindorm.aliyuncs.com:30070",
		},
		Username: "<Username>",
		Password: "<Password>",
	}
	var err error
	client, err = opensearch.NewClient(cfg)
	if err != nil {
		fmt.Println("Init client error %s", err)
	}
}

// 定義錯誤處理函數
func handlerCommonError(res *opensearchapi.Response) {
	var errorResponse map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&errorResponse); err != nil {
		fmt.Printf("Error parsing the error response: %s\n", err)
		return
	}
	fmt.Printf("ERROR %v\n", errorResponse)
}

// 定義knnSearch返回的結構體
type Source struct {
	Field1 int `json:"field1"`
}
type KnnResponse struct {
	Hits struct {
		Hits []struct {
			ID     string  `json:"_id"`
			Score  float64 `json:"_score"`
			Source Source  `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}

其中AddressesUsernamePassword分別為搜索引擎的連接地址、默認用戶名和默認密碼,如何獲取,請參見查看搜索引擎連接地址

創建向量索引

創建向量索引,其中vector1為向量列、field1為普通列。向量列及其相關參數必須在創建索引時通過mappings結構顯式指定。

hnsw類型索引

以創建索引vector_test為例:

func createHnswIndex() {
	indexName := "vector_test"
	vectorColumn := "vector1"
	indexBody := fmt.Sprintf(`
	{
	  "settings": {
		"index": {
		  "number_of_shards": 2,
		  "knn": true
		}
	  },
	  "mappings": {
		"_source": {
		  "excludes": ["%s"]
		},
		"properties": {
		  "%s": {
			"type": "knn_vector",
			"dimension": 3,
			"data_type": "float",
			"method": {
			  "engine": "lvector",
			  "name": "hnsw",
			  "space_type": "l2",
			  "parameters": {
				"m": 24,
				"ef_construction": 500
			  }
			}
		  },
		  "field1": {
			"type": "long"
		  }
		}
	  }
    }`, vectorColumn, vectorColumn)
	content := strings.NewReader(indexBody)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	
	req := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  content,
	}
	
	res, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Create index error: ", err)
		return
	}
	defer res.Body.Close()
	if res.IsError() {
		handlerCommonError(res)
		return
	}
	fmt.Println("Create response: ", res.String())
}

ivfpq類型索引

以創建索引vector_ivfpq_test為例:

func createIvfPQIndex() {
	indexName := "vector_ivfpq_test"
	vectorColumn := "vector1"
	// parameters中的m與dimension一致
	dim := 3
	// 下方的nlist參數,少量數據集可以用1000,正式務必用10000
	indexBody := fmt.Sprintf(`
	{
	  "settings": {
		"index": {
		  "number_of_shards": 4,
		  "knn": true,
		  "knn.offline.construction": true
		}
	  },
	  "mappings": {
		"_source": {
		  "excludes": ["%s"]
		},
		"properties": {
		  "%s": {
			"type": "knn_vector",
			"dimension": %d,
			"data_type": "float",
			"method": {
			  "engine": "lvector",
			  "name": "ivfpq",
			  "space_type": "cosinesimil",
			  "parameters": {
				"m": %d,
				"nlist": 10000,
				"centroids_use_hnsw": true,
				"centroids_hnsw_m": 48,
				"centroids_hnsw_ef_construct": 500,
				"centroids_hnsw_ef_search": 200
			  }
			}
		  },
		  "field1": {
			"type": "long"
		  }
		}
	  }
	}`, vectorColumn, vectorColumn, dim, dim)
	content := strings.NewReader(indexBody)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	// 創建索引請求
	req := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  content,
	}
	res, err := req.Do(ctx, client)

	if err != nil {
		fmt.Println("Create index error ", err)
		return
	}
	defer res.Body.Close()
	fmt.Println("Create response %s", res.String())
}
重要

創建ivfpq類型索引,您必須注意以下事項:

  • ivfpq中knn.offline.construction務必設置為true,后續需要寫入一定量的數據才能發起構建索引。

  • 使用ivfpq算法請將dimension替換業務真實向量維度,并將m值設置為與dimension相同的值。

稀疏向量索引

以創建索引vector_sparse_test為例:

func createSparseVectorIndex() {
	indexName := "vector_sparse_test"
	vectorColumn := "vector1"
	indexBody := fmt.Sprintf(`
	{
	  "settings": {
		"index": {
		  "number_of_shards": 2,
		  "knn": true
		}
	  },
	  "mappings": {
		"_source": {
		  "excludes": ["%s"]
		},
		"properties": {
		  "%s": {
			"type": "knn_vector",
			"data_type": "sparse_vector",
			"method": {
			  "engine": "lvector",
			  "name": "sparse_hnsw",
			  "space_type": "innerproduct",
			  "parameters": {
				"m": 24,
				"ef_construction": 200
			  }
			}
		  },
		  "field1": {
			"type": "long"
		  }
		}
	  }
	}`, vectorColumn, vectorColumn)
	content := strings.NewReader(indexBody)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	req := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  content,
	}
	res, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Create index error: ", err)
		return
	}
	defer res.Body.Close()
	if res.IsError() {
		handlerCommonError(res)
		return
	}
	fmt.Println("Create response: ", res.String())
}
重要

創建稀疏向量時,data_type參數必須填寫sparse_vector,name參數必須填寫sparse_hnsw,space_type參數必須填寫innerproduct

參數說明

通用參數

參數

是否必填

說明

type

索引列的類型。對于向量列,固定為knn_vector

dimension

向量數據的維度。取值范圍:[1,32768]。

data_type

向量數據的類型。目前支持以下類型:

  • float

  • float16

  • sparse_vector(僅稀疏向量索引支持該類型)

method.name

向量數據的索引算法。取值如下:

  • flat

  • hnsw

  • ivfpq

  • sparse_hnsw(僅稀疏向量索引支持該算法)

method.space_type

向量數據的距離算法。取值如下:

  • l2(默認值):歐式距離。

  • cosinesimil:余弦距離。

  • innerproduct:內積距離。

HNSW算法參數

參數

是否必填

說明

method.parameters.m

每一層圖的最大出邊數量。

取值范圍:[1,100]。默認值為16

method.parameters.ef_construction

索引構建時動態列表的長度。

取值范圍:[1,1000]。默認值為100

IVFPQ算法參數

參數

是否必填

說明

method.parameters.m

量化中子空間的數量。取值范圍:[2,32768]。默認值為16

重要

創建ivfpq類型索引時,該參數值必須與通用參數dimension的值相同。

method.parameters.nlist

聚類中心的數量。

取值范圍:[2, 1000000]。默認值為10000

method.parameters.centroids_use_hnsw

是否在聚類中心搜索時使用HNSW算法。

取值如下:

  • true(默認值

  • false

method.parameters.centroids_hnsw_m

若在聚類中心搜索時使用HNSW算法,設定HNSW算法的每一層圖的最大出邊數量。

取值范圍:[1,100]。默認值為16

method.parameters.centroids_hnsw_ef_construct

若在聚類中心搜索時使用HNSW算法,設定HNSW算法在索引構建時動態列表的長度。

取值范圍:[1,1000]。默認值為100

method.parameters.centroids_hnsw_ef_search

若在聚類中心搜索時使用HNSW算法,設定HNSW算法在查詢時動態列表的長度。

取值范圍:[1,1000]。默認值為100

數據寫入

包含向量列的索引的數據寫入方式與普通索引的數據寫入方式一致。

單條寫入

您可以根據業務需要選擇插入寫或更新寫。

插入寫

若目標數據已存在,則不允許寫入。以寫入索引vector_test為例:

func writeDataCreate() {
	// 插入寫
	doc := strings.NewReader(`{
		"field1": 2,
		"vector1": [2.2, 2.3, 2.4]
	}`)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.CreateRequest{
		Index:      "vector_test",
		DocumentID: "2",
		Body:       doc,
	}

	res, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Write index error ", err)
	}
	defer res.Body.Close()
}

更新寫

以寫入索引vector_test為例:

func writeDataIndex() {
	// 更新寫
	doc := strings.NewReader(`{
		"field1": 1,
		"vector1": [1.2, 1.3, 1.4]
	}`)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	req := opensearchapi.IndexRequest{
		Index:      "vector_test",
		DocumentID: "1",
		Body:       doc,
	}
	res, err := req.Do(ctx, client)
	if err != nil {
		fmt.Println("Write index error ", err)
	}
        defer res.Body.Close()
}

批量寫入

通用寫入

_bulk寫入方式為指定JSON寫入。以寫入索引vector_test為例,具體如下:

// 人為指定json串
func writeDataBulk() {
	// bulk 寫入
	doc := strings.NewReader(`
		{ "index" : { "_index" : "vector_test", "_id" : "3" } }
		{ "field1" : 3, "vector1": [3.2, 3.3, 3.4] }
		{ "create" : { "_index" : "vector_test", "_id" : "4" } }
		{ "field1" : 4, "vector1": [4.2, 4.3, 4.4] }
		{ "delete" : { "_index" : "vector_test", "_id" : "2" } }
		{ "update" : {"_id" : "1", "_index" : "vector_test"} }
		{ "doc" : {"field1" : 3, "vector1": [2.21, 3.31, 4.41]} }
	`)

	// 確保每行后都有換行符
	var buffer bytes.Buffer
	scanner := bufio.NewScanner(doc)
	for scanner.Scan() {
		buffer.WriteString(scanner.Text())
		buffer.WriteString("\n")
	}

	res, err := client.Bulk(&buffer)
	fmt.Println(res.String())
	if err != nil {
		fmt.Println("Bulk write error ", err)
	}
	defer res.Body.Close()
}

您也可以定義數據寫入邏輯,通過_bulk一次性寫入大量數據。以寫入索引vector_ivfpq_test為例:

// 代碼樣例是隨機生成dim維度的向量,實際業務替換
// 生成格式 [0.7862035,0.9371029,0.50112325]
func randomDenseVector(dim int) string {
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	randomArray := make([]float32, dim)
	for i := range randomArray {
		randomArray[i] = random.Float32()
	}
	var formattedArray []string
	for _, num := range randomArray {
		formattedArray = append(formattedArray, fmt.Sprintf("%.8f", num))
	}
	result := "[" + strings.Join(formattedArray, ", ") + "]"
	return result
}

func bulkWriteDenseVector() {
	indexName := "vector_ivfpq_test"
	var buf bytes.Buffer
	// 添加多條數據,bulk寫入也要控制每次寫入的條數
	for i := 1; i <= 20; i++ {
		// 寫20次,每次寫入100條
		for j := (i-1)*100 + 1; j <= i*100; j++ {
			// id 可以根據業務自己指定,向量的random請替換為業務自己的。create可以替換為index
			meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, j))
			data := map[string]interface{}{
				"vector1": randomDenseVector(3),
				"field1":  i,
			}
			dataJson, err := json.Marshal(data)
			if err != nil {
				fmt.Println("Error marshaling JSON:", err)
				continue
			}
			// 將操作寫入緩沖區
			buf.Grow(len(meta) + len(dataJson) + 2) // 預留空間
			buf.Write(meta)
			buf.WriteByte('\n') // 換行
			buf.Write(dataJson)
			buf.WriteByte('\n') // 換行
		}

		// 執行 Bulk 請求
		bulkReq := opensearchapi.BulkRequest{
			Body:  &buf,
			Index: indexName,
		}

		// 設置超時
		ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
		defer cancel()

		res, err := bulkReq.Do(ctx, client)
		if err != nil {
			fmt.Println("happend err", err)
		}
		defer res.Body.Close()
	}
}

稀疏向量寫入

以寫入索引vector_sparse_test為例:

type Data struct {
	Indices []int     `json:"indices"`
	Values  []float32 `json:"values"`
}

/*
* 格式參考curl使用文檔,稀疏向量的格式說明
{"indices":[91,30,92],"values":[0.7862035,0.9371029,0.50112325]}
*/
func randomSparseVector() string {
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	// 隨機生成 indices
	sparsedim := random.Intn(16) + 2
	indices := make([]int, sparsedim)
	for i := range indices {
		indices[i] = random.Intn(100) // 生成 0-99 之間的隨機整數
	}
	// 隨機生成 values
	values := make([]float32, sparsedim)
	for i := range values {
		values[i] = random.Float32() // 生成 0.0-1.0 之間的隨機浮點數
	}
	// 創建數據結構
	data := Data{
		Indices: indices,
		Values:  values,
	}
	// 將 data 編碼為 JSON
	jsonData, err := json.Marshal(data)
	if err != nil {
		fmt.Println("Error marshaling to JSON:", err)
		return ""
	}
	// 打印生成的 JSON 字符串
	return string(jsonData)
}

func bulkWriteSparseVector() {
	indexName := "vector_sparse_test"
	var buf bytes.Buffer
	// 添加多條數據,bulk寫入也要控制每次寫入的條數,不能過大
	for i := 5; i <= 50; i++ {
		// id 可以根據業務自己指定,向量的random請替換為業務自己的。create可以替換為index
		meta := []byte(fmt.Sprintf(`{ "index": { "_index": "%s", "_id": "%d" } }`, indexName, i))
		data := map[string]interface{}{
			"vector1": randomSparseVector(),
			"field1":  i,
		}
		dataJson, err := json.Marshal(data)
		if err != nil {
			fmt.Println("Error marshaling JSON: ", err)
			continue
		}
		// 將操作寫入緩沖區
		buf.Grow(len(meta) + len(dataJson) + 2) // 預留空間
		buf.Write(meta)
		buf.WriteByte('\n') // 換行
		buf.Write(dataJson)
		buf.WriteByte('\n') // 換行
	}

	// 執行 Bulk 請求
	bulkReq := opensearchapi.BulkRequest{
		Body:  &buf,
		Index: indexName,
	}

	// 設置超時
	ctx, cancel := context.WithTimeout(context.Background(), 30*2*time.Second)
	defer cancel()

	res, err := bulkReq.Do(ctx, client)
	if err != nil {
		fmt.Println("happend err", err)
	}
	defer res.Body.Close()
}

索引構建

重要
  • 除ivfpq索引,其他類型索引創建時index.knn.offline.construction默認為false,即在線索引,無需手動構建。

  • 在觸發ivfpq索引構建前需注意:在創建ivfpq索引時,需將index.knn.offline.construction顯式指定為true,且在發起構建時務必確保已寫入足夠的數據量,必須大于256條且超過nlist的30倍。

  • 手動觸發索引構建完成后,后續可正常寫入和查詢,無需再次構建索引

觸發構建

構建索引,可以使用client.Transport.Perform封裝HTTP POST請求。以構建索引vector_ivfpq_test為例:

func buildIndex() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName":      indexName,
		"fieldName":      vectorField,
		"removeOldIndex": "true",
	}
	url := "/_plugins/_vector/index/build"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Encode error ", err)
		return
	}
	buildIndexRequest, err := http.NewRequest("POST", url, buf)
	if err != nil {
		fmt.Println("Build err ", err)
		return
	}
	buildIndexRequest.Header.Set("Accept", "application/json")
	buildIndexRequest.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(buildIndexRequest)
	if err != nil {
		fmt.Println("Err ", err)
		return
	}
	defer res.Body.Close()
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
  
	fmt.Println("Response:", response.String())
    // 定義結構體解析結果
	type BuildResponse struct {
		Payload []string `json:"payload"`
	}

	var taskRes BuildResponse
	if err := json.NewDecoder(response.Body).Decode(&taskRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}

	fmt.Println("TaskId: ", taskRes.Payload[0])
}

參數說明

參數

是否必填

說明

indexName

表名稱,例如vector_ivfpq_test

fieldName

針對哪個字段構建索引,例如vector1

removeOldIndex

構建索引時,是否刪除舊的索引。取值如下:

  • true:在觸發構建時,會刪除舊的索引數據,在構建完成后才能進行knn查詢。

    重要

    實際業務使用,建議設置為true

  • false(默認值):會保留舊的索引,但會影響檢索性能。

查看索引狀態

可以使用client.Transport.Perform封裝HTTP GET請求。以查看索引vector_ivfpq_test為例:

func getIndexStatus() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	bodyBuild := map[string]interface{}{
		"indexName": indexName,
		"fieldName": vectorField,
		"taskIds":   "[]",
	}
	url := "/_plugins/_vector/index/tasks"
	buf := new(bytes.Buffer)
	if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
		fmt.Println("Err ", err)
		return
	}
	req, err := http.NewRequest("GET", url, buf)
	if err != nil {
		fmt.Println("Err ", err)
	}
	req.Header.Set("Accept", "application/json")
	req.Header.Set("Content-Type", "application/json")
	res, err := client.Transport.Perform(req)
	if err != nil {
		fmt.Println("Err ", err)
	}
	defer res.Body.Close()
	response := opensearchapi.Response{
		StatusCode: res.StatusCode,
		Body:       res.Body,
		Header:     res.Header,
	}
	responseStr := response.String()
	fmt.Println("Response:", responseStr)
	// 定義結構體解析結果
	type GetIndexResponse struct {
		Payload []string `json:"payload"`
	}
	var indexRes GetIndexResponse
	if err := json.NewDecoder(response.Body).Decode(&indexRes); err != nil {
		fmt.Println("Decode error:", err)
		return
	}
	indexResStr := indexRes.Payload[0]
	fmt.Println("GetIndex Build Status response:", indexResStr)
    // 如果不是這個狀態,程序可以循環 sleep 等待
	contains := strings.Contains(indexResStr, "stage: FINISH")
	if contains {
		fmt.Println("Index Build Success:")
	}
}

終止構建

可以使用client.Transport.Perform封裝HTTP POST請求。以終止索引vector_ivfpq_test的構建為例:

func abortIndex() {
   indexName := "vector_ivfpq_test"
   vectorField := "vector1"
   bodyBuild := map[string]interface{}{
      "indexName": indexName,
      "fieldName": vectorField,
      "taskIds":   "[\"default_vector_ivfpq_test_vector1\"]",
   }
   url := "/_plugins/_vector/index/tasks/abort"
   buf := new(bytes.Buffer)
   if err := json.NewEncoder(buf).Encode(bodyBuild); err != nil {
      fmt.Println("Encode error ", err)
      return
   }
   abortReq, err := http.NewRequest("POST", url, buf)
   if err != nil {
      fmt.Println("Abort index err ", err)
      return
   }
   abortReq.Header.Set("Accept", "application/json")
   abortReq.Header.Set("Content-Type", "application/json")
   res, err := client.Transport.Perform(abortReq)
   if err != nil {
      fmt.Println("")
      return
   }
   defer res.Body.Close()
   response := opensearchapi.Response{
      StatusCode: res.StatusCode,
      Body:       res.Body,
      Header:     res.Header,
   }
   fmt.Println("Abort index response:", response.String())
}

數據查詢

純向量數據查詢

純向量數據的查詢可以通過knn結構實現。以查詢索引vector_test為例:

func knnSearch() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  }
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer searchResponse.Body.Close()
 	// fmt.Println("knn result ", searchResponse.String())
        if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err ", err)
		return
	}
	// 打印每個文檔的 ID 和得分
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

返回指定字段

如果需要在查詢時返回指定字段,可以指定 "_source": ["field1", "field2"] 或使用"_source": true 返回非向量的全部字段。以查詢索引vector_test為例,使用方法如下:

func knnSearchHnswWithSource() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
      "_source": ["field1"],
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  }
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer searchResponse.Body.Close()
	// fmt.Println("knn result ", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err ", err)
		return
	}

	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f, Field1 %d \n", hit.ID, hit.Score, hit.Source.Field1)
	}
}

返回結果如下:

單擊展開返回結果

{
  "took" : 35,
  "timed_out" : false,
  "terminated_early" : false,
  "num_reduce_phases" : 0,
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 3,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "vector_test",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "field1" : 1
        }
      },
      {
        "_index" : "vector_test",
        "_id" : "1",
        "_score" : 0.25,
        "_source" : {
          "field1" : 1
        }
      },
      {
        "_index" : "vector_test",
        "_id" : "3",
        "_score" : 0.14285715,
        "_source" : {
          "field1" : 2
        }
      }
    ]
  }
}

hsnw算法查詢

以查詢hsnw索引vector_test為例:

func knnSearchHnsw() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  },
     "ext": {"lvector": {"ef_search": "100"}}
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer searchResponse.Body.Close()
	// fmt.Println("Knn result ", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

ivfpq算法查詢

以查詢ivfpq索引vector_ivfpq_test為例:

func knnSearchIvfpq() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  },
      "ext": {"lvector": {"nprobe": "60", "reorder_factor": "5"}}
	}`, vectorField, "[2.21, 3.31, 4.41]")

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err", err)
		return
	}
	defer searchResponse.Body.Close()
	// fmt.Println("Knn result", searchResponse.String())
        if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

稀疏向量查詢

以查詢稀疏向量索引vector_sparse_test為例:

func knnSearchSparseVector() {
	indexName := "vector_sparse_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10
		  }
		}
	  }
	}`, vectorField, randomSparseVector())

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err ", err)
		return
	}
	defer searchResponse.Body.Close()
	// fmt.Println("Knn result ", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err ", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

融合查詢

向量列的查詢可與普通列的查詢條件結合,并返回綜合的查詢結果。在實際業務使用時, Post_Filter近似查詢通常能獲取更相似的檢索結果。

Pre-Filter近似查詢

通過在knn查詢結構內部添加過濾器filter,并指定filter_type參數的值為pre_filter,可實現先過濾結構化數據,再查詢向量數據。

說明

目前結構化過濾數據的上限為10,000條。

以查詢索引vector_test為例:

func preFilterKnnSearch() {
	indexName := "vector_test"
	vectorField := "vector1"
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": 10,
			"filter": {
			  "range": {
				"field1": {
				  "gte": 1
				}
			  }
			}
		  }
		}
	  },
     "ext": {"lvector": {"filter_type": "pre_filter"}}
	}`, vectorField, randomDenseVector(3))

	content := strings.NewReader(knnString)

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err", err)
		return
	}
	defer searchResponse.Body.Close()
	// fmt.Println("Knn result", searchResponse.String())
        if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err", err)
		return
	}
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

Post-Filter近似查詢

通過在knn查詢結構內部添加過濾器filter,并指定filter_type參數的值為post_filter,可實現先查詢向量數據,再過濾結構化數據。

說明

在使用Post_Filter近似查詢時,可以適當將k的值設置大一些,以便獲取更多的向量數據再進行過濾。

以查詢索引vector_ivfpq_test為例:

func postFilterKnnSearch() {
	indexName := "vector_ivfpq_test"
	vectorField := "vector1"
	knnTopk := 1000
	knnString := fmt.Sprintf(`{
	  "size": 10,
	  "query": {
		"knn": {
		  "%s": {
			"vector": %s,
			"k": %d,
	        "filter": {
			  "range": {
				"field1": {
				  "gte": 0
				}
			  }
			}
		  }
		}
	  },
      "ext": {"lvector": {"filter_type": "post_filter", "nprobe": "100", "reorder_factor": "1"}}
	}`, vectorField, "[2.21, 3.31, 4.41]", knnTopk)

	content := strings.NewReader(knnString)
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	search := opensearchapi.SearchRequest{
		Index: []string{indexName},
		Body:  content,
	}
	searchResponse, err := search.Do(ctx, client)
	if err != nil {
		fmt.Println("Knn err %s", err)
		return
	}
	defer searchResponse.Body.Close()
	// fmt.Println("Knn result %s", searchResponse.String())
	if searchResponse.IsError() {
		handlerCommonError(searchResponse)
		return
	}
	var responseBody KnnResponse
	if err := json.NewDecoder(searchResponse.Body).Decode(&responseBody); err != nil {
		fmt.Println("Knn result Decode err %s", err)
		return
	}
	// 打印每個文檔的 ID 和得分
	for _, hit := range responseBody.Hits.Hits {
		fmt.Printf("ID: %s, Score: %f\n", hit.ID, hit.Score)
	}
}

常規用法

常規用法提供索引基礎的查詢、刪除等使用方法。

  • 查詢所有索引及其數據量。

    func catIndices() {
    	req := opensearchapi.CatIndicesRequest{}
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	res, err := req.Do(ctx, client)
    	if err != nil {
    		fmt.Println("CatIndices err ", err)
    		return
    	}
    	defer res.Body.Close()
    	fmt.Println("CatIndices  response:", res.String())
    }

    返回結果:

    health status index        uuid        pri rep docs.count docs.deleted store.size pri.store.size
    green  open   vector_test  vector_test 2   0          2            0      6.8kb          6.8kb
  • 查詢指定索引的數據量。以查詢索引vector_ivfpq_test為例:

    func getIndexCount() {
    	indexName := "vector_ivfpq_test"
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	req := opensearchapi.CountRequest{
    		Index: []string{indexName},
    	}
    	res, err := req.Do(ctx, client)
    	if err != nil {
    		fmt.Println("Count err %s", err)
    		return
    	}
    	defer res.Body.Close()
    	fmt.Println("Count result %s", res.String())
    	
    	type CountResponse struct {
    		Count int `json:"count"`
    	}
    	var countResponse CountResponse
    	if err := json.NewDecoder(res.Body).Decode(&countResponse); err != nil {
    		fmt.Printf("Error decoding response: %s\n", err)
    		return
    	}
    	fmt.Printf("Count result: %d\n", countResponse.Count)
    }

    返回結果:

    {
      "count" : 2,
      "_shards" : {
        "total" : 2,
        "successful" : 2,
        "skipped" : 0,
        "failed" : 0
      }
    }
  • 查看索引創建信息。以查詢索引vector_test為例:

    func getIndex() {
    	indexName := "vector_test"
    	req := opensearchapi.IndicesGetRequest{
    		Index: []string{indexName},
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	res, err := req.Do(ctx, client)
    	if err != nil {
    		fmt.Println("DeleteByQuery err ", err)
    		return
    	}
    	defer res.Body.Close()
    	if res.IsError() {
    		fmt.Printf("Error: %s\n", res.String())
    		return
    	}
    	fmt.Println("GetIndex  response:", res.String())
    	var indexData map[string]interface{}
    	if err := json.NewDecoder(res.Body).Decode(&indexData); err != nil {
    		fmt.Println("Error parsing the response body:", err)
    		return
    	}
    
    	// 如獲取 space_type
    	if properties, ok := indexData[indexName].(map[string]interface{})["mappings"].(map[string]interface{})["properties"].(map[string]interface{})["vector1"].(map[string]interface{})["method"].(map[string]interface{})["space_type"]; ok {
    		if spaceType, ok := properties.(string); ok {
    			fmt.Printf("space_type: %s\n", spaceType)
    		}
    	}
    }

    返回結果:

    單擊展開返回結果

    {
      "vector_test" : {
        "aliases" : { },
        "mappings" : {
          "_source" : {
            "excludes" : [
              "vector1"
            ]
          },
          "properties" : {
            "field1" : {
              "type" : "long"
            },
            "vector1" : {
              "type" : "knn_vector",
              "dimension" : 3,
              "data_type" : "float",
              "method" : {
                "engine" : "lvector",
                "space_type" : "l2",
                "name" : "hnsw",
                "parameters" : {
                  "ef_construction" : 200,
                  "m" : 24
                }
              }
            }
          }
        },
        "settings" : {
          "index" : {
            "search" : {
              "slowlog" : {
                "level" : "DEBUG",
                "threshold" : {
                  "fetch" : {
                    "warn" : "1s",
                    "trace" : "200ms",
                    "debug" : "500ms",
                    "info" : "800ms"
                  },
                  "query" : {
                    "warn" : "10s",
                    "trace" : "500ms",
                    "debug" : "1s",
                    "info" : "5s"
                  }
                }
              }
            },
            "indexing" : {
              "slowlog" : {
                "level" : "DEBUG",
                "threshold" : {
                  "index" : {
                    "warn" : "10s",
                    "trace" : "500ms",
                    "debug" : "2s",
                    "info" : "5s"
                  }
                }
              }
            },
            "number_of_shards" : "2",
            "provided_name" : "vector_test",
            "knn" : "true",
            "creation_date" : "1727169417350",
            "number_of_replicas" : "0",
            "uuid" : "vector_test",
            "version" : {
              "created" : "136287927"
            }
          }
        }
      }
    }
  • 刪除整個索引。以刪除索引vector_ivfpq_test為例:

    func deleteIndex() {
    	indexName := "vector_ivfpq_test"
    	req := opensearchapi.IndicesDeleteRequest{
    		Index: []string{indexName},
    	}
    	// 刪除索引需要設置較長的超時值
    	ctx, cancel := context.WithTimeout(context.Background(), 120*2*time.Second)
    	defer cancel()
    	rep, err := req.Do(ctx, client)
    	if err != nil {
    		fmt.Println("Delete err %s", err)
    	}
    	defer rep.Body.Close()
    	fmt.Println("deleteIndex  response:", rep.String())
    }
  • 通過查詢刪除符合查詢條件的指定數據。

    func deleteByQuery() {
    	indexName := "vector_test"
    	key := "field1"
    	value := 1
    	queryStr := fmt.Sprintf(`{
    	  "query": {
    		"term": {
    		  "%s": %d
    		}
    	  }
    	}`, key, value)
    
    	content := strings.NewReader(queryStr)
    	req := opensearchapi.DeleteByQueryRequest{
    		Index: []string{indexName},
    		Body:  content,
    	}
    	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    	defer cancel()
    	res, err := req.Do(ctx, client)
    	
    	if err != nil {
    		fmt.Println("DeleteByQuery err %v", err)
    		return
    	}
    	defer res.Body.Close()
    	// fmt.Println("DeleteByQuery  response:", res.String())
    }