日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

基于RAG搭建知識庫在線問答

針對知識庫在線問答場景,AI搜索開放平臺提供完整的RAG開發鏈路搭建方法,整體鏈路包含數據預處理、檢索服務以及問答總結生成三大模塊。AI搜索開放平臺將各模塊可用的算法服務組件化,可靈活選擇各模塊使用的服務,如文檔解析、排序、問答總結服務等,快速生成開發代碼。AI搜索開放平臺以API形式透出服務,開發者只需將代碼下載到本地,根據本文操作步驟替換API_KEY、API調用地址、本地知識庫等信息,即可快速搭建基于RAG開發鏈路的知識庫問答應用。

技術原理

檢索增強生成RAG(Retrieval-Augmented Generation)結合了檢索技術和生成技術的人工智能方法,旨在提升模型生成內容的相關性、準確性和多樣性。處理生成任務時,RAG首先在大量外部數據或知識庫中檢索與輸入最相關的片段,然后將檢索到的信息與原始輸入一起輸入到大語言模型(LLM)中,作為提示或者上下文引導模型生成更精確和豐富的回答。這種方法允許模型在生成響應時不僅依賴于其內部的參數和訓練數據,還可以使用外部最新或特定領域的信息提升回答準確性。

基于rag智能問答技術實現圖-流程圖.jpg

應用場景

知識庫在線問答常用于企業內部知識庫檢索與總結、垂直領域的在線問答等業務場景,基于客戶的專業知識庫文檔,通過RAG(檢索增強生成)技術和LLM(大語言模型) ,理解和響應復雜的自然語言查詢,幫助企業客戶通過自然語言快速從PDF、WORD、表格、圖片文檔中檢索到所需信息。

image

前提條件

  • 開通AI搜索開放平臺服務,詳情請參見開通服務

  • 獲取服務調用地址和身份鑒權信息,詳情請參見獲取服務接入地址獲取API-KEY

    AI搜索開放平臺支持通過公網和VPC地址調用服務,且可通過VPC實現跨地域調用服務。目前支持上海、杭州、深圳、北京、張家口、青島地域的用戶,通過VPC地址調用AI搜索開放平臺的服務。

    image

  • 創建阿里云Elasticsearch(ES)實例,要求ES 8.5及以上版本,詳情請參見創建阿里云Elasticsearch實例。通過公網或私網訪問阿里云ES實例時,需要將待訪問設備的IP地址加入實例的訪問白名單中,詳情請參見配置實例公網或私網訪問白名單

  • Python版本3.7及以上,在開發環境中引入Python包依賴aiohttp 3.8.6、elasticsearch 8.14。

RAG開發鏈路搭建

說明

為方便用戶使用,AI搜索開放平臺提供四種類型的開發框架:

  • Java SDK。

  • Python SDK。

  • 如果業務已經使用LangChain開發框架,在開發框架中選擇LangChain。

  • 如果業務已經使用LlamaIndex開發框架,在開發框架中選擇LlamaIndex。

步驟一:完成服務選型和代碼下載

