日志服務提供多語言SDK,并且都支持日志服務消費接口,本文介紹消費日志的SDK示例。
前提條件?
已開通日志服務。更多信息,請參見開通日志服務。
已創建RAM用戶并完成授權。具體操作,請參見創建RAM用戶并完成授權。
已配置環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變量。
重要阿里云賬號的AccessKey擁有所有API的訪問權限,建議您使用RAM用戶的AccessKey進行API訪問或日常運維。
強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
背景信息
調用PullLogs接口可以獲取指定游標(Cursor)位置的日志數據。日志服務支持Java、Python、Go等語言的應用作為消費者或消費組消費日志服務的數據。
日志服務SPL支持在實時消費、掃描查詢和Logtail采集三個日志服務場景中使用,更多信息,請參見SPL概述。
使用Java SDK消費
開始使用前,請確保已安裝日志服務Java SDK。具體操作,請參見安裝Java SDK。
SDK消費
本示例中,調用PullLogs接口讀取日志數據,完成普通消費的演示。
參數說明
參數名稱 | 類型 | 是否必選 | 說明 |
project | string | 是 | 日志服務Project名稱,更多信息,請參見管理Project。 |
logStore | string | 是 | 日志服務Logstore名稱,Logstore是日志服務中日志數據的采集、存儲和查詢單元。更多信息,請參見管理Logstore。 |
shardId | int | 是 | 日志庫的分區ID。 |
添加Maven依賴
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.99</version>
</dependency>
創建PullLogsDemo.java
文件
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PullLogsDemo {
// 日志服務的服務接入點。此處以杭州為例,其它地域請根據實際情況填寫
private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
// 本示例從環境變量中獲取 AccessKey ID 和 AccessKey Secret。
private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名稱
private static final String project = "your_project";
// LogStore 名稱
private static final String logStore = "your_logstore";
public static void main(String[] args) throws Exception {
// 創建日志服務 Client
Client client = new Client(endpoint, accessKeyId, accessKeySecret);
// 查詢 LogStore 的 Shard
ListShardResponse resp = client.ListShard(project, logStore);
System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
Map<Integer, String> cursorMap = new HashMap<Integer, String>();
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
// 從頭開始消費,獲取游標。(如果是從尾部開始消費,使用 Consts.CursorMode.END)
cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
}
try {
while (true) {
// 從每個Shard中獲取日志
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
PullLogsResponse response = client.pullLogs(request);
// 日志都在日志組(LogGroup)中,按照邏輯拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
// 完成處理拉取的日志后,移動游標。
cursorMap.put(shardId, response.getNextCursor());
}
}
} catch (LogException e) {
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}
SDK基于SPL消費
本示例中,調用PullLogs接口讀取日志數據,完成使用Java SDK基于SPL消費日志數據的演示。
參數說明
參數名稱 | 類型 | 是否必選 | 說明 |
project | string | 是 | 日志服務Project名稱,更多信息,請參見管理Project。 |
logStore | string | 是 | 日志服務Logstore名稱,Logstore是日志服務中日志數據的采集、存儲和查詢單元。更多信息,請參見管理Logstore。 |
shardId | int | 是 | 日志庫的分區ID。 |
添加Maven依賴
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>2.5.0</version>
</dependency>
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>aliyun-log</artifactId>
<version>0.6.99</version>
</dependency>
創建PullLogsWithSPLDemo.java
文件
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.*;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PullLogsWithSPLDemo {
// 日志服務的服務接入點。此處以杭州為例,其它地域請根據實際情況填寫
private static final String endpoint = "cn-hangzhou.log.aliyuncs.com";
// 本示例從環境變量中獲取 AccessKey ID 和 AccessKey Secret。
private static final String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
private static final String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名稱
private static final String project = "your_project";
// LogStore 名稱
private static final String logStore = "your_logstore";
public static void main(String[] args) throws Exception {
// 創建日志服務 Client
Client client = new Client(endpoint, accessKeyId, accessKeySecret);
// 查詢 LogStore 的 Shard
ListShardResponse resp = client.ListShard(project, logStore);
System.out.printf("%s has %d shards\n", logStore, resp.GetShards().size());
Map<Integer, String> cursorMap = new HashMap<Integer, String>();
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
// 從頭開始消費,獲取游標。(如果是從尾部開始消費,使用 Consts.CursorMode.END)
cursorMap.put(shardId, client.GetCursor(project, logStore, shardId, Consts.CursorMode.BEGIN).GetCursor());
}
try {
while (true) {
// 從每個Shard中獲取日志
for (Shard shard : resp.GetShards()) {
int shardId = shard.getShardId();
PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursorMap.get(shardId));
request.setQuery("* | where cast(body_bytes_sent as bigint) > 14000");
request.setPullMode("scan_on_stream");
PullLogsResponse response = client.pullLogs(request);
// 日志都在日志組(LogGroup)中,按照邏輯拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:\tShard:%d\n", logGroups.size(), logStore, shardId);
// 完成處理拉取的日志后,移動游標。
cursorMap.put(shardId, response.getNextCursor());
}
}
} catch (LogException e) {
System.out.println("error code :" + e.GetErrorCode());
System.out.println("error message :" + e.GetErrorMessage());
throw e;
}
}
}