本文通過示例代碼指導您使用多值模型寫入數據。詳細示例請參見Aliyun TSDB SDK for Java中的test/example/ExampleOfMultiField和test/com.aliyun.hitsdb.client/TestHiTSDBClientMultiFieldFeatures。
說明 多值模型僅在0.2.0版本及以上的SDK支持。關于最新SDK版本的說明及下載,請參見SDK的《版本說明》。之前版本的SDK所支持的多值模型實際是模擬多值模型,不鼓勵繼續使用。之前通過MultiValue 類寫入的數據并不能通過最新的 SDK 多值模型相關類 (MultiField) 進行查詢。
多值模型的數據寫入
示例代碼:
/*
* 通過 multiFieldPutSync() API。
* 該 API 支持單點和多點(List)寫入。
*
* 下面是必須寫入時必須提供的信息:
* Metric: 數據指標的類別。相當于 influxdb 的 Measurement。
* Fields: 數據指標的度量信息(相當于指標下的子類別)。即一個 metric 支持多個 fields。如 metric 為 wind,該 metric 可以有多個 fields:direction, speed, direction, description 和 temperature。
* Timestamp: 數據點的時間戳
* Tags: 時間線額外的信息,如"型號=ABC123"、"出廠編號=1234567890"等。
*/
MultiFieldPoint multiFieldPoint = MultiFieldPoint.metric("wind")
.field("speed", 45.2)
.field("level", 1.2)
.field("direction", "S")
.field("description", "Breeze")
.tag("sensor", "95D8-7913")
.tag("city", "hangzhou")
.tag("province", "zhejiang")
.timestamp(1537170208L)
.build();
// 同步寫入
tsdb.multiFieldPutSync(multiFieldPoint);
多值模型的異步寫入
注意:多值模型異步僅在 0.2.1 版本及以上的 SDK 支持。關于最新 SDK 版本的說明及下載,請參見SDK的《版本說明》。
不使用回調方法
默認的TSDBConfig沒有設置多值異步寫入回調方法。示例代碼:
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
// 批次寫入時每次提交數據量
.batchPutSize(500)
// 單值異步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值異步寫入緩存隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值異步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命周期內不用關閉TSDB實例,直到應用結束時關閉即可
tsdb.close();
}
使用普通回調方法
普通回調方法僅報告當寫入成功時的數據點列表,或者當寫入失敗時的數據點列表以及對應的寫入異常。示例代碼:
// 創建一個普通回調方法,用于處理寫入成功或者失敗后的情況
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutCallback() {
@Override
public void response(String address, List<MultiFieldPoint> points, Result result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已處理" + count + "個點");
}
final AtomicInteger num = new AtomicInteger();
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("業務回調出錯!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次寫入時每次提交數據量
.batchPutSize(500)
// 單值異步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值異步寫入緩存隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值異步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
// 多值異步寫入回調方法
.listenMultiFieldBatchPut(callback)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命周期內不用關閉TSDB實例,直到應用結束時關閉即可
tsdb.close();
}
使用摘要回調方法
摘要回調方法,當寫入成功時,報告這次寫入成功時的數據點列表,以及對應的寫入摘要情況,包括成功點數,失敗點數;當寫入失敗時,報告這次寫入失敗時的數據點列表以及對應的寫入異常。示例代碼:
// 創建一個摘要回調方法,用于處理寫入成功或者失敗后的情況
AbstractMultiFieldBatchPutCallback callback = new MultiFieldBatchPutSummaryCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, SummaryResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已處理" + count + "個點");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("業務回調出錯!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次寫入時每次提交數據量
.batchPutSize(500)
// 單值異步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值異步寫入緩存隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值異步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
// 多值異步寫入回調方法
.listenMultiFieldBatchPut(callback)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命周期內不用關閉TSDB實例,直到應用結束時關閉即可
tsdb.close();
}
使用詳情回調方法
詳情回調方法,當寫入成功時,報告這次寫入成功時的數據點列表,以及對應的寫入詳情,包括寫入失敗的點以及對應錯誤信息等;當寫入失敗時,報告這次寫入失敗時的數據點列表以及對應的寫入異常。示例代碼:
// 創建一個詳情回調方法,用于處理寫入成功或者失敗后的情況
MultiFieldBatchPutDetailsCallback callback = new MultiFieldBatchPutDetailsCallback() {
final AtomicInteger num = new AtomicInteger();
@Override
public void response(String address, List<MultiFieldPoint> points, DetailsResult result) {
int count = num.addAndGet(points.size());
System.out.println(result);
System.out.println("已處理" + count + "個點");
}
@Override
public void failed(String address, List<MultiFieldPoint> points, Exception ex) {
System.err.println("業務回調出錯!" + points.size() + " error!");
ex.printStackTrace();
}
};
TSDBConfig config = TSDBConfig.address("127.0.0.1", 8242)
.httpConnectTimeout(90)
// 批次寫入時每次提交數據量
.batchPutSize(500)
// 單值異步寫入線程數
.batchPutConsumerThreadCount(1)
// 多值異步寫入緩存隊列長度
.multiFieldBatchPutBufferSize(10000)
// 多值異步寫入線程數
.multiFieldBatchPutConsumerThreadCount(1)
// 多值異步寫入回調方法
.listenMultiFieldBatchPut(callback)
.config();
// 特別注意,TSDB只需初始化一次即可
TSDB tsdb = TSDBClientFactory.connect(config);
MultiFieldPoint point = MultiFieldPoint
.metric("test-test-test")
.tag("a", "1")
.tag("b", "2")
.timestamp(System.currentTimeMillis())
.field("f1", Math.random())
.field("f2", Math.random())
.build();
tsdb.multiFieldPut(point);
// 特別注意,在應用生命周期內不用關閉TSDB實例,直到應用結束時關閉即可
tsdb.close();
}