根據知識庫和業務需要,選擇RAG鏈路中需要使用的算法服務以及開發框架,本文以Python SDK開發框架為例介紹如何搭建RAG鏈路。

  1. 登錄AI搜索開放平臺控制臺

  2. 選擇上海地域,切換到AI搜索開放平臺,切換到目標空間。

    說明
    • 目前僅支持在上海開通AI搜索開放平臺功能。

    • 支持杭州、深圳、北京、張家口、青島地域的用戶,通過VPC地址跨地域調用AI搜索開放平臺的服務。

  3. 在左側導航欄選擇場景中心,選擇RAG場景-知識庫在線問答右側的進入

    0726-rag控制臺截圖更新.png

  4. 根據服務信息結合業務特點,從下拉列表中選擇所需服務,服務詳情頁面可查看服務詳細信息。

    說明
    • 通過API調用RAG鏈路中的算法服務時,需要提供服務ID(service_id),如文檔內容解析服務的ID為ops-document-analyze-001。

    • 從服務列表中切換服務后,生成代碼中的service_id會同步更新。當代碼下載到本地環境后,您仍可以更改service_id,調用對應服務。

    環節

    服務說明

    文檔內容解析

    文檔內容解析服務(ops-document-analyze-001):提供通用文檔解析服務,支持從非結構化文檔(文本、表格、圖片等)中提取標題、分段等邏輯層級結構,以結構化格式輸出。

    圖片內容解析

    • 圖片內容理解服務(ops-image-analyze-vlm-001):可基于多模態大模型對圖片內容進行解析理解以及文字識別,解析后的文本可用于圖片檢索問答場景。

    • 圖片文本識別服務(ops-image-analyze-ocr-001):使用OCR能力進行圖片文字識別,解析后的文本可用于圖片檢索問答場景。

    文檔切片

    文檔切片服務(ops-document-split-001):提供通用文本切片服務,支持基于文檔段落、文本語義、指定規則,對HTML、Markdown、txt格式的結構化數據進行拆分,同時支持以富文本形式提取文檔中的代碼、圖片以及表格。

    文本向量化

    • OpenSearch文本向量化服務-001(ops-text-embedding-001):提供多語言(40+)文本向量化服務,輸入文本最大長度300,輸出向量維度1536維。

    • OpenSearch通用文本向量化服務-002(ops-text-embedding-002):提供多語言(100+)文本向量化服務,輸入文本最大長度8192,輸出向量維度1024維。

    • OpenSearch文本向量化服務-中文-001(ops-text-embedding-zh-001):提供中文文本向量化服務,輸入文本最大長度1024,輸出向量維度768維。

    • OpenSearch文本向量化服務-英文-001(ops-text-embedding-en-001):提供英文文本向量化服務,輸入文本最大長度512,輸出向量維度768維。

    文本稀疏向量化

    提供將文本數據轉化為稀疏向量形式表達的服務,稀疏向量存儲空間更小,常用于表達關鍵詞和詞頻信息,可與稠密向量搭配進行混合檢索,提升檢索效果。

    OpenSearch文本稀疏向量化服務(ops-text-sparse-embedding-001):提供多語言(100+)文本向量化服務,輸入文本最大長度8192。

    查詢分析

    查詢分析服務001(ops-query-analyze-001)提供通用的Query分析服務,可基于大語言模型對用戶輸入的Query進行意圖理解,以及相似問題擴展。

    搜索引擎

    • 阿里云Elasticsearch:基于開源Elasticsearch構建的全托管云服務,100%兼容開源功能的同時,支持開箱即用、按需付費。

      說明

      搜索引擎服務選擇阿里云Elasticsearch時,由于兼容性問題,文本稀疏向量化服務不可用,建議使用文本向量化相關服務。

    • OpenSearch-向量檢索版是阿里巴巴自主研發的大規模分布式向量檢索引擎,支持多種向量檢索算法,高精度下性能表現優異,能完成大規模高性價比的索引構建和檢索,同時,索引支持水平拓展與合并,支持索引流式構建、即增即查、數據實時動態更新。

      說明

      如您需要使用OpenSearch-向量檢索版引擎方案,可以在RAG鏈路中替換引擎配置及代碼。

    排序服務

    BGE重排模型(ops-bge-reranker-larger):通用文檔打分服務,支持根據query與文檔內容的相關性,按分數由高到低對文檔排序,并輸出打分結果。

    大模型

    • OpenSearch-通義千問-Turbo(ops-qwen-turbo):以qwen-turbo大規模語言模型為模型底座,進行有監督的模型微調,強化檢索增強,減少有害性。

    • 通義千問-Turbo(qwen-turbo):通義千問超大規模語言模型,支持中文、英文等不同語言輸入,詳情請參見通義千問大語言模型介紹

    • 通義千問-Plus(qwen-plus):通義千問超大規模語言模型的增強版,支持中文、英文等不同語言輸入,詳情請參見通義千問大語言模型介紹

    • 通義千問-Max(qwen-max):通義千問千億級別超大規模語言模型,支持中文、英文等不同語言輸入,詳情請參見通義千問大語言模型介紹

  5. 完成服務選型后,單擊配置完成,進入代碼查詢查看和下載代碼。

    按照應用調用RAG鏈路時的運行流程,將代碼分為離線文檔處理和在線用戶問答處理兩部分:

    流程

    作用

    說明

    離線文檔處理

    負責文檔處理,包含文檔解析、圖片提取、切片、向量化以及將文檔處理結果寫入ES索引。

    使用主函數document_pipeline_execute完成以下流程,可通過文檔URL或Base64編碼輸入待處理文檔。

    1. 文檔解析,服務調用請參見文檔解析API

      • 調用異步文檔解析接口,從文檔URL地址中提取文檔內容,或者從Base64編碼文件中進行解碼。

      • 通過create_async_extraction_task函數構造解析任務,通過poll_task_result函數輪詢任務完成狀態。

    2. 圖片提取,服務調用請參見圖片內容提取API

      • 調用異步圖片解析接口,從圖片URL地址中提取圖片內容,或者從Base64編碼文件中進行解碼。

      • 通過create_image_analyze_task函數創建圖片解析任務,通過get_image_analyze_task_status函數獲取圖片解析任務狀態。

    3. 文檔切片,服務調用請參見文檔切片API

      • 調用文檔切片接口,將解析后的文檔按指定策略切片。

      • 通過document_split函數進行文檔切片,包含文檔切片和富文本內容解析兩部分。

    4. 文本向量化,服務調用請參見文本向量化API

      • 調用文本稠密向量表示接口,將切片后的文本向量化。

      • 通過text_embedding函數計算每個切片的embedding向量。

    5. 寫入ES,服務調用請參見使用Elasticsearch的向量近鄰檢索(kNN)功能

      • 創建ES索引配置,包含指定向量字段embedding和文檔內容字段content。

        重要

        創建ES索引時,系統會刪除已有同名索引。為避免誤刪同名索引,請更改代碼中的索引名稱。

      • 通過helpers.async_bulk函數將向量化結果批量寫入ES索引。

    在線問答處理

    負責處理用戶在線查詢,包含生成查詢向量、查詢分析、檢索相關文檔切片、排序檢索結果以及根據檢索結果生成回答。

    使用主函數query_pipeline_execute完成以下流程,對用戶查詢進行處理并輸出回答。

    1. query向量化,服務調用請參見文本向量化API

      • 調用文本稠密向量表示接口,將用戶的查詢轉換為向量。

      • 通過text_embedding函數生成查詢向量。

    2. 調用查詢分析服務,請參見查詢分析API

      調用查詢分析接口,通過分析歷史消息識別用戶提問意圖及生成相似問題。

    3. 搜索embedding切片,服務調用請參見使用Elasticsearch的向量近鄰檢索(kNN)功能

      • 使用ES檢索索引中與查詢向量相近的文檔切片。

      • 通過AsyncElasticsearch的search接口,結合KNN查詢進行相似度檢索。

    4. 調用排序服務,請參見排序API

      • 調用排序服務接口,對檢索到的相關切片打分排序。

      • 通過documents_ranking函數,根據用戶查詢對文檔內容打分排序。

    5. 調用大模型生成答案,服務調用請參見答案生成API

      調用大模型服務,使用檢索結果和用戶查詢通過llm_call函數生成最終答案。

    分別選擇代碼查詢下的文檔處理流程在線問答流程,單擊復制代碼或者下載文件,將代碼下載到本地。

