本文基于Java API Client 8.x版本,為您介紹Elasticsearch Java API的用法。
背景信息
Elasticsearch在7.17版本之前使用的Java客戶端是Java REST Client,從7.17版本開始Elastic官方將Java REST Client標記為棄用(deprecated),并推薦使用新版Java客戶端Java API Client。
Java API Client簡介
Java API Client是一個用于與Elasticsearch服務器進行通信的Java客戶端庫,幫助開發人員與Elasticsearch服務器進行通信,開發人員可以更加輕松地開發和維護代碼。
Java API Client主要包含三個部分:
ElasticsearchClient類:Java API Client的核心類,提供與Elasticsearch服務器進行通信的方法。該類封裝了底層的Transport通信,并提供了同步和異步調用、流式和函數式調用等方法。
JSON object mapper:處理數據序列化和反序列化的庫。JSON object mapper與Jackson無縫集成,可以將Java對象映射到JSON格式。
通用能力:提供了連接池、重試、JSON序列化等通用能力,提高了代碼的可讀性和可維護性,便于開發人員進行開發。
準備工作
安裝Java,要求JDK版本為1.8及以上。安裝方法請參見安裝JDK。
創建阿里云Elasticsearch實例,版本需大于或等于Java API Client的版本。本文創建一個8.x版本的實例,創建方法請參見創建阿里云Elasticsearch實例。
說明為了保證最大程度地使用新版客戶端的特性,推薦Java API Client版本與集群版本一致。
開啟阿里云Elasticsearch實例的自動創建索引功能,具體操作請參見配置YML參數。如果未開啟會提示如下報錯。
配置阿里云Elasticsearch實例的白名單,確保網絡互通。
如果運行Java代碼的服務器在公網環境下,可通過阿里云Elasticsearch實例的公網地址進行連通。連通前,需要開啟阿里云Elasticsearch實例的公網地址,并修改公網地址訪問白名單,將服務器的公網IP地址加入白名單中。具體操作步驟請參見配置實例公網或私網訪問白名單。
重要如果您的客戶端處在家庭網絡或公司局域網中,您需要將局域網的公網出口IP地址添加到白名單中,而非客戶端機器的內網機制。建議您通過瀏覽器訪問cip.cc獲取您當前使用的公網IP地址。
您也可以將白名單配置為0.0.0.0/0,允許所有IPv4地址訪問阿里云Elasticsearch實例。此配置會導致實例完全暴露在公網中,增加安全風險,配置前請確認您是否可以接受這個風險。
如果未配置白名單或白名單配置錯誤,系統會提示連接超時報錯(Timeout connecting)。
如果您需要通過客戶端訪問Kibana節點,還需要配置Kibana的訪問白名單,詳細信息請參見配置Kibana公網或私網訪問白名單。
如果運行Java代碼的服務器與阿里云Elasticsearch實例在同一專有網絡VPC(Virtual Private Cloud)中,可通過阿里云Elasticsearch實例的私網地址進行連通。連通前,需要確保VPC私網訪問白名單(默認為0.0.0.0/0)中已添加了服務器的內網IP地址。
客戶端和ES服務連接配置
Elasticsearch初始化了三種客戶端:
低級客戶端。
// Create the low-level client RestClient restClient = RestClient.builder( new HttpHost("localhost", 9200,"http")).build();
通信Transport,并利用JacksonJsonpMapper做數據的解析。
// Create the transport with a Jackson mapper ElasticsearchTransport transport = new RestClientTransport( restClient, new JacksonJsonpMapper());
阻塞的Java客戶端。
// And create the API client ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport); System.out.println("elasticsearchClient = " + elasticsearchClient);
pom依賴
使用時,您需要將pom依賴中的8.x版本號替換為具體的版本號。
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.x</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.20.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.20.0</version>
</dependency>
Log4j可能存在遠程代碼執行漏洞,詳細信息請參見漏洞公告 | Apache Log4j2遠程代碼執行漏洞。
示例
以下示例代碼中帶{}
的參數需要替換為具體業務的參數。
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.*;
import java.io.IOException;
public class RestClientTest {
public static void main(String[] args) {
// 阿里云Elasticsearch集群需要basic auth驗證。
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
//訪問用戶名和密碼為您創建阿里云Elasticsearch實例時設置的用戶名和密碼,也是Kibana控制臺的登錄用戶名和密碼。
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("{訪問用戶名}", "{訪問密碼}"));
// 通過builder創建rest client,配置http client的HttpClientConfigCallback。
// 單擊所創建的Elasticsearch實例ID,在基本信息頁面獲取公網地址,即為ES集群地址。
RestClient restClient = RestClient.builder(new HttpHost("{ES集群地址}", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}).build();
// 使用 Jackson 映射器創建傳輸
ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
// 創建 API 客戶端
ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transport);
//System.out.println("elasticsearchClient = " + elasticsearchClient);
// 創建、刪除索引和查看所有索引信息
// 創建了一個名為 "foo" 的別名,并將其設置為寫入別名 (write index),這意味著所有寫入操作(例如索引、更新、刪除文檔)將自動路由到這個別名所對應的索引上。
try {
//創建索引
CreateIndexResponse indexRequest = elasticsearchClient.indices().create(createIndexBuilder -> createIndexBuilder
.index("{index_name}")
.aliases("{foo}", aliasBuilder -> aliasBuilder
.isWriteIndex(true)
)
);
//檢查“indexRequest”請求的操作是否已被Elasticsearch集群確認
boolean acknowledged = indexRequest.acknowledged();
System.out.println("Index document successfully! " + acknowledged);
//刪除索引
DeleteIndexResponse deleteResponse = elasticsearchClient.indices().delete(createIndexBuilder -> createIndexBuilder
.index("{index_name}")
);
System.out.println("Delete document successfully! \n" + deleteResponse.toString());
//查看所有索引信息(health status index uuid pri rep)
IndicesResponse indicesResponse = elasticsearchClient.cat().indices();
indicesResponse.valueBody().forEach(info -> System.out.println(info.health() + "\t"+ info.status() + "\t" + info.index() + "\t" + info.uuid() +"\t" + info.pri() + "\t" + info.rep()));
transport.close();
restClient.close();
} catch (IOException ioException) {
// 異常處理。
}
}
}
高并發場景需要增加客戶端連接數,具體配置如下:
httpClientBuilder.setMaxConnTotal(500);
httpClientBuilder.setMaxConnPerRoute(300);
連接代碼示例:
String host = "localhost";
int port = 9200;
String username = "elastic";
String password = "passwd";
final int max_conn_total = 500;
final int max_conn_per_route = 300;
RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.setMaxConnTotal(max_conn_total);
httpClientBuilder.setMaxConnPerRoute(max_conn_per_route);
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
}).build();
更多Java API Client的使用特性,請參見Java API Client官方文檔。