隨著不斷增長的監控指標與數據流量,監控系統變得越來越復雜,同時對監控系統的時效性提出了更高的要求。本文介紹基于TairTS輕松搭建高并發場景的秒級監控系統。
TairTS簡介
TairTS為Tair自研的Module,依托Tair(企業版)提供實時且高并發的查詢、寫入性能,支持對歷史時序數據的更新或累加,支持時間線級別的TTL設定,保證每條時間線都可以按時間窗口自動滾動,同時采用高效的Gorilla壓縮算法與特定存儲,極大降低存儲成本。更多信息,請參見TS。
秒級監控介紹
本文以上圖為例介紹秒級監控架構。Console發布秒級監控配置,對應APP接收配置后通過MQTT協議寫入到Collector中,Collector處理數據后寫入到Tair數據庫中。
高并發查詢場景
在高并發查詢場景中,TairTS不僅可以保證查詢的性能,還支持降采樣、屬性過濾、分批查詢、多種數值函數等條件下的聚合操作,滿足不同業務進行多維度篩選與查看,同時支持將批量查詢與聚合計算集成到單條命令中,減少網絡交互,實現毫秒級響應,幫助您及時定位問題。
高并發寫入場景
在高并發寫入場景中,隨著APP規模的增大,單點的Collector將會成為瓶頸。為此,TairTS支持對歷史時序數據的原地更新或累加,保障多Collector并發寫入正確性,同時節省內存空間。并發寫入的代碼示例如下:
import com.aliyun.tair.tairts.TairTs; import com.aliyun.tair.tairts.params.ExtsAggregationParams; import com.aliyun.tair.tairts.params.ExtsAttributesParams; import com.aliyun.tair.tairts.results.ExtsSkeyResult; import redis.clients.jedis.Jedis; public class test { protected static final String HOST = "127.0.0.1"; protected static final int PORT = 6379; public static void main(String[] args) { try { Jedis jedis = new Jedis(HOST, PORT, 2000 * 100); if (!"PONG".equals(jedis.ping())) { System.exit(-1); } TairTs tairTs = new TairTs(jedis); //Cluster模式時如下: //TairTsCluster tairTsCluster = new TairTsCluster(jedisCluster); String pkey = "cpu_load"; String skey1 = "app1"; long startTs = (System.currentTimeMillis() - 100000) / 1000 * 1000; long endTs = System.currentTimeMillis() / 1000 * 1000; String startTsStr = String.valueOf(startTs); String endTsStr = String.valueOf(endTs); tairTs.extsdel(pkey, skey1); long num = 5; //Collector A 并發更新。 for (int i = 0; i < num; i++) { double val = i; long ts = startTs + i*1000; String tsStr = String.valueOf(ts); ExtsAttributesParams params = new ExtsAttributesParams(); params.dataEt(1000000000); String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params); } ExtsAggregationParams paramsAgg = new ExtsAggregationParams(); paramsAgg.maxCountSize(10); paramsAgg.aggAvg(1000); System.out.println("Collector A并發更新后結果:"); ExtsSkeyResult rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg); for (int i = 0; i < num; i++) { System.out.println(" ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue()); } //Collector B并發更新。 for (int i = 0; i < num; i++) { double val = i; long ts = startTs + i*1000; String tsStr = String.valueOf(ts); ExtsAttributesParams params = new ExtsAttributesParams(); params.dataEt(1000000000); String addRet = tairTs.extsrawincr(pkey, skey1, tsStr, val, params); } System.out.println("Collector B 并發更新后結果:"); rangeByteRet = tairTs.extsrange(pkey, skey1, startTsStr, endTsStr, paramsAgg); for (int i = 0; i < num; i++) { System.out.println(" ts: " + rangeByteRet.getDataPoints().get(i).getTs() + ", value: " + rangeByteRet.getDataPoints().get(i).getDoubleValue()); } } catch (Exception e) { e.printStackTrace(); } } }
執行結果:
Collector A并發更新后結果: ts: 1597049266000, value: 0.0 ts: 1597049267000, value: 1.0 ts: 1597049268000, value: 2.0 ts: 1597049269000, value: 3.0 ts: 1597049270000, value: 4.0 Collector B并發更新后結果: ts: 1597049266000, value: 0.0 ts: 1597049267000, value: 2.0 ts: 1597049268000, value: 4.0 ts: 1597049269000, value: 6.0 ts: 1597049270000, value: 8.0
文檔內容是否對您有幫助?