寫入數據
TSDB SDK 使用 Point 類,表示一個時間點。一個 Point 對象表示一個時間序列(時間線)某個時刻上的數據。
構造時間點
Point(時間點)有多種構造方法,形式比較多樣化。本節提供三種構造時間點的方法示例。
Point 的構建至少需要一個 tag 鍵值對。
示例一
構建一個時間點。用單位為秒的時間戳表示時間,指定 Point 數據的 Metric 與多個 Tag。
// 以'秒'為時間戳
int timestamp = (int)(System.currentTimeMillis()/1000);
// 構造 Point
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.tag("tagk2", "tagv2")
.tag("tagk3", "tagv3")
.timestamp(timestamp).value(123.456)
.build();
示例二
構建一個時間點。用單位為毫秒的時間戳表示時間,指定 Point 數據的 Metric,Tag 使用 Map 形式的鍵值對。
// 也可以'毫秒'為時間戳
long timestamp = System.currentTimeMillis();
// 使用 HashMap 表示 Tags
Map<String,String> tagsMap = new HashMap<String,String>();
tagsMap.put("tagk1", "tagv1");
tagsMap.put("tagk2", "tagv2");
// 構造 Point
Point point = Point.metric("test1")
.tag(tagsMap)
.value(timestamp,123.456)
.build();
示例三
構建一個時間點。使用java.util.Date
表示時間。
// 使用 java.util.Date 表示時間。
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(new Date,123.456)
.build();
寫入數據
TSDB-Client 有兩種寫數據的方式:同步阻塞的寫數據和異步非阻塞的寫數據。
同步阻塞的寫數據
假設我們現在需要構建 500 個時間點提交給 TSDB。
示例代碼
List<Point> points = new ArrayList<Point>();
構建 Point
for(int i = 0; i<500; i++) {
long timestamp = System.currentTimeMillis();
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(timestamp, Math.random())
.build();
// 手動打包數據
points.add(point);
}
// 手動打包后提交數據
tsdb.putSync(points)
出于寫入性能的考慮,同步寫的方式一般需要您手動將數據點打包成一批數據,并且建議這批數據包含500~1000個數據點。此外,也建議多個線程并發進行提交
同步寫的響應對象
您可以根據需要選擇返回的數據內容。
返回空對象,無內容。本質是調用
POST /api/put
。Result result = tsdb.putSync(ps);
返回提交概述。包含成功數和返回數。本質是調用
POST /api/put?summary=true
。SummaryResult summaryResult = tsdb.putSync(ps,SummaryResult.class);
返回提交概述。包含成功數、返回數和失敗原因。本質是調用
POST /api/put?details=true
。DetailsResult detailsResult = tsdb.putSync(ps,DetailsResult.class);
異步非阻塞的寫數據
異步寫數據的方式比較簡單,只要有 Point 就可以提交數據。Client 會自動幫助您批量地提交數據,不需要您手動打包。
如果沒有特殊需求,推薦您使用異步非阻塞的寫數據的方式。
示例代碼
// 構建 Point
Point point = Point.metric("test1")
.tag("tagk1", "tagv1")
.value(timestamp, Math.random())
.build();
// 直接提交數據
tsdb.put(point);
異步非阻塞寫數據的回調設置
當您使用異步寫數據的方式時,若需要獲取 Put 接口的響應結果,則需要設置回調接口。
示例代碼
final AtomicLong num = new AtomicLong();
// 回調對象
BatchPutCallback cb = new BatchPutCallback() {
@Override
public void response(String address, List<Point> input, Result output) {
long afterNum = num.addAndGet(input.size());
System.out.println("成功處理" + input.size() + ",已處理" + afterNum);
}
@Override
public void failed(String address, List<Point> input, Exception ex) {
ex.printStackTrace();
long afterNum = num.addAndGet(input.size());
System.out.println("失敗處理" + input.size() + ",已處理" + afterNum);
}
};
TSDBConfig config = TSDBConfig
.address("example.hitsdb.com", 8242)
.listenBatchPut(cb) // 設置回調接口
.config();
tsdb = TSDBClientFactory.connect(config);
TSDB Client 提供了以下四種類型的 Callback 接口:
BatchPutCallback,即數據寫入的極簡模式。對應的API調用形式為
POST /api/put
BatchPutSummaryCallback,即數據寫入的統計模式。對應的API調用形式為
POST /api/put?summary=true
BatchPutDetailsCallback,即數據寫入的詳細模式。對應的API調用形式為
POST /api/put?details=true
BatchPutIgnoreErrorsCallback, 即數據寫入的容錯模式。對應的API調用形式為
POST /api/put?ignoreErrors=true
您可以通過設置不同業務場景來注冊相應的回調接口。寫入模式與業務場景的說明,可參見寫入模式及其響應內容。
不要在回調方法中做耗時操作。若有此需要,可以再把操作交給其他工作線程執行。
回調方法
response()
與failed()
被調用的時機是不一樣的:response()
: 指的是寫入請求合法且被服務端處理后執行的回調。此時可以根據回調對象種類的不同,獲取一些不同的反饋信息。failed()
: 指的是寫入請求本身存在問題導致請求不合法(比如報文非法,觸發限流,鑒權失敗等等)被服務端拒絕服務時所執行的回調。
因此建議在實現Callback接口時,不要忘記實現
failed()
方法。如果不實現的話,則會執行默認的failed()
回調方法,其本身是一個空方法。以上關于回調接口的示例均使用的是單值模型寫入接口的例子。對于多值模型的寫入與此類似,但對應的回調接口變為下述名稱:
MultiFieldbatchPutCallback
MultiFieldbatchPutSummaryCallback
MultiFieldbatchPutDetailsCallback
MultiFieldbatchPutIgnoreErrorsCallback