教程:通過(guò)Java Native SDK連接并使用Lindorm時(shí)序引擎
本文介紹通過(guò)Java Native SDK連接并使用Lindorm時(shí)序引擎的方法。
前提條件
已安裝Java環(huán)境,要求安裝JDK 1.8及以上版本。
時(shí)序引擎版本為3.4.7及以上版本,如何查看或升級(jí)當(dāng)前版本,請(qǐng)參見(jiàn)時(shí)序引擎版本說(shuō)明和升級(jí)小版本。
已將客戶(hù)端IP地址添加至Lindorm白名單,具體操作請(qǐng)參見(jiàn)設(shè)置白名單。
已獲取云原生多模數(shù)據(jù)庫(kù) Lindorm時(shí)序引擎的連接地址,獲取方法請(qǐng)參見(jiàn)獲取連接串。
準(zhǔn)備工作
通過(guò)Java Native SDK連接Lindorm時(shí)序引擎前,需要安裝Java Native SDK。以1.0.0版本為例,您可以通過(guò)以下三種方式安裝Java Native SDK:
(推薦)在Maven項(xiàng)目中使用Lindorm TSDB Java SDK。在pom.xml文件的
dependencies
中添加以下依賴(lài)項(xiàng)。<dependency> <groupId>com.aliyun.lindorm</groupId> <artifactId>lindorm-tsdb-client</artifactId> <version>1.0.4</version> </dependency>
說(shuō)明Lindorm TSDB Java SDK提供了一個(gè)基于Maven的示例工程,您可以直接下載示例工程并在本地編譯和運(yùn)行,也可以以示例工程為基礎(chǔ)開(kāi)發(fā)您的項(xiàng)目工程。
在Eclipse項(xiàng)目中導(dǎo)入JAR包。
解壓下載的Java SDK開(kāi)發(fā)包。
將解壓后JAR包添加至Eclipse項(xiàng)目中。
在Eclipse中打開(kāi)您的項(xiàng)目,右鍵單擊該項(xiàng)目,選擇Properties。
在彈出的對(duì)話(huà)框中,單擊lindorm-tsdb-client-1.0.0.jar和lib文件中的JAR包。
,選擇解壓后的單擊Apply and Close。
在IntelliJ IDEA項(xiàng)目中導(dǎo)入JAR包。
解壓下載的Java SDK開(kāi)發(fā)包。
將解壓后JAR包添加至IntelliJ IDEA項(xiàng)目中。
在IntelliJ IDEA中打開(kāi)您的項(xiàng)目,在菜單欄單擊
。在Project Structure對(duì)話(huà)框的左邊選擇
。單擊右邊,選擇JARs or directories。
在彈出的對(duì)話(huà)框中,選擇解壓后的lindorm-tsdb-client-1.0.0.jar和lib文件中的JAR包,并單擊OK。
單擊Apply。
單擊OK。
Lindorm時(shí)序引擎的Java Native SDK各版本可以通過(guò)Maven中央倉(cāng)庫(kù)獲取,更多信息請(qǐng)參見(jiàn)Maven Repository。
Lindorm時(shí)序引擎的Java Native SDK各版本說(shuō)明請(qǐng)參見(jiàn)版本說(shuō)明。
操作步驟
創(chuàng)建數(shù)據(jù)庫(kù)實(shí)例。新建LindormTSDBClient時(shí),需要指定Lindorm時(shí)序引擎的連接地址,獲取方法請(qǐng)參見(jiàn)獲取連接串。
String url = "http://ld-bp17j28j2y7pm****-proxy-tsdb-pub.lindorm.rds.aliyuncs.com:8242"; // LindormTSDBClient線(xiàn)程安全,可以重復(fù)使用,無(wú)需頻繁創(chuàng)建和銷(xiāo)毀 ClientOptions options = ClientOptions.newBuilder(url).build(); LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
創(chuàng)建數(shù)據(jù)庫(kù)demo和時(shí)序表sensor。關(guān)于創(chuàng)建數(shù)據(jù)庫(kù)和時(shí)序表的SQL語(yǔ)句說(shuō)明,請(qǐng)參見(jiàn)CREATE DATABASE和CREATE TABLE。
lindormTSDBClient.execute("CREATE DATABASE demo"); lindormTSDBClient.execute("demo","CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");
在表中寫(xiě)入數(shù)據(jù)。
說(shuō)明默認(rèn)情況下,為了提高寫(xiě)入數(shù)據(jù)的性能,LindormTSDBClient通過(guò)異步攢批的方式進(jìn)行數(shù)據(jù)寫(xiě)入。如果需要通過(guò)同步的方式進(jìn)行數(shù)據(jù)寫(xiě)入,可以調(diào)用
write()
方法返回的CompletableFuture<WriteResult>
的join()
方法。int numRecords = 10; List<Record> records = new ArrayList<>(numRecords); long currentTime = System.currentTimeMillis(); for (int i = 0; i < numRecords; i++) { Record record = Record .table("sensor") .time(currentTime + i * 1000) .tag("device_id", "F07A1260") .tag("region", "north-cn") .addField("temperature", 12.1 + i) .addField("humidity", 45.0 + i) .build(); records.add(record); } CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records); // 處理異步寫(xiě)入結(jié)果 future.whenComplete((r, ex) -> { // 處理寫(xiě)入失敗 if (ex != null) { System.out.println("Failed to write."); if (ex instanceof LindormTSDBException) { LindormTSDBException e = (LindormTSDBException) ex; System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, " + "but was rejected with an error response for some reason."); System.out.println("Error Code: " + e.getCode()); System.out.println("SQL State: " + e.getSqlstate()); System.out.println("Error Message: " + e.getMessage()); } else { ex.printStackTrace(); } } else { System.out.println("Write successfully."); } }); // 這里作為示例, 簡(jiǎn)單同步處理寫(xiě)入結(jié)果 System.out.println(future.join());
查詢(xún)時(shí)序表的數(shù)據(jù)。關(guān)于查詢(xún)操作的SQL語(yǔ)句說(shuō)明,請(qǐng)參見(jiàn)基本查詢(xún)。
String sql = "select * from sensor limit 10"; ResultSet resultSet = lindormTSDBClient.query("demo", sql); try { // 處理查詢(xún)結(jié)果 QueryResult result = null; // 查詢(xún)結(jié)果使用分批的方式返回,默認(rèn)每批1000行 // 當(dāng)resultSet的next()方法返回為null,表示已經(jīng)讀取完所有的查詢(xún)結(jié)果 while ((result = resultSet.next()) != null) { List<String> columns = result.getColumns(); System.out.println("columns: " + columns); List<String> metadata = result.getMetadata(); System.out.println("metadata: " + metadata); List<List<Object>> rows = result.getRows(); for (int i = 0, size = rows.size(); i < size; i++) { List<Object> row = rows.get(i); System.out.println("row #" + i + " : " + row); } } } finally { // 查詢(xún)結(jié)束后,需確保調(diào)用ResultSet的close方法,以釋放IO資源 resultSet.close(); }
完整的代碼示例
import com.aliyun.lindorm.tsdb.client.ClientOptions;
import com.aliyun.lindorm.tsdb.client.LindormTSDBClient;
import com.aliyun.lindorm.tsdb.client.LindormTSDBFactory;
import com.aliyun.lindorm.tsdb.client.exception.LindormTSDBException;
import com.aliyun.lindorm.tsdb.client.model.QueryResult;
import com.aliyun.lindorm.tsdb.client.model.Record;
import com.aliyun.lindorm.tsdb.client.model.ResultSet;
import com.aliyun.lindorm.tsdb.client.model.WriteResult;
import com.aliyun.lindorm.tsdb.client.utils.ExceptionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public class QuickStart {
public static void main(String[] args) {
// 1.創(chuàng)建客戶(hù)端實(shí)例
String url = "http://ld-xxxx-proxy-tsdb-pub.lindorm.rds.aliyuncs.com:8242";
// LindormTSDBClient線(xiàn)程安全,可以重復(fù)使用,無(wú)需頻繁創(chuàng)建和銷(xiāo)毀
ClientOptions options = ClientOptions.newBuilder(url).build();
LindormTSDBClient lindormTSDBClient = LindormTSDBFactory.connect(options);
// 2.創(chuàng)建數(shù)據(jù)庫(kù)demo和表sensor
lindormTSDBClient.execute("CREATE DATABASE demo");
lindormTSDBClient.execute("demo","CREATE TABLE sensor (device_id VARCHAR TAG,region VARCHAR TAG,time BIGINT,temperature DOUBLE,humidity DOUBLE,PRIMARY KEY(device_id))");
// 3.寫(xiě)入數(shù)據(jù)
int numRecords = 10;
List<Record> records = new ArrayList<>(numRecords);
long currentTime = System.currentTimeMillis();
for (int i = 0; i < numRecords; i++) {
Record record = Record
.table("sensor")
.time(currentTime + i * 1000)
.tag("device_id", "F07A1260")
.tag("region", "north-cn")
.addField("temperature", 12.1 + i)
.addField("humidity", 45.0 + i)
.build();
records.add(record);
}
CompletableFuture<WriteResult> future = lindormTSDBClient.write("demo", records);
// 處理異步寫(xiě)入結(jié)果
future.whenComplete((r, ex) -> {
// 處理寫(xiě)入失敗
if (ex != null) {
System.out.println("Failed to write.");
Throwable throwable = ExceptionUtils.getRootCause(ex);
if (throwable instanceof LindormTSDBException) {
LindormTSDBException e = (LindormTSDBException) throwable;
System.out.println("Caught an LindormTSDBException, which means your request made it to Lindorm TSDB, "
+ "but was rejected with an error response for some reason.");
System.out.println("Error Code: " + e.getCode());
System.out.println("SQL State: " + e.getSqlstate());
System.out.println("Error Message: " + e.getMessage());
} else {
throwable.printStackTrace();
}
} else {
System.out.println("Write successfully.");
}
});
// 這里作為示例, 簡(jiǎn)單同步等待
System.out.println(future.join());
// 4.查詢(xún)數(shù)據(jù)
String sql = "select * from sensor limit 10";
ResultSet resultSet = lindormTSDBClient.query("demo", sql);
try {
// 處理查詢(xún)結(jié)果
QueryResult result = null;
// 查詢(xún)結(jié)果使用分批的方式返回,默認(rèn)每批1000行
// 當(dāng)resultSet的next()方法返回為null,表示已經(jīng)讀取完所有的查詢(xún)結(jié)果
while ((result = resultSet.next()) != null) {
List<String> columns = result.getColumns();
System.out.println("columns: " + columns);
List<String> metadata = result.getMetadata();
System.out.println("metadata: " + metadata);
List<List<Object>> rows = result.getRows();
for (int i = 0, size = rows.size(); i < size; i++) {
List<Object> row = rows.get(i);
System.out.println("row #" + i + " : " + row);
}
}
} finally {
// 查詢(xún)結(jié)束后,需確保調(diào)用ResultSet的close方法,以釋放IO資源
resultSet.close();
}
lindormTSDBClient.shutdown();
}
}