配置環境變量
配置環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。
重要
阿里云賬號AccessKey擁有所有API的訪問權限,建議您使用RAM用戶進行API訪問或日常運維,具體操作,請參見創建RAM用戶。
創建AccessKey ID和AccessKey Secret,請參考創建AccessKey。
如果您使用的是RAM用戶的AccessKey,請確保主賬號已授權AliyunServiceRoleForOpenSearch服務關聯角色,請參考OpenSearch-行業算法版服務關聯角色,相關文檔參考訪問鑒權規則。
請不要將AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
Linux和macOS系統配置方法:
執行以下命令,其中,
<access_key_id>
需替換為您RAM用戶的AccessKey ID,<access_key_secret>
替換為您RAM用戶的AccessKey Secret。export ALIBABA_CLOUD_ACCESS_KEY_ID=<access_key_id> export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<access_key_secret>
Windows系統配置方法
新建環境變量文件,添加環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET,并寫入已準備好的AccessKey ID和AccessKey Secret。
重啟Windows系統生效。
引入依賴
pip install alibabacloud_tea_util
pip install alibabacloud_opensearch_util
pip install alibabacloud_credentials
BaseRequest參考:Python client 示例
推送示例
# -*- coding: utf-8 -*-
import time,os
from typing import Dict, Any
from Tea.exceptions import TeaException
from Tea.request import TeaRequest
from alibabacloud_tea_util import models as util_models
from BaseRequest import Config, Client
class opensearch:
def __init__(self, config: Config):
self.Clients = Client(config=config)
self.runtime = util_models.RuntimeOptions(
connect_timeout=10000,
read_timeout=10000,
autoretry=False,
ignore_ssl=False,
max_idle_conns=50,
max_attempts=3
)
self.header = {}
def docBulk(self, app_name: str, table_name: str, doc_content: list) -> Dict[str, Any]:
try:
response = self.Clients._request(method="POST",
pathname=f'/v3/openapi/apps/{app_name}/{table_name}/actions/bulk',query={},headers = self.header,
body=doc_content, runtime=self.runtime)
return response
except Exception as e:
print(e)
if __name__ == "__main__":
# 配置統一的請求入口 注意:host需要去掉http://
endpoint = "<endpoint>"
# 支持 protocol 配置 HTTPS/HTTP
endpoint_protocol = "HTTP"
# 用戶識別信息
# 從環境變量讀取配置的AccessKey ID和AccessKey Secret,
# 運行代碼示例前必須先配置環境變量,參考文檔上面“配置環境變量”步驟
access_key_id = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID")
access_key_secret = os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET")
# 支持 type 配置 sts/access_key 鑒權. 其中 type 默認為 access_key 鑒權. 使用 sts 可配置 RAM-STS 鑒權.
# 備選參數為: sts 或者 access_key
auth_type = "sts"
# 如果使用 RAM-STS 鑒權, 請配置 security_token, 可使用 阿里云 AssumeRole 獲取 相關 STS 鑒權結構.
security_token = "<security_token>"
# 配置請求使用的通用信息.
#注意:security_token和type參數,如果不是子賬號需要省略
Configs = Config(endpoint=endpoint, access_key_id=access_key_id, access_key_secret=access_key_secret,
security_token=security_token, type=auth_type, protocol=endpoint_protocol)
# 創建 opensearch 實例
ops = opensearch(Configs)
app_name = "app_name"
table_name = "table_name"
# --------------- 文檔推送 ---------------
# timestamp 信息 用以增加對 文檔操作的保序能力. 系統會用該時間戳來作為同一主鍵文檔更新順序的判斷標準.
# 在沒有該timestamp項時,默認以文檔發送到OpenSearch的時間作為文檔更新時間進行操作。
document1 = {"cmd": "ADD", "timestamp": int(time.time() * 1000), "fields": {"id": "1", "title": "opensearch"}}
document2 = {"cmd": "ADD", "fields": {"id": 2, "describe": "123456"}}
#刪除記錄
deletedoc={"cmd": "DELETE", "fields": {"id": 2}}
#更新記錄
updatedoc={"cmd": "UPDATE", "fields": {"id": 2, "describe": "6666","title": "開放搜索"}}
documents = [document1,document2]
res5 = ops.docBulk(app_name=app_name, table_name=table_name, doc_content=documents)
print(res5)
說明
參考:數據處理
文檔內容是否對您有幫助?