通過客戶端訪問阿里云Elasticsearch
本文介紹如何使用PHP、Python、Java和Go等語言訪問Elasticsearch實例或Serverless應(yīng)用,并為您提供示例代碼和注意事項。
準(zhǔn)備工作
創(chuàng)建Elasticsearch實例或Serverless應(yīng)用。 具體操作,請參見創(chuàng)建阿里云Elasticsearch實例和創(chuàng)建Serverless應(yīng)用。
安裝對應(yīng)語言的Elasticsearch客戶端。
建議創(chuàng)建與Elasticsearch相同版本的客戶端,避免出現(xiàn)兼容性問題。Elasticsearch與客戶端版本兼容性的詳細(xì)信息,請參見Compatibility。
Elasticsearch GO客戶端:Elasticsearch Go Client。
說明使用Go語言連接阿里云Elasticsearch前,您需要先安裝Go編譯環(huán)境,詳細(xì)信息請參見The Go Programming Language。本文示例使用Go 1.19.1版本。
Elasticsearch Java客戶端:Elasticsearch Java API Client。
說明Java客戶端類型包括:Transport Client、Low Level REST Client、High Level REST Client和Java API Client,各類型的代碼示例,請參見概述。本文以6.7版本的High Level REST Client為例。
由于Java Transport Client通過TCP與Elasticsearch進(jìn)行通信,當(dāng)客戶端與不同版本的Elasticsearch通信時,會存在兼容性問題,所以官方在高版本集群中已棄用Transport Client。如果您已經(jīng)創(chuàng)建了,使用Transport Client 5.5或5.6版本與5.5或5.6版本的Elasticsearch集群建立連接時會提示NoNodeAvailableException的錯誤,推薦您使用Transport Client 5.3.3或Java Low Level REST Client訪問Elasticsearch集群,以保障版本的兼容性。
Elasticsearch PHP客戶端:Elasticsearch PHP Client。
說明Elasticsearch的PHP客戶端提供的默認(rèn)連接池并不適合云上環(huán)境。阿里云Elasticsearch提供了負(fù)載均衡的域名服務(wù),因此PHP客戶端訪問程序必須使用SimpleConnectionPool作為連接池,否則在觸發(fā)阿里云Elasticsearch重啟操作時會出現(xiàn)訪問連接異常的問題。同時,PHP客戶端訪問程序必須具備訪問連接失敗重連的機(jī)制。因為在PHP客戶端訪問程序使用SimpleConnectionPool作為連接池后,不排除在觸發(fā)阿里云Elasticsearch重啟操作時還會出現(xiàn)訪問連接異常問題(例如,提示No enabled connection)。
Elasticsearch Python客戶端:Elasticsearch Python Client。
更多Elasticsearch客戶端:Elasticsearch Clients。
開啟Elasticsearch實例的自動創(chuàng)建索引功能。具體操作請參見配置YML參數(shù)。
配置Elasticsearch實例或Serverless應(yīng)用白名單,確保網(wǎng)絡(luò)互通。
如果運(yùn)行代碼的服務(wù)器與Elasticsearch實例在同一專有網(wǎng)絡(luò)VPC(Virtual Private Cloud)中,可通過阿里云Elasticsearch實例的私網(wǎng)地址進(jìn)行連通。連通前,需要確保VPC私網(wǎng)訪問白名單(默認(rèn)為0.0.0.0/0)中已添加了服務(wù)器的私網(wǎng)IP地址。
如果運(yùn)行代碼的服務(wù)器在公網(wǎng)環(huán)境下,可通過Elasticsearch的公網(wǎng)地址進(jìn)行連通。開啟Elasticsearch的公網(wǎng)地址,并將服務(wù)器的公網(wǎng)IP地址添加到Elasticsearch實例或Serverless應(yīng)用公網(wǎng)地址訪問白名單中。具體操作,請參見配置Elasticsearch實例公網(wǎng)或私網(wǎng)訪問白名單和配置Serverless應(yīng)用公網(wǎng)訪問白名單。
重要如果您使用的是WIFI、寬帶等網(wǎng)絡(luò),需要將公網(wǎng)出口的跳板機(jī)IP地址配置進(jìn)去。建議您通過瀏覽器訪問cip.cc查詢。
您也可以將白名單配置為0.0.0.0/0,允許所有IPv4地址訪問Elasticsearch。此配置會導(dǎo)致實例完全暴露在公網(wǎng)中,增加安全風(fēng)險,配置前請確認(rèn)您是否可以接受這個風(fēng)險。
如果未配置白名單或白名單配置錯誤,系統(tǒng)會提示連接超時
Timeout connecting
。如果您需要通過客戶端訪問Kibana節(jié)點(diǎn),還需要配置Kibana的訪問白名單。詳細(xì)信息,請參見配置Kibana公網(wǎng)或私網(wǎng)訪問白名單和配置Serverless應(yīng)用公網(wǎng)訪問白名單。
示例代碼
常見客戶端訪問阿里云Elasticsearch實例的代碼示例。
//以Go 1.19.1版本為例。
package main
import (
"log"
"github.com/elastic/go-elasticsearch/v7"
)
func main() {
cfg := elasticsearch.Config {
Addresses: []string{
"<YourEsHost>",
},
Username: "<UserName>",
Password: "<YourPassword>",
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
log.Println(res)
}
//以6.7版本的High Level REST Client為例。
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class RestClientTest67 {
private static final RequestOptions COMMON_OPTIONS;
static {
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// 默認(rèn)緩存限制為100MB,此處修改為30MB。
builder.setHttpAsyncResponseConsumerFactory(
new HttpAsyncResponseConsumerFactory
.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}
public static void main(String[] args) {
// 阿里云Elasticsearch集群需要basic auth驗證。
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//訪問用戶名和密碼為您創(chuàng)建阿里云Elasticsearch實例時設(shè)置的用戶名和密碼,也是Kibana控制臺的登錄用戶名和密碼。
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("<UserName>", "<YourPassword>"));
// 通過builder創(chuàng)建rest client,配置http client的HttpClientConfigCallback。
// 單擊所創(chuàng)建的Elasticsearch實例ID,在基本信息頁面獲取公網(wǎng)地址,即為ES集群地址。
RestClientBuilder builder = RestClient.builder(new HttpHost("<YourEsHost>", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
// RestHighLevelClient實例通過REST low-level client builder進(jìn)行構(gòu)造。
RestHighLevelClient highClient = new RestHighLevelClient(builder);
try {
// 創(chuàng)建request。
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("<YourEsField1>", "<YourEsFieldValue1>");
jsonMap.put("<YourEsField2>", "<YourEsFieldValue2>");
IndexRequest indexRequest = new IndexRequest("<YourEsIndex>", "<YourEsType>", "<YourEsId>").source(jsonMap);
// 同步執(zhí)行,并使用自定義RequestOptions(COMMON_OPTIONS)。
IndexResponse indexResponse = highClient.index(indexRequest, COMMON_OPTIONS);
long version = indexResponse.getVersion();
System.out.println("Index document successfully! " + version);
highClient.close();
} catch (IOException ioException) {
// 異常處理。
}
}
}
<?php
require 'vendor/autoload.php';
use Elasticsearch\ClientBuilder;
$client = ClientBuilder::create()->setHosts([
[
'host' => '<YourEsHost>',
'port' => '9200',
'scheme' => 'http',
'user' => '<UserName>',
'pass' => '<YourPassword>'
]
])->setConnectionPool('\Elasticsearch\ConnectionPool\SimpleConnectionPool', [])
->setRetries(10)->build();
$indexParams = [
'index' => '<YourEsIndex>',
'type' => '<YourEsType>',
'id' => '<YourEsId>',
'body' => ['<YourEsField>' => '<YourEsFieldValue>'],
'client' => [
'timeout' => 10,
'connect_timeout' => 10
]
];
$indexResponse = $client->index($indexParams);
print_r($indexResponse);
$searchParams = [
'index' => '<YourEsIndex>',
'type' => '<YourEsType>',
'body' => [
'query' => [
'match' => [
'<YourEsField>' => '<YourEsFieldValue>'
]
]
],
'client' => [
'timeout' => 10,
'connect_timeout' => 10
]
];
$searchResponse = $client->search($searchParams);
print_r($searchResponse);
?>
from elasticsearch import Elasticsearch, RequestsHttpConnection
import certifi
es = Elasticsearch(
['<YourEsHost>'],
http_auth=('<UserName>', '<YourPassword>'),
port=9200,
use_ssl=False
)
res = es.index(index="<YourEsIndex>", doc_type="<YourEsType>", id=<YourEsId>, body={"<YourEsField1>": "<YourEsFieldValue1>", "<YourEsField2>": "<YourEsFieldValue2>"})
res = es.get(index="<YourEsIndex>", doc_type="<YourEsType>", id=<YourEsId>)
print(res['_source'])
以上代碼示例只展示了HTTP連接方式,如果ES集群開啟了HTTPS協(xié)議,代碼示例中需要將use_ssl改為True,并加上verify_certs=True,其余不變。即:
es = Elasticsearch(
['<YourEsHost>'],
http_auth=('<UserName>', '<YourPassword>'),
port=9200,
use_ssl=True,
verify_certs=True
)
常見客戶端訪問Serverless應(yīng)用的代碼示例。
Elasticsearch Serverless僅支持索引相關(guān)的API,其他API無權(quán)限操作。
Elasticsearch Serverless的Python API兼容社區(qū)版ES 7.10 Python索引級別操作。更多信息,請參見 Elasticsearch API。
//以下代碼示例為如何使用BulkRequest進(jìn)行批量寫入和SearchRequest。
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class RestClientTest710 {
public static void main(String[] args) {
// 阿里云Elasticsearch Serverless集群需要basic auth驗證。
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//訪問用戶名和密碼為您創(chuàng)建阿里云Elasticsearch Serverless實例時設(shè)置的用戶名和密碼,也是Kibana控制臺的登錄用戶名和密碼。
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("<UserName>", "<YourPassword>"));
// 通過builder創(chuàng)建rest client,配置http client的HttpClientConfigCallback。
// 單擊所創(chuàng)建的Elasticsearch Serverless實例ID,在基本信息頁面獲取公網(wǎng)地址,即為ES集群地址。
RestClientBuilder builder = RestClient.builder(new HttpHost("<YourEsHost>", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
// RestHighLevelClient實例通過REST low-level client builder進(jìn)行構(gòu)造。
RestHighLevelClient highClient = new RestHighLevelClient(builder);
try {
//批量寫入
//創(chuàng)建BulkRequest 對象,并使用 timeout() 方法設(shè)置了請求的超時時間為 "10s"
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.timeout("10s");
// 創(chuàng)建request。
Map<String, Object> jsonMap = new HashMap<>();
// field_01、field_02為字段名,value_01、value_02為對應(yīng)的值。
jsonMap.put("{field_01}", "{value_01}");
jsonMap.put("{field_02}", "{value_02}");
IndexRequest indexRequest = new IndexRequest("{index_name}").id("{doc_id}").source(jsonMap);
bulkRequest.add(indexRequest);
Map<String, Object> jsonMap2 = new HashMap<>();
jsonMap2.put("{field_01}", "{value_01}");
jsonMap2.put("{field_02}", "{value_02}");
IndexRequest indexRequest2 = new IndexRequest("{index_name}").id("{doc_id}").source(jsonMap2);
bulkRequest.add(indexRequest2);
BulkResponse bulkResponse = highClient.bulk(bulkRequest, RequestOptions.DEFAULT);
System.out.println(bulkResponse.hasFailures());
//查詢所有文檔
//指定搜索的索引
SearchRequest searchRequest = new SearchRequest("{index_name}");
//構(gòu)建了一個查詢對象
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
//創(chuàng)建一個匹配所有文檔的查詢
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
//將查詢對象設(shè)置為 SearchRequest 的源
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = highClient.search(searchRequest, RequestOptions.DEFAULT);
//從 SearchResponse 中獲取搜索結(jié)果的數(shù)組 SearchHit[] searchHits。
SearchHit[] searchHits = searchResponse.getHits().getHits();
//遍歷 searchHits 數(shù)組,對每一個搜索命中的文檔進(jìn)行處理。在此示例中,使用 getSourceAsString() 方法將文檔的源數(shù)據(jù)以字符串形式打印出來。
for (SearchHit hit : searchHits) {
System.out.println(hit.getSourceAsString());
}
//查詢單個文檔
GetRequest getRequest = new GetRequest("{index_name}", "{doc_id}");
GetResponse getResponse = highClient.get(getRequest, RequestOptions.DEFAULT);
//打印文檔的內(nèi)容
System.out.println(getResponse.getSourceAsString());
//返回的全部內(nèi)容和命令式一樣的
System.out.println(getResponse);
highClient.close();
} catch (IOException ioException) {
// 異常處理。
}
}
}
# coding="utf-8"
from elasticsearch import Elasticsearch
import time
import string
import random
def stru_doc(index_name,doc_type="_doc",count=100):
doc = []
for i in range(count):
_id = ''.join([random.choice(letters) for _ in range(17)])
i_doc = [{"create": {"_index": index_name, "_id": _id, "_type": doc_type}},
{"field1": "測試創(chuàng)建文檔dd汽車時代,如果你萬一開車撞了什么東西,千萬千萬要下車查看檢查,否則如果撞到的是人,就算是逃逸,只要構(gòu)成重傷,負(fù)主責(zé)以上,就構(gòu)成交通肇事罪哦。",
"filed2": "2017年12月01日之后使用公共鏡像創(chuàng)建的ECS實例,默認(rèn)預(yù)裝云助手客戶端。如果您的實例是2017年12月01日之前購買的,若需要使用云助手相關(guān)功能,請自行安裝云助手客戶端。"
"不同操作系統(tǒng)可選擇的安裝方式如下表所示。",
"field3": "如果您選擇默認(rèn)安裝路徑,客戶端在Windows實例中的安裝目錄為 C:\ProgramData\aliyun\assist\。",
"filed4":"通過閱讀本文,您可以快速了解云服務(wù)器ECS的計費(fèi)項及其計費(fèi)方式、計費(fèi)組成、定價等主要計費(fèi)信息。",
"filed5":"說明 ECS資源的基礎(chǔ)計費(fèi)方式為包年包月和按量付費(fèi),針對不同的ECS資源,您可以根據(jù)需要結(jié)合其它優(yōu)惠的計費(fèi)方式來降低使用成本。更多信息請參見 計費(fèi)方式概述。",
"filed6":"貓不在袋子里了!我們很自豪地說我們參與了 Ai.com 域名的銷售。非常期待看到他們會用該域名做什么!IT之家 2 月 16 日消息,人工智能聊天機(jī)器人 ChatGPT 近日火爆全球,該技術(shù)的開發(fā)商 OpenAI 豪擲千金,將超優(yōu)質(zhì)域名 AI.com 鏈接跳轉(zhuǎn)到了 ChatGPT。眾所周知,OpenAI 背后的金主是科技巨頭微軟,目前必應(yīng)等產(chǎn)品已開始測試 ChatGPT,資金支持自然也不在話下。微軟此前還宣布,將擴(kuò)大與 OpenAI 的合作關(guān)系,后者將獲得微軟“多年、數(shù)十億美元”的投資,具體數(shù)額沒有披露,有媒體報道稱是 100 億美元。",
"@timestamp": int(time.time())}]
doc += i_doc
return doc
def bulk_request():
"""
持續(xù)寫入 不創(chuàng)建索引
:return:
"""
index_name = "ali_push_doc_%s" % time.strftime('%H%M%S')
body = {
"settings": {"number_of_replicas": 0,
"number_of_shards": 1}
}
doc_type = "_doc"
try:
create_index = es.indices.create(index=index_name, body=body)
print(create_index)
except Exception as index:
print(time.strftime('%H:%M:%s') + str(index))
time.sleep(20)
bulk_body = stru_doc(index_name)
try:
bulk_request = es.bulk(body=bulk_body, timeout='10s')
# print(time.strftime('%H:%M:%S') + 'push_test' + str(bulk_request))
except Exception as u:
print(u)
with open('push_doc_exception.txt', 'w') as f:
f.write(time.strftime('%H:%M:%S') + url + '\n' + str(u) + '\n' + str(bulk_body) + '\n')
es.indices.refresh()
doc_account = es.count(index=index_name)
print("indexcount:",doc_account)
if doc_account["count"] == 100:
for times in range(100):
try:
query_body = {
"query": {
"match_all": {}
}
}
search_request = es.search(query_body, index_name)
print(search_request)
except Exception as e:
with open('asyn_error.txt', 'a') as f:
f.write(time.strftime('%H:%M:%S') + str(e) + '\n' + str(e) + '\n')
else:
with open('asyn_error.txt', 'a') as f:
f.write(time.strftime('%H:%M:%S') + str(doc_account) + '\n')
if __name__ == '__main__':
url = <YourEsHost>
es = Elasticsearch(url, http_auth=(<UserName>, <YourPassword>))
letters = string.ascii_lowercase + string.digits
bulk_request()
使用代碼時,您需要將以下字段替換為對應(yīng)的值。
參數(shù) | 說明 |
<YourEsHost> | Elasticsearch實例的私網(wǎng)、公網(wǎng)地址,或Serverless應(yīng)用的公網(wǎng)訪問地址。可在Elasticsearch實例或Serverless應(yīng)用的基本信息區(qū)域獲取。 |
<UserName> | 輸入Elasticsearch實例的用戶名 |
<YourPassword> | 輸入Elasticsearch實例或Serverless應(yīng)用的用戶密碼。 如果忘記密碼,可在Elasticsearch實例詳情頁的安全配置中重置,或在Serverless應(yīng)用詳情頁的基本信息區(qū)域重置。更多信息,請參見重置Elasticsearch實例訪問密碼。 |
<YourEsIndex> | 索引名稱。 |
<YourEsType> | 文檔類型。 重要 Elasticsearch 7.0以下版本可自定義文檔類型,7.0及以上版本固定為 |
<YourEsId> | 文檔ID。 |
<YourEsField> | 字段名稱。 |
<YourEsFieldValue> | <YourEsField>的值。 |