步驟二:本地環境適配和測試RAG開發鏈路

將代碼分別下載到本地的兩個文件后,例如online.py和offline.py,需要配置代碼中的關鍵參數。

類別

參數

說明

AI搜索開放平臺

api_key

API調用密鑰,獲取方式請參見管理API Key

aisearch_endpoint

API調用地址,獲取方式請參見獲取服務接入地址

說明

注意需要去掉“http://”。

支持通過公網和VPC兩種方式調用API。

workspace_name

AI搜索開放平臺

service_id

服務ID,為操作方便,可以分別在離線文檔處理(offline.py)和在線問答處理代碼(online.py)中,通過service_id_config配置各項服務以及ID。

image

ES搜索引擎

es_host

Elasticsearch(ES)實例訪問地址,通過公網或私網訪問阿里云ES實例時,需要先將待訪問設備的IP地址加入實例的訪問白名單中,詳情參見配置實例公網或私網訪問白名單

es_auth

訪問Elasticsearch實例時的賬號和密碼,賬號為elastic,密碼為您創建實例時設置的密碼。如果忘記密碼可重置,具體操作請參見重置實例訪問密碼

其他參數

如使用示例數據則無需修改

完成參數配置后即可在Python 3.7及以上版本環境中,先后運行offline.py離線文檔處理文件和online.py在線問答處理文件測試運行結果是否正確。

