RAG Knowledge Base Online Q&A
By combining RAG (Retrieval-Augmented Generation) with LLMs (large language models), enterprise customers can quickly retrieve information from large volumes of documents. This topic describes the operation procedure.
Background
Document Q&A based on RAG (Retrieval-Augmented Generation) is commonly used in scenarios such as searching and summarizing internal enterprise knowledge bases and online Q&A in vertical domains. Built on a customer's domain knowledge base, RAG combined with an LLM understands and responds to complex natural-language queries, helping enterprise customers quickly retrieve the information they need from PDF, Word, table, and image documents using natural language.
Prerequisites
A DLF 2.0 data catalog has been created. If not, see Create a data catalog.
Note: If you are a RAM user, you must be granted the required resource permissions before performing data operations. For details, see Authorization management.
The AI Search Open Platform has been activated and an API key has been obtained.
An OpenSearch Vector Search Edition instance has been activated, and its engine version must be 1.3.0 or later.
Procedure
Step 1: Load the notebook example
Find the corresponding example card and click Load Example on the card.
Select the workspace and instance to load into, then click Confirm to enter the DataWorks data development page.
Step 2: Prepare the development environment
The cell below contains seven variables. Replace them with actual values before running to ensure successful execution.
${your-search-platform-public-endpoint}: the endpoint of the AI Search Open Platform. You can view it on the AI Search Open Platform.
${your-search-platform-api-key}: the API key. You can view it on the AI Search Open Platform.
You can view the following information in the OpenSearch console:
${your-opensearch-public_endpoint}: the endpoint of the OpenSearch instance.
${your-opensearch-instance_id}: the ID of the OpenSearch instance.
${your-opensearch-table-name}: the name of the OpenSearch table.
${your-opensearch-instance-username}: the OpenSearch username.
${your-opensearch-instance-password}: the OpenSearch password.
# AI Search Open Platform configuration
search_plat_host = "${your-search-platform-public-endpoint}"
search_plat_api_key = "${your-search-platform-api-key}"
# OpenSearch Vector Search Edition configuration
os_vectorstore_host = "${your-opensearch-public_endpoint}"
os_vectorstore_instance_id = "${your-opensearch-instance_id}"
os_vectorstore_table_name = "${your-opensearch-table-name}"
os_vectorstore_user_name = "${your-opensearch-instance-username}"
os_vectorstore_user_password = "${your-opensearch-instance-password}"
# Input document URL; the sample document is the OpenSearch product introduction
document_url = "http://bestwisewords.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform"
The cell below contains seven variables. Replace them with actual values before running to ensure successful execution.
You can view the following information in the Data Lake Formation (DLF) console:
${dlf_region}: the region where DLF resides. Select it based on your region. For supported regions, see Regions and endpoints.
${dlf_catalog_id}: the ID of the DLF data catalog.
${dlf_catalog_name}: the name of the DLF data catalog.
${dlf_database_name}: the name of the DLF database.
${dlf_table_name}: the name of the DLF table.
${dlf_catalog_accessKeyId}: the AccessKey ID used to access the DLF service. For how to obtain it, see Create an AccessKey.
${dlf_catalog_accessKeySecret}: the AccessKey secret used to access the DLF service. For how to obtain it, see Create an AccessKey.
# Fill in the values of the global parameters below before running the code
dlf_region = "${dlf_region}"  # Replace [dlf_region] with the region ID where your DLF resides, for example cn-beijing
dlf_catalog_id = "${dlf_catalog_id}"  # Replace [dlf_catalog_id] with the ID of your DLF data catalog
dlf_catalog_name = "myfirstcatalog"  # Replace [dlf_catalog_name] with the name of your target DLF catalog; if OpenLake has been activated in one step, "myfirstcatalog" is recommended
dlf_database_name = "opensearch_db"  # Replace [dlf_database_name] with the name of your target DLF database; if OpenLake has been activated in one step, "opensearch_db" is recommended
dlf_table_name = "rag"  # Replace [dlf_table_name] with a custom table name; "rag" is recommended
dlf_catalog_accessKeyId = "${dlf_catalog_accessKeyId}"  # Replace [dlf_catalog_accessKeyId] with the AccessKey ID used to access the DLF service
dlf_catalog_accessKeySecret = "${dlf_catalog_accessKeySecret}"  # Replace [dlf_catalog_accessKeySecret] with the AccessKey secret used to access the DLF service
# Endpoint of the DLF service, for example dlfnext-vpc.cn-hangzhou.aliyuncs.com when dlf_region is cn-hangzhou
dlf_endpoint = f"dlfnext-vpc.{dlf_region}.aliyuncs.com"
Python 3.7 or later must be installed in advance. The environment dependencies for the AI Search Open Platform and OpenSearch Vector Search Edition are as follows.
! pip install -q alibabacloud_searchplat20240529 alibabacloud_ha3engine_vector openai
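If you are unsure which interpreter the notebook kernel uses, the quick check below (a minimal sketch, not part of the original notebook) confirms the Python 3.7+ requirement before the dependencies are installed.
import sys

# Optional sanity check: the notebook requires Python 3.7 or later.
if sys.version_info < (3, 7):
    raise RuntimeError(f"Python 3.7+ is required, found {sys.version.split()[0]}")
print("Python version OK:", sys.version.split()[0])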
Step 3: Initialize the development environment
This is the common code shared by offline document processing and online document Q&A. It imports the dependencies, initializes the AI Search Open Platform client, and sets up the search engine configuration.
import asyncio
import json
from operator import attrgetter
from typing import List
from alibabacloud_ha3engine_vector import models, client
from alibabacloud_ha3engine_vector.models import QueryRequest, SparseData
from Tea.exceptions import TeaException, RetryError
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, GetDocumentSplitRequestDocument, \
GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, GetTextSparseEmbeddingRequest, \
GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddingsEmbedding, \
GetImageAnalyzeTaskStatusResponse, CreateImageAnalyzeTaskRequest, GetImageAnalyzeTaskStatusRequest, \
CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskResponse,GetDocumentRankRequest, \
GetTextGenerationRequest, GetTextGenerationRequestMessages, GetQueryAnalysisRequest
from openai import OpenAI
workspace_name = "default"
service_id_config = {"document_analyze": "ops-document-analyze-001",
                     "split": "ops-document-split-001",
                     "image_analyze": "ops-image-analyze-ocr-001",
                     "rank": "ops-bge-reranker-larger",
                     "text_embedding": "ops-text-embedding-002",
                     "text_sparse_embedding": "ops-text-sparse-embedding-001",
                     "llm": "ops-qwen-turbo",
                     "query_analyze": "ops-query-analyze-001"}
# Create the AI Search Open Platform client
config = Config(bearer_token=search_plat_api_key, endpoint=search_plat_host, protocol="http", read_timeout=30000)
ops_client = Client(config=config)
openAIclient = OpenAI(
    api_key=search_plat_api_key,
    base_url="http://" + search_plat_host + "/compatible-mode/v1/"
)
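Optionally, you can verify that the endpoint and API key are valid with a quick embedding call before moving on. This is a minimal sketch (not part of the original notebook) that reuses ops_client and the service IDs configured above; nest_asyncio is assumed to be available, as in the later cells.
import asyncio
import nest_asyncio
nest_asyncio.apply()

# Embed a single test sentence to confirm the client configuration works.
_test = asyncio.run(ops_client.get_text_embedding_async(
    workspace_name,
    service_id_config["text_embedding"],
    GetTextEmbeddingRequest(input=["connectivity check"], input_type="query")))
print("embedding dimension:", len(_test.body.result.embeddings[0].embedding))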
Step 4: Process documents offline and write them to the Paimon table
Offline document processing uses services on the AI Search Open Platform such as document understanding, splitting, and vectorization to parse and extract document data, split documents into chunks, embed the chunks, and finally push the results into a Paimon table. Once new data is written to the Paimon table, OpenSearch detects it in real time and builds a real-time index, enabling second-level query latency after data is written.
# Initialize the Paimon table information
import os
import pandas as pd
import pyarrow as pa
from paimon_python_java import Catalog
from paimon_python_api import Schema
# create catalog
catalog_options = {
    'metastore': 'dlf-paimon',
    'dlf.endpoint': dlf_endpoint,
    'dlf.region': dlf_region,
    'dlf.catalog.id': dlf_catalog_id,
    'dlf.catalog.accessKeyId': dlf_catalog_accessKeyId,
    'dlf.catalog.accessKeySecret': dlf_catalog_accessKeySecret,
}
catalog = Catalog.create(catalog_options)
pa_schema = pa.schema([
    ('id', pa.string()),
    ('title', pa.string()),
    ('text', pa.string()),
    ('url', pa.string()),
    ('embedding', pa.string()),
    ('token_id', pa.string()),
    ('token_weight', pa.string())
])
schema = Schema(pa_schema=pa_schema)
# Create the Paimon table
catalog.create_table(f'{dlf_database_name}.{dlf_table_name}', schema, True)
import os
import pandas as pd
import pyarrow as pa
async def write_to_paimon(doc_list):
    # get table
    table = catalog.get_table(f"{dlf_database_name}.{dlf_table_name}")
    # write data
    # 1. Create table write and commit
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    # 2. Convert the data source to an Arrow RecordBatch
    df = pd.DataFrame(doc_list)
    record_batch = pa.RecordBatch.from_pandas(df, pa_schema)
    # 3. Write data. You can write many times.
    table_write.write_arrow_batch(record_batch)
    # 4. Commit data. Once you commit, you cannot write more data.
    commit_messages = table_write.prepare_commit()
    table_commit.commit(commit_messages)
    # 5. Close resources.
    table_write.close()
    table_commit.close()
async def poll_doc_analyze_task_result(ops_client, task_id, service_id, interval=5):
    while True:
        request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
        response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
        status = response.body.result.status
        if status == "PENDING":
            await asyncio.sleep(interval)
        elif status == "SUCCESS":
            return response
        else:
            print("error: " + response.body.result.error)
            raise Exception("document analyze task failed")
def is_analyzable_url(url: str):
    if not url:
        return False
    image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
    return url.lower().endswith(tuple(image_extensions))
async def image_analyze(ops_client, url):
    try:
        print("image analyze :" + url)
        # Protocol-relative URLs (starting with "//") get an https scheme prepended
        if url.startswith("//"):
            url = "https:" + url
        if not is_analyzable_url(url):
            print(url + " is not analyzable.")
            return url
        image_analyze_service_id = service_id_config["image_analyze"]
        document = CreateImageAnalyzeTaskRequestDocument(
            url=url,
        )
        request = CreateImageAnalyzeTaskRequest(document=document)
        response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
        task_id = response.body.result.task_id
        while True:
            request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
            response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
            status = response.body.result.status
            if status == "PENDING":
                await asyncio.sleep(5)
            elif status == "SUCCESS":
                return url + response.body.result.data.content
            else:
                print("image analyze error: " + response.body.result.error)
                return url
    except Exception as e:
        print(f"image analyze Exception : {e}")
def chunk_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]
async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
    # Create the AI Search Open Platform client
    config = Config(bearer_token=search_plat_api_key, endpoint=search_plat_host, protocol="http")
    ops_client = Client(config=config)
    # Step 1: document analysis
    document_analyze_request = CreateDocumentAnalyzeTaskRequest(document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64, file_name=file_name, file_type='html'))
    document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name, service_id=service_id_config["document_analyze"], request=document_analyze_request)
    print("document_analyze task_id:" + document_analyze_response.body.result.task_id)
    extraction_result = await poll_doc_analyze_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["document_analyze"])
    print("document_analyze done")
    document_content = extraction_result.body.result.data.content
    content_type = extraction_result.body.result.data.content_type
    # Step 2: document splitting
    document_split_request = GetDocumentSplitRequest(
        GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
    document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
                                                                      document_split_request)
    print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
          + " rich text count:" + str(len(document_split_result.body.result.rich_texts)))
    # Step 3: text embedding
    # Collect the split results. Text in image chunks is extracted by the image analysis service.
    doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks]
                + [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
                + [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client, chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
                )
    chunk_size = 32  # At most 32 embeddings can be computed per request
    all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"], GetTextEmbeddingRequest(chunk))
        all_text_embeddings.extend(response.body.result.embeddings)
    all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
    for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
        response = await ops_client.get_text_sparse_embedding_async(workspace_name, service_id_config["text_sparse_embedding"], GetTextSparseEmbeddingRequest(chunk, input_type="document", return_token=True))
        all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
    paimon_doc_list = []
    for i in range(len(doc_list)):
        paimon_doc = {}
        paimon_doc["id"] = doc_list[i]["id"]
        paimon_doc["title"] = ""
        paimon_doc["text"] = doc_list[i]["content"]
        paimon_doc["url"] = document_url
        paimon_doc["embedding"] = ','.join([str(embedding) for embedding in all_text_embeddings[i].embedding])
        sorted_sparse_embedding_result = sorted(all_text_sparse_embeddings[i].embedding, key=attrgetter('token_id'))
        token_ids = [str(sparse_embedding.token_id) for sparse_embedding in sorted_sparse_embedding_result]
        token_wights = [str(sparse_embedding.weight) for sparse_embedding in sorted_sparse_embedding_result]
        paimon_doc["token_id"] = ','.join(token_ids)
        paimon_doc["token_weight"] = ','.join(token_wights)
        paimon_doc_list.append(paimon_doc)
    print("text-embedding done")
    for doc in paimon_doc_list:
        print(doc)
    # Step 4: write to the Paimon table
    await write_to_paimon(paimon_doc_list)
Configuration item | Description | Required | Remarks |
dlf.endpoint | Endpoint of the DLF service. | Yes | For details, see Regions and endpoints. |
dlf.region | Region where DLF resides. | Yes | For details, see Regions and endpoints. Note: keep it consistent with the region selected for dlf.endpoint. |
dlf.catalog.id | ID of the DLF data catalog. | Yes | View the Catalog ID in the data catalog list of the Data Lake Formation console. |
dlf.tokenCache.meta.accessKeyId | AccessKey ID used to access the DLF service. | Yes | For how to obtain it, see Create an AccessKey. |
dlf.tokenCache.meta.accessKeySecret | AccessKey secret used to access the DLF service. | Yes | For how to obtain it, see Create an AccessKey. |
dlf.user.name | Owner of the DLF data catalog. | Yes | View the Owner in the data catalog details in the Data Lake Formation console. For details, see View a data catalog. |
Run the following script to execute the offline asynchronous task.
if __name__ == "__main__":
    import nest_asyncio  # Uncomment these two lines if running in a Jupyter notebook
    nest_asyncio.apply()  # Uncomment these two lines if running in a Jupyter notebook
    asyncio.run(document_pipeline_execute(document_url))
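Before creating the index, you can optionally confirm that the chunks landed in the Paimon table by reading it back. The following is a minimal sketch, assuming the pypaimon read API (new_read_builder / new_scan / new_read) exposed by the paimon_python_api package used above.
# Read the Paimon table back into a pandas DataFrame to verify the offline write.
table = catalog.get_table(f"{dlf_database_name}.{dlf_table_name}")
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
splits = table_scan.plan().splits()
df_check = table_read.to_pandas(splits)
print("rows in table:", len(df_check))
print(df_check[["id", "url"]].head())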
步驟5:從Paimon表創(chuàng)建索引
在DataWorks左側(cè)欄中單擊數(shù)據(jù)目錄,找到之前創(chuàng)建的Paimon表,然后右鍵單擊管理OpenSearch向量索引,選擇ID為{your-opensearch-instance_id}
的OpenSearch實(shí)例,創(chuàng)建名為{your-opensearch-table-name}
的索引表,等待索引創(chuàng)建完成,即可進(jìn)入下一步。
Step 6: Online document Q&A
Convert the query into embeddings, use the vector retrieval capability of OpenSearch Vector Search Edition to find document chunks similar to the query, assemble them into a prompt, and pass it to the LLM to generate the answer.
async def os_vector_store_retrieve(query_emb, query_sparse_emb: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddingsEmbedding]):
    os_vector_store_config = models.Config(
        endpoint=os_vectorstore_host,
        instance_id=os_vectorstore_instance_id,
        protocol="http",
        access_user_name=os_vectorstore_user_name,
        access_pass_word=os_vectorstore_user_password
    )
    # Initialize the OpenSearch vector engine client
    os_vector_store = client.Client(os_vector_store_config)
    sorted_sparse_embedding_result = sorted(query_sparse_emb, key=attrgetter('token_id'))
    token_ids = [sparse_embedding.token_id for sparse_embedding in sorted_sparse_embedding_result]
    token_wights = [sparse_embedding.weight for sparse_embedding in sorted_sparse_embedding_result]
    sparseData = SparseData(indices=token_ids, values=token_wights)
    request = QueryRequest(table_name=os_vectorstore_table_name,
                           vector=query_emb,
                           sparse_data=sparseData,
                           include_vector=True,
                           output_fields=["id", "text"],
                           top_k=5)
    result = await os_vector_store.query_async(request)
    jsonResult = json.loads(result.body)
    search_results = [result['fields']['text'] for result in jsonResult['result']]
    return search_results

async def get_query_result(query):
    # Get the dense embedding of the query
    query_emb_result = await ops_client.get_text_embedding_async(workspace_name, service_id_config["text_embedding"],
                                                                 GetTextEmbeddingRequest(input=[query], input_type="query"))
    query_emb = query_emb_result.body.result.embeddings[0].embedding
    # Get the sparse embedding of the query
    query_sparse_emb = await ops_client.get_text_sparse_embedding_async(workspace_name, service_id_config["text_sparse_embedding"],
                                                                        GetTextSparseEmbeddingRequest(input=[query], input_type="query"))
    # Use the dense and sparse embeddings of the query to recall relevant documents
    search_results = await os_vector_store_retrieve(query_emb, query_sparse_emb.body.result.sparse_embeddings[0].embedding)
    return search_results

# Online Q&A; the input is the user question
async def query_pipeline_execute(user_query):
    # Step 1: query analysis
    query_analyze_response = ops_client.get_query_analysis(workspace_name, service_id_config["query_analyze"], GetQueryAnalysisRequest(query=user_query))
    print("問(wèn)題:" + user_query)
    # Step 2: document retrieval
    all_query_results = []
    user_query_results = await get_query_result(user_query)
    all_query_results.extend(user_query_results)
    rewrite_query_results = await get_query_result(query_analyze_response.body.result.query)
    all_query_results.extend(rewrite_query_results)
    for extend_query in query_analyze_response.body.result.queries:
        extend_query_result = await get_query_result(extend_query)
        all_query_results.extend(extend_query_result)
    # Deduplicate all retrieved results
    remove_duplicate_results = list(set(all_query_results))
    # Step 3: rerank the deduplicated documents
    rerank_top_k = 3
    score_results = await ops_client.get_document_rank_async(workspace_name, service_id_config["rank"], GetDocumentRankRequest(remove_duplicate_results, user_query))
    rerank_results = [remove_duplicate_results[item.index] for item in score_results.body.result.scores[:rerank_top_k]]
    # Step 4: call the LLM to answer the question
    docs = '\n'.join([f"<article>{s}</article>" for s in rerank_results])
    completion = openAIclient.chat.completions.create(
        model="ops-qwen-turbo",
        messages=[
            {"role": "user", "content": f"""已知信息包含多個(gè)獨(dú)立文檔,每個(gè)文檔在<article>和</article>之間,已知信息如下:\n'''{docs}''' \n\n根據(jù)上述已知信息,詳細(xì)且有條理地回答用戶(hù)的問(wèn)題。確保答案充分回答了問(wèn)題并且正確使用了已知信息。如果信息不足以回答問(wèn)題,請(qǐng)說(shuō)“根據(jù)已知信息無(wú)法回答該問(wèn)題”。 不要使用不在已知信息中的內(nèi)容生成答案,確保答案中每一個(gè)陳述在上述已知信息中有相應(yīng)內(nèi)容支撐。答案請(qǐng)使用中文。 \n問(wèn)題是:'''{user_query}'''"""}
        ],
        stream=True
    )
    print("\n答案:", end="")
    for resp in completion:
        print(resp.choices[0].delta.content.replace("**", ""), end="")
Set the input query.
# User query
user_query = "AI搜索開(kāi)放平臺(tái)有什么特點(diǎn)?"
Run the following script to execute the online asynchronous Q&A task.
if __name__ == "__main__":
    import nest_asyncio  # Uncomment this line if running in a Jupyter notebook
    nest_asyncio.apply()  # Uncomment this line if running in a Jupyter notebook
    asyncio.run(query_pipeline_execute(user_query))
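To ask further questions in the same session, query_pipeline_execute can be reused directly. The sketch below is not part of the original notebook, and the sample questions are only hypothetical placeholders.
import asyncio
import nest_asyncio
nest_asyncio.apply()

# Run the online Q&A pipeline for several follow-up questions.
follow_up_questions = [
    "OpenSearch向量檢索版支持哪些功能?",  # hypothetical sample question
    "AI搜索開(kāi)放平臺(tái)提供哪些服務(wù)?",  # hypothetical sample question
]
for q in follow_up_questions:
    asyncio.run(query_pipeline_execute(q))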