本文介紹在AI搜索開放平臺進行多模態數據預處理流程。
應用場景
多模態數據預處理場景提供非結構化文檔及圖片處理方案,全鏈路分別由文檔解析服務、圖片解析服務、文檔切分服務、文本向量化服務、文本稀疏向量化服務組成,您將體驗到完整的數據處理流程。以上服務均需使用AI搜索開放平臺的API服務,將按照實際調用產生費用。
前提條件
開通AI搜索開放平臺服務,詳情請參見開通服務。
獲取服務調用地址和身份鑒權信息,詳情請參見獲取服務接入地址、獲取API-KEY。
AI搜索開放平臺支持通過公網和VPC地址調用服務,且可通過VPC實現跨地域調用服務。目前支持上海、杭州、深圳、北京、張家口、青島地域的用戶,通過VPC地址調用AI搜索開放平臺的服務。
多模態數據預處理鏈路搭建
為方便用戶使用,AI搜索開放平臺提供三種類型的開發框架:
Python SDK。
如果業務已經使用LangChain開發框架,在開發框架中選擇LangChain。
如果業務已經使用LlamaIndex開發框架,在開發框架中選擇LlamaIndex。
步驟一:完成服務選型和代碼下載
本文以Python SDK開發框架為例介紹如何搭建多模態數據預處理鏈路。
登錄AI搜索開放平臺控制臺。
選擇上海地域,切換到AI搜索開放平臺,切換到目標空間。
說明目前僅支持在上海開通AI搜索開放平臺功能。
支持杭州、深圳、北京、張家口、青島地域的用戶,通過VPC地址跨地域調用AI搜索開放平臺的服務。
在左側導航欄選擇場景中心,選擇多模態數據預處理場景-數據解析和向量化右側的進入。
根據服務信息結合業務特點,從下拉列表中選擇所需服務,服務詳情頁面可查看服務詳細信息。
說明通過API調用多模態數據預處理鏈路中的算法服務時,需要提供服務ID(service_id),如文檔內容解析服務的ID為ops-document-analyze-001。
從服務列表中切換服務后,生成代碼中的service_id會同步更新。當代碼下載到本地環境后,您仍可以更改service_id,調用對應服務。
環節
服務說明
文檔內容解析
文檔內容解析服務(ops-document-analyze-001):提供通用文檔解析服務,支持從非結構化文檔(文本、表格、圖片等)中提取標題、分段等邏輯層級結構,以結構化格式輸出。
圖片內容解析
圖片內容理解服務(ops-image-analyze-vlm-001):可基于多模態大模型對圖片內容進行解析理解以及文字識別,解析后的文本可用于圖片檢索問答場景。
圖片文本識別服務(ops-image-analyze-ocr-001):使用OCR能力進行圖片文字識別,解析后的文本可用于圖片檢索問答場景。
文檔切片
文檔切片服務(ops-document-split-001):提供通用文本切片服務,支持基于文檔段落、文本語義、指定規則,對HTML、Markdown、txt格式的結構化數據進行拆分,同時支持以富文本形式提取文檔中的代碼、圖片以及表格。
文本向量化
OpenSearch文本向量化服務-001(ops-text-embedding-001):提供多語言(40+)文本向量化服務,輸入文本最大長度300,輸出向量維度1536維。
OpenSearch通用文本向量化服務-002(ops-text-embedding-002):提供多語言(100+)文本向量化服務,輸入文本最大長度8192,輸出向量維度1024維。
OpenSearch文本向量化服務-中文-001(ops-text-embedding-zh-001):提供中文文本向量化服務,輸入文本最大長度1024,輸出向量維度768維。
OpenSearch文本向量化服務-英文-001(ops-text-embedding-en-001):提供英文文本向量化服務,輸入文本最大長度512,輸出向量維度768維。
文本稀疏向量化
提供將文本數據轉化為稀疏向量形式表達的服務,稀疏向量存儲空間更小,常用于表達關鍵詞和詞頻信息,可與稠密向量搭配進行混合檢索,提升檢索效果。
OpenSearch文本稀疏向量化服務(ops-text-sparse-embedding-001):提供多語言(100+)文本向量化服務,輸入文本最大長度8192。
完成服務選型后,單擊配置完成,進入代碼查詢查看和下載代碼,按照應用調用數據預處理鏈路時的運行流程:
作用 | 說明 |
負責文檔處理,包含文檔解析/圖片解析、文檔切片、文本向量化。 | 使用主函數document_pipeline_execute完成以下流程,可通過文檔URL或Base64編碼輸入待處理文檔。
|
選擇代碼查詢下的文檔解析與向量化,單擊復制代碼或者下載文件,將代碼下載到本地。
步驟二:本地環境適配和測試數據預處理開發鏈路
將代碼下載到本地文件后,需要配置代碼中的關鍵參數。
類別 | 參數 | 說明 |
AI搜索開放平臺 | api_key | API調用密鑰,獲取方式請參見管理API Key。 |
aisearch_endpoint | API調用地址,獲取方式請參見獲取服務接入地址。 說明 注意需要去掉“http://”。 支持通過公網和VPC兩種方式調用API。 | |
workspace_name | AI搜索開放平臺中的空間名稱。 | |
service_id | 服務ID,為操作方便,可以通過service_id_config配置各項服務以及ID。 |
完成參數配置后即可在Python 3.8.1及以上版本環境中運行代碼,測試結果是否正確。
如在代碼中對AI搜索開放平臺介紹進行數據預處理,運行結果如下:
文檔解析與向量化文件:
# 多模態數據處理鏈路
# 環境需求:
# Python版本:3.7及以上
# 包需求:
# pip install alibabacloud_searchplat20240529
# AI搜索開放平臺配置
aisearch_endpoint = "xxx.platform-cn-shanghai.opensearch.aliyuncs.com"
api_key = "OS-xxx"
workspace_name = "default"
service_id_config = {"document_analyze": "ops-document-analyze-001",
"split": "ops-document-split-001",
"text_embedding": "ops-text-embedding-001",
"text_sparse_embedding": "ops-text-sparse-embedding-001",
"image_analyze": "ops-image-analyze-ocr-001"}
# 輸入文檔url,示例文檔為opensearch產品說明文檔
document_url = "http://bestwisewords.com/zh/open-search/search-platform/product-overview/introduction-to-search-platform?spm=a2c4g.11186623.0.0.7ab93526WDzQ8z"
import asyncio
from operator import attrgetter
from typing import List
from Tea.exceptions import TeaException, RetryError
from alibabacloud_tea_openapi.models import Config
from alibabacloud_searchplat20240529.client import Client
from alibabacloud_searchplat20240529.models import GetDocumentSplitRequest, CreateDocumentAnalyzeTaskRequest, \
CreateDocumentAnalyzeTaskRequestDocument, GetDocumentAnalyzeTaskStatusRequest, \
GetDocumentSplitRequestDocument, GetTextEmbeddingRequest, GetTextEmbeddingResponseBodyResultEmbeddings, \
GetTextSparseEmbeddingRequest, GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings, \
GetImageAnalyzeTaskStatusResponse, CreateImageAnalyzeTaskRequest, GetImageAnalyzeTaskStatusRequest, \
CreateImageAnalyzeTaskRequestDocument, CreateImageAnalyzeTaskResponse
async def poll_doc_analyze_task_result(ops_client, task_id, service_id, interval=5):
while True:
request = GetDocumentAnalyzeTaskStatusRequest(task_id=task_id)
response = await ops_client.get_document_analyze_task_status_async(workspace_name, service_id, request)
status = response.body.result.status
if status == "PENDING":
await asyncio.sleep(interval)
elif status == "SUCCESS":
return response
else:
print("error: " + response.body.result.error)
raise Exception("document analyze task failed")
def is_analyzable_url(url:str):
if not url:
return False
image_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.tiff'}
return url.lower().endswith(tuple(image_extensions))
async def image_analyze(ops_client, url):
try:
print("image analyze :" + url)
if url.startswith("http://"):
url = "https:" + url
if not is_analyzable_url(url):
print(url + " is unanalysable.")
return url
image_analyze_service_id = service_id_config["image_analyze"]
document = CreateImageAnalyzeTaskRequestDocument(
url=url,
)
request = CreateImageAnalyzeTaskRequest(document=document)
response: CreateImageAnalyzeTaskResponse = ops_client.create_image_analyze_task(workspace_name, image_analyze_service_id, request)
task_id = response.body.result.task_id
while True:
request = GetImageAnalyzeTaskStatusRequest(task_id=task_id)
response: GetImageAnalyzeTaskStatusResponse = ops_client.get_image_analyze_task_status(workspace_name, image_analyze_service_id, request)
status = response.body.result.status
if status == "PENDING":
await asyncio.sleep(5)
elif status == "SUCCESS":
return url + response.body.result.data.content
else:
print("image analyze error: " + response.body.result.error)
return url
except Exception as e:
print(f"image analyze Exception : {e}")
def chunk_list(lst, chunk_size):
for i in range(0, len(lst), chunk_size):
yield lst[i:i + chunk_size]
async def document_pipeline_execute(document_url: str = None, document_base64: str = None, file_name: str = None):
# 生成opensearch開發平臺client
config = Config(bearer_token=api_key,endpoint=aisearch_endpoint,protocol="http")
ops_client = Client(config=config)
# Step 1: 文檔解析/圖片解析
document_analyze_request = CreateDocumentAnalyzeTaskRequest(document=CreateDocumentAnalyzeTaskRequestDocument(url=document_url, content=document_base64,file_name=file_name, file_type='html'))
document_analyze_response = await ops_client.create_document_analyze_task_async(workspace_name=workspace_name,service_id=service_id_config["document_analyze"],request=document_analyze_request)
print("document_analyze task_id:" + document_analyze_response.body.result.task_id)
extraction_result = await poll_doc_analyze_task_result(ops_client, document_analyze_response.body.result.task_id, service_id_config["document_analyze"])
print("document_analyze done")
document_content = extraction_result.body.result.data.content
content_type = extraction_result.body.result.data.content_type
# Step 2: 文檔切片
document_split_request = GetDocumentSplitRequest(
GetDocumentSplitRequestDocument(content=document_content, content_type=content_type))
document_split_result = await ops_client.get_document_split_async(workspace_name, service_id_config["split"],
document_split_request)
print("document-split done, chunks count: " + str(len(document_split_result.body.result.chunks))
+ " rich text count:" + str(len(document_split_result.body.result.rich_texts)))
# Step 3: 文本向量化
# 提取切片結果。圖片切片會通過圖片解析服務提取出文本內容
doc_list = ([{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.chunks]
+ [{"id": chunk.meta.get("id"), "content": chunk.content} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") != "image"]
+ [{"id": chunk.meta.get("id"), "content": await image_analyze(ops_client,chunk.content)} for chunk in document_split_result.body.result.rich_texts if chunk.meta.get("type") == "image"]
)
chunk_size = 32 # 一次最多允許計算32個embedding
all_text_embeddings: List[GetTextEmbeddingResponseBodyResultEmbeddings] = []
for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
response = await ops_client.get_text_embedding_async(workspace_name,service_id_config["text_embedding"],GetTextEmbeddingRequest(chunk))
all_text_embeddings.extend(response.body.result.embeddings)
all_text_sparse_embeddings: List[GetTextSparseEmbeddingResponseBodyResultSparseEmbeddings] = []
for chunk in chunk_list([text["content"] for text in doc_list], chunk_size):
response = await ops_client.get_text_sparse_embedding_async(workspace_name,service_id_config["text_sparse_embedding"],GetTextSparseEmbeddingRequest(chunk,input_type="document",return_token=True))
all_text_sparse_embeddings.extend(response.body.result.sparse_embeddings)
for i in range(len(doc_list)):
doc_list[i]["embedding"] = all_text_embeddings[i].embedding
doc_list[i]["sparse_embedding"] = all_text_sparse_embeddings[i].embedding
print("text-embedding done.")
if __name__ == "__main__":
# 運行異步任務
# import nest_asyncio # 如果在Jupyter notebook中運行,反注釋這兩行
# nest_asyncio.apply() # 如果在Jupyter notebook中運行,反注釋這兩行
asyncio.run(document_pipeline_execute(document_url))
# asyncio.run(document_pipeline_execute(document_base64="eHh4eHh4eHg...", file_name="attention.pdf")) #另外一種調用方式