如知識庫文檔為AI搜索開放平臺介紹,對文檔提問:AI搜索開放平臺可以做什么?

運行結果如下:

  • 離線文檔處理結果

    raglixian.jpg.png

  • 在線問答處理結果

    ragzaxian.png

  • 查看源碼文件

    offine.py
    # RAG離線鏈路-ElasticSearch引擎
    
    # 環境需求:
    # Python版本:3.7及以上
    # ES集群版本:8.9及以上:如果是阿里云ES需要提前開通并設置訪問ip白名單 http://bestwisewords.com/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster
    
    # 包需求:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # AI搜索開放平臺配置
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    api_key = "OS-xxx"
    workspace_name = "default"
    service_id_config = {"extract": "ops-document-analyze-001",
                         "split": "ops-document-split-001",
                         "text_embedding": "ops-text-embedding-001",
                         "text_sparse_embedding": "ops-text-sparse-embedding-001",
                         "image_analyze": "ops-image-analyze-ocr-001"}
    
    # ES配置
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    
    # 輸入文檔url,示例文檔為AI搜索開放平臺產品說明文檔
    document_url = "http://bestwisewords.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z"
    
    import asyncio
    from typing import List
    from elasticsearch import AsyncElasticsearch
    from elasticsearch import helpers
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
        CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
        GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
        GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
        CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskRequest, CreateImageAnalyzeTaskResponse, \
        GetImageAnalyzeTaskStatusRequest, GetImageAnalyzeTaskStatusResponse
    
    
    async def poll_task_result(ops_client, task_id, service_id, interval=5):
        while True:
            request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
            response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(interval)
            elif status == "SUCCESS":
                return response
            else:
                raise Exception("document analyze task failed")
    
    
    def is_analyzable_url(url:str):
        if not url:
            return False
        image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
        return url.lower().endswith(tuple(image_extensions))
    
    
    async def image_analyze(ops_client, url):
        try:
            print("image analyze :" + url)
            if url.startswith("http://"):
                url = "https:" + url
            if not is_analyzable_url(url):
                print(url + " is not analyzable.")
                return url
            image_analyze_service_id = service_id_config["image_analyze"]
            document = CreateImageAnalyzeTaskRequestDocument(
                url=url,
            )
            request = CreateImageAnalyzeTaskRequest(document=document)
            response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
            task_id = response.body.result.task_id
            while True:
                request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
                response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
                status = response.body.result.status
                if status == "PENDING":
                    await asyncio.sleep(5)
                elif status == "SUCCESS":
                    return url + response.body.result.data.content
                else:
                    print("image analyze error: " + response.body.result.error)
                    return url
        except Exception as e:
            print(f"image analyze Exception : {e}")
    
    
    def chunk_list(lst, chunk_size):
        for i in range(0, len(lst), chunk_size):
            yield lst[i:i + chunk_size]
    
    
    async def write_to_es(doc_list):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,  # 不使用SSL證書校驗
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
    
        # 刪除已有索引
        if await es.indices.exists(index=index_name):
            await es.indices.delete(index=index_name)
    
        # 創建向量索引, 指定embedding字段為dense_vector, content字段為text, source字段為keyword
        index_mappings = {
            "mappings": {
                "properties": {
                    "emb": {
                        "type": "dense_vector",
                        "index": True,
                        "similarity": "cosine",
                        "dims": 1536  # 根據embedding模型輸出維度修改
                    },
                    "content": {
                        "type": "text"
                    },
                    "source_doc": {
                        "type": "keyword"
                    }
                }
            }
        }
        await es.indices.create(index=index_name, body=index_mappings)
    
        # 上傳embedding結果到上一步創建好的索引
        actions = []
        for i, doc in enumerate(doc_list):
            action = {
                "_index": index_name,
                "_id": doc['id'],
                "_source": {
                    "emb": doc['embedding'],
                    "content": doc['content'],
                    "source_doc": document_url
                }
            }
            actions.append(action)
    
        try:
            await helpers.async_bulk(es, actions)
        except Exception as e:
            for error in e.errors:
                print(error)
    
        # 確認上傳成功
        await asyncio.sleep(2)
        query = {
            "query": {
                "ids": {
                    "values": [doc_list[0]["id"]]
                }
            }
        }
        res = await es.search(index=index_name, body=query)
        if len(res['hits']['hits']) > 0:
            print("ES write success")
        await es.close()
    
    
    async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
    
        # 生成opensearch開發平臺client
        config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
        ops_client = Client(config=config)
    
        # Step 1: 文檔解析
        document_analyze_request = CreateDocumentAnalyzeTaskRequest(
            document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,
                                                              file_name=file_name, file_type='html'))
        document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,
                                                                                        service_id=service_id_config[
                                                                                            "extract"],
                                                                                        request=document_analyze_request)
        print("document-analyze task_id:" + document_analyze_response.body.result.task_id)
        extraction_result = await poll_task_result(ops_client, document_analyze_response.body.result.task_id,
                                                   service_id_config["extract"])
        print("document-analyze done")
        document_content = extraction_result.body.result.data.content
        content_type = extraction_result.body.result.data.content_type
        # Step 2: 文檔切片
        document_split_request = GetDocumentSplitRequest(
            GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
        document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                          document_split_request)
        print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
              + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))
    
        # Step 3: 文本向量化
        # 提取切片結果。圖片切片會通過圖片解析服務提取出文本內容
        doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                     document_split_result.body.result.chunks]
                    + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                    + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client, chunk.content)} for chunk in
                       document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                    )
    
        chunk_size = 32  # 一次最多允許計算32個embedding
        all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                 GetTextEmbeddingRequest(chunk))
            all_text_embeddings.extend(response.body.result.embeddings)
    
        all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
        for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
            response = await ops_client.get_text_sparse_embedding_async(workspace_name,
                                                                        service_id_config["text_sparse_embedding"],
                                                                        GetTextSparseEmbeddingRequest(chunk,
                                                                                                      input_type="document",
                                                                                                      return_token=True))
            all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
    
        for i in range(len(doc_list)):
            doc_list[i]["embedding"] = all_text_embeddings[i].embedding
            doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding
    
        print("text-embedding done")
    
        # Step 4: 寫入ElasticSearch存儲引擎
        await write_to_es(doc_list)
    
    
    if __name__ == "__main__":
        # 運行異步任務
        #    import nest_asyncio # 如果在Jupyter notebook中運行,反注釋這兩行
        #    nest_asyncio.apply() # 如果在Jupyter notebook中運行,反注釋這兩行
        asyncio.run(document_pipeline_execute(document_url))
        # asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) #另外一種調用方式
    
    online.py
    # RAG在線鏈路-ElasticSearch引擎
    
    # 環境需求:
    # Python版本:3.7及以上
    # ES集群版本:8.9以上(如果是阿里云ES 需要提前開通并設置訪問ip白名單 http://bestwisewords.com/zh/es/user-guide/configure-a-public-or-private-ip-address-whitelist-for-an-elasticsearch-cluster)
    
    # 包需求:
    # pip install alibabacloud_searchplat20240529
    # pip install elasticsearch
    
    # OpenSearch搜索開發工作臺配置
    api_key = "OS-xxx"
    aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
    workspace_name = "default"
    service_id_config = {
        "rank": "ops-bge-reranker-larger",
        "text_embedding": "ops-text-embedding-001",
        "text_sparse_embedding": "ops-text-sparse-embedding-001",
        "llm": "ops-qwen-turbo",
        "query_analyze": "ops-query-analyze-001"
    
    }
    
    # ES配置
    es_host = 'http://es-cn-xxx.public.elasticsearch.aliyuncs.com:9200'
    es_auth = ('elastic', 'xxx')
    
    # 用戶query:
    user_query = "OpenSearch搜索開發平臺可以做什么?"
    
    
    import asyncio
    from elasticsearch import AsyncElasticsearch
    from alibabacloud_tea_openapi.models import Config
    from alibabacloud_searchplat20240529.client import Client
    from alibabacloud_searchplat20240529.models import GetTextEmbeddingRequest,  \
        GetDocumentRankRequest, GetTextGenerationRequest, GetTextGenerationRequestMessages, \
        GetQueryAnalysisRequest
    
    # 生成opensearch開發平臺client
    config = Config(bearer_token=api_key, endpoint=aisearch_endpoint, protocol="http")
    ops_client = Client(config=config)
    
    
    async def es_retrieve(query):
        es = AsyncElasticsearch(
            [es_host],
            basic_auth=es_auth,
            verify_certs=False,
            request_timeout=30,
            max_retries=10,
            retry_on_timeout=True
        )
        index_name = 'dense_vertex_index'
        # query向量化
        query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                     GetTextEmbeddingRequest(input=[query],
                                                                                             input_type="query"))
        query_emb = query_emb_result.body.result.embeddings[0].embedding
        query = {
            "field": "emb",
            "query_vector": query_emb,
            "k": 5,  # 返回文檔切片數量
            "num_candidates": 100  # HNSW搜索參數efsearch
        }
    
        res = await es.search(index=index_name, knn=query)
        search_results = [item['_source']['content'] for item in res['hits']['hits']]
        await es.close()
        return search_results
    
    
    # 在線問答流水線,輸入是用戶問題
    async def query_pipeline_execute():
    
        # Step 1: query分析
        query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config['query_analyze'],
                                                               GetQueryAnalysisRequest(query=user_query))
        print("query analysis rewrite result:" + query_analyze_response.body.result.query)
    
        # Step 2: 召回文檔
        all_query_results = []
        user_query_results = await es_retrieve(user_query)
        all_query_results.extend(user_query_results)
        rewrite_query_results = await es_retrieve(query_analyze_response.body.result.query)
        all_query_results.extend(rewrite_query_results)
        for extend_query in query_analyze_response.body.result.queries:
            extend_query_result = await es_retrieve(extend_query)
            all_query_results.extend(extend_query_result)
        # 對所有召回結果進行去重
        remove_duplicate_results = list(set(all_query_results))
    
        # Step 3: 對召回文檔進行重排序
        rerank_top_k = 8
        score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"],GetDocumentRankRequest(remove_duplicate_results, user_query))
        rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]]
    
        # Step 4: 調用大模型回答問題
        docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results])
        messages = [
            GetTextGenerationRequestMessages(role="system", content="You are a helpful assistant."),
            GetTextGenerationRequestMessages(role="user",
                                             content=f"""已知信息包含多個獨立文檔,每個文檔在<article>和</article>之間,已知信息如下:\n'''{docs}'''
                                             \n\n根據上述已知信息,詳細且有條理地回答用戶的問題。確保答案充分回答了問題并且正確使用了已知信息。如果信息不足以回答問題,請說“根據已知信息無法回答該問題”。
                                             不要使用不在已知信息中的內容生成答案,確保答案中每一個陳述在上述已知信息中有相應內容支撐。答案請使用中文。
                                             \n問題是:'''{user_query}'''""""")
        ]
        response = await ops_client.get_text_generation_async(workspace_name, service_id_config["llm"],
                                                              GetTextGenerationRequest(messages=messages))
        print("大模型最終回答: ", response.body.result.text)
    
    
    if __name__ == "__main__":
        #    import nest_asyncio # 如果在Jupyter notebook中運行,反注釋這兩行
        #    nest_asyncio.apply() # 如果在Jupyter notebook中運行,反注釋這兩行
        asyncio.run(query_pipeline_execute())
    

常見問題

代碼運行期間,由于資源未及時釋放可能出現Unclosed connector相關提示,無需處理。