當您需要將某個Prometheus實例的數據導出,進行自定義業務處理時,您可以選擇Prometheus數據投遞服務。本文介紹如何使用Prometheus數據投遞功能將實例數據投遞至Kafka并進行消費處理。
前提條件
已接入Prometheus實例,具體操作,請參見:
已部署投遞目標云消息隊列Kafka版,并創建Topic等資源。具體操作,請參見概述。
已開通事件總線EventBridge服務。具體操作,請參見開通事件總線EventBridge并授權。
注意事項
選擇專有網絡進行數據投遞時,如果Prometheus實例所在VPC與目標VPC不在同一個,您需要保證目標VPC內的vSwitch的IP已加入Prometheus實例的白名單中,否則會導致網絡不通。vSwitch的網段信息可以在專有網絡控制臺的交換機詳情頁面獲取。
支持數據投遞的數據源列表。
實例類型
說明
Prometheus for 云服務
除cloud-product-prometheus名稱開頭的免費實例
Prometheus for 容器服務
無
Prometheus for 應用監控
無
Prometheus for Flink Serverless
無
Prometheus for Kubernetes
無
通用
除通過OpenTelemetry地址上報上來的數據
步驟一:創建投遞任務
在左側導航欄,單擊數據投遞。
在數據投遞頁面,單擊頂部菜單欄選擇目標地域,然后單擊新建任務。
在對話框中輸入任務名稱和任務描述后,單擊確定。
在任務編輯頁面,配置數據源和投遞目標。
單擊+ 添加數據源,配置以下參數,然后單擊確定。
配置項
說明
示例
Prometheus實例
被投遞的Prometheus數據源。
c78cb8273c02*****
數據過濾
設置需要過濾的指標標簽,支持正則表達式。多個條件需要換行,條件需要同時滿足,才會投遞。
__name__=AliyunEcs_CPUUtilization|AliyunEcs_memory_usedutilization regionId=cn-hangzhou id=i-2ze0mxp.*
單擊添加目標,選擇目標類型為阿里云消息隊列Kafka版,請按照表單所需填寫其余信息,然后單擊確定。
配置完成后,在任務編輯頁面,單擊確定,然后單擊保存創建投遞任務。
步驟二:查看Prometheus監控數據
為減輕投遞目標的壓力,投遞至Kafka的Prometheus監控數據為經過Snappy標準化壓縮后的JsonArray數據。更多信息,請參見Snappy壓縮格式。
方式一:通過控制臺查看
在概覽頁面的資源分布區域,選擇地域。
在實例列表頁面,單擊目標實例名稱。
在左側導航欄,單擊Topic 管理,然后單擊目標Topic操作列的詳情,在云監控或消息查詢頁簽查看您導入的數據。
方式二:通過客戶端查看
初始化Kafka客戶端,請參見單Consumer訂閱消息。
在
KafkaConsumerDemo.java
文件中添加以下代碼。以下為初始化Kafka客戶端后,消費數據并使用Snappy解壓的示例:public static void main(String[] args) { // 請先初始化kafka consumer while (true){ try { ConsumerRecords<String, byte[]> records = consumer.poll(1000); //必須在下次poll之前消費完這些數據, 且總耗時不得超過SESSION_TIMEOUT_MS_CONFIG。 //建議開一個單獨的線程池來消費消息,然后異步返回結果。 for (ConsumerRecord<String, byte[]> record : records) { byte[] compressedData = record.value(); byte[] data = Snappy.uncompress(compressedData); System.out.println(new String(data)); } } catch (Exception e) { try { Thread.sleep(1000); } catch (Throwable ignore) { } e.printStackTrace(); } } }
編譯并運行
KafkaConsumerDemo.java
文件,您可看到以下JSON格式的指標數據輸出。[{ "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "2.5", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "+Inf", "job": "apiserver", "operation": "UPDATE", "value": "675.0", "timestamp": "1698732988354" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.005", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }, { "instance": "*****", "pod": "*****", "rejected": "false", "type": "validate", "pod_name": "*****", "endpoint": "http-metrics", "__name__": "apiserver_admission_controller_admission_duration_seconds_bucket", "service": "kubernetes", "name": "*****", "namespace": "default", "le": "0.025", "job": "apiserver", "operation": "UPDATE", "value": "1037.0", "timestamp": "1698732988519" }]