Monitor sessions of an RDS PostgreSQL instance by using SLS
Monitoring instance sessions helps you understand the current state of a database and troubleshoot performance problems. This topic describes how to monitor RDS PostgreSQL at the session level by combining the pg_stat_activity system view, Alibaba Cloud Simple Log Service (SLS), and CloudMonitor. Because the session data is retained over time, you can analyze not only the current state and performance of the system but also how the database behaved during a past period.
Background information
A database system is usually a large application that consumes substantial memory, CPU, I/O, and network resources under heavy load. RDS PostgreSQL uses a process model in which each session is served by its own backend process. Monitoring these sessions helps you understand the current state of the database and locate its performance bottlenecks.
The pg_stat_activity system view
The pg_stat_activity view provides information about every session currently running on the system. Querying it periodically and retaining the results amounts to taking a snapshot of the system at each interval. These snapshots help you understand the state of the system and troubleshoot database performance problems after the fact.
Session information columns
The columns returned by pg_stat_activity are described below.
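Column | Data type | Description |
datid | oid | OID of the database that the process is connected to. |
datname | name | Name of the database that the process is connected to. |
pid | integer | Process ID. |
leader_pid | integer | Process ID of the parallel group leader (or leader apply worker) if this process is a parallel worker. NULL if the process is itself a leader or does not take part in any parallel operation. |
usesysid | oid | OID of the user logged in to this process. |
usename | name | Name of the user logged in to this process. |
application_name | text | Name of the application connected to this process. |
client_addr | inet | IP address of the client connected to this process. |
client_hostname | text | Host name of the connected client, obtained through a reverse DNS lookup of client_addr. Populated only for IP connections and only when log_hostname is enabled. |
client_port | integer | TCP port number that the client uses, or -1 if a Unix socket is used. |
backend_start | timestamp with time zone | Time when the process was started. For client backends, this is when the client connected. |
xact_start | timestamp with time zone | Start time of the process's current transaction, or NULL if no transaction is active. |
query_start | timestamp with time zone | Start time of the currently active query or, if state is not active, of the last query. |
state_change | timestamp with time zone | Time when state last changed. |
wait_event_type | text | Type of event that the process is waiting for, or NULL if it is not waiting. |
wait_event | text | Name of the wait event, or NULL if the process is not waiting. |
state | text | Overall state of the process, for example active, idle, or idle in transaction. |
backend_xid | xid | Top-level transaction ID of the process, if any. |
backend_xmin | xid | The xmin horizon of the process. |
query_id | bigint | ID of the most recent query of the process. NULL unless query identifiers are being computed (for example, when compute_query_id is enabled). |
query | text | Text of the most recent query of the process. |
backend_type | text | Type of the process, for example client backend or parallel worker. |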
SQL for collecting session information
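A sample collection statement is shown below. The leader_pid column exists only in RDS PostgreSQL 13 and later, and the query_id column only in RDS PostgreSQL 14 and later; adjust the statement to match the major version of your instance.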
SELECT
(
CASE
WHEN leader_pid is NULL THEN pid
ELSE leader_pid
END
) AS leader_pid,
(
CASE
WHEN state_change <= now() AND state != 'active' THEN extract(
epoch
FROM
state_change - query_start
)
ELSE extract(
epoch
FROM
now() - query_start
)
END
) AS query_duration,
(
CASE
WHEN wait_event_type is NULL THEN 'CPU'
ELSE coalesce(wait_event_type || '.' || wait_event, '')
END
) AS wait_entry,
query_id,
(
CASE
WHEN state = 'active' THEN 'running'
ELSE 'finished'
END
) AS execute_state,
query,
datname,
application_name,
client_hostname,
query_start
FROM
pg_stat_activity
WHERE
usename NOT IN ('aurora', 'replicator')
AND backend_type IN ('client backend','parallel worker');
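To make the three derived columns of the statement above concrete, the following hypothetical Java helper reproduces the same rules for a single row, including how SQL NULLs fall through to the ELSE branches. It is a sketch for testing code that consumes the collected logs, not part of the collection program, and it computes durations with millisecond rather than microsecond precision.

```java
import java.time.Duration;
import java.time.OffsetDateTime;

// Hypothetical helper: mirrors the CASE expressions of the collection SQL in plain Java
final class SessionFields {
    private SessionFields() {}

    // leader_pid: a parallel worker is attributed to its leader; other sessions keep their own pid
    static int groupPid(int pid, Integer leaderPid) {
        return leaderPid == null ? pid : leaderPid;
    }

    // wait_entry: "CPU" when no wait event type is reported, otherwise "type.event"
    static String waitEntry(String waitEventType, String waitEvent) {
        if (waitEventType == null) {
            return "CPU";
        }
        // In SQL, concatenating with NULL yields NULL, which coalesce(..., '') maps to ''
        return waitEvent == null ? "" : waitEventType + "." + waitEvent;
    }

    // execute_state: only 'active' counts as running; a NULL state falls through to 'finished'
    static String executeState(String state) {
        return "active".equals(state) ? "running" : "finished";
    }

    // query_duration in seconds, or null when query_start is NULL
    static Double queryDurationSeconds(String state, OffsetDateTime queryStart,
                                       OffsetDateTime stateChange, OffsetDateTime now) {
        if (queryStart == null) {
            return null;
        }
        // Finished queries: time from query start to the last state change.
        // Any comparison involving NULL falls through to the ELSE branch, as in SQL.
        boolean finished = state != null && !state.equals("active")
                && stateChange != null && !stateChange.isAfter(now);
        OffsetDateTime end = finished ? stateChange : now;
        return Duration.between(queryStart, end).toMillis() / 1000.0;
    }
}
```

For an idle session, query_duration therefore reports how long its last query ran, while for an active session it reports how long the current query has been running so far.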
Prerequisites
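A RAM user is created and granted the following permissions. For more information, see Create a RAM user and grant permissions.
AliyunRDSFullAccess: permission to manage ApsaraDB RDS.
AliyunLogFullAccess: permission to manage Simple Log Service (Log).
AliyunCloudMonitorFullAccess: permission to manage CloudMonitor.
A project and a Logstore are created in SLS. For more information, see Activate Simple Log Service.
The username and password of an account for the target database on the RDS PostgreSQL instance are available. For more information, see Accounts and permissions. Note that PostgreSQL shows the full details of other users' sessions (such as query, state, and wait events) only to superusers and roles that have the privileges of pg_read_all_stats; for other accounts, those columns are NULL for sessions that belong to other users.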
Procedure
Collect session information from the pg_stat_activity system view and send it periodically to an SLS Logstore. This topic uses the SLS Java SDK to provide an example that collects and sends the data on a schedule.
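Install the SLS Java SDK. For more information, see Install the Java SDK.
Configure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET. For more information, see Configure environment variables.
Taking a Maven project as an example, add the following dependencies to pom.xml.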
<!-- SLS Java SDK, used by both sample programs -->
<dependency>
    <groupId>com.aliyun.openservices</groupId>
    <artifactId>aliyun-log</artifactId>
    <version>0.6.75</version>
</dependency>
<!-- PostgreSQL JDBC driver -->
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.2.18</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>tea-openapi</artifactId>
    <version>0.3.2</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>tea-console</artifactId>
    <version>0.0.1</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>tea-util</artifactId>
    <version>0.2.21</version>
</dependency>
<!-- Lombok, which provides the @Slf4j annotation; declare it only once and use a version that supports your JDK -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.30</version>
    <scope>provided</scope>
</dependency>
<!-- Logging -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.3</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.30</version>
</dependency>
<!-- RDS OpenAPI SDK, used by the slow log and error log sample -->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>rds20140815</artifactId>
    <version>5.0.1</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>alibabacloud-sls20201230</artifactId>
    <version>4.0.7</version>
</dependency>
Run the following sample program to collect instance session information periodically and send it to the SLS Logstore.
package org.example;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PutLogsRequest;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PgMonitor {
    // PostgreSQL connection settings
    private static final String PG_URL = "<jdbc:postgresql://your-host:5432/mydb>";
    private static final String PG_USER = "<your-user>";
    private static final String PG_PASSWORD = "<your-passwd>";

    // Simple Log Service (SLS) settings
    private static final String LOG_ENDPOINT = "<your-sls-endpoint>";
    private static final String LOG_PROJECT = "<your-project>";
    private static final String LOG_STORE = "<your-logStore>";
    // The AccessKey pair is read from environment variables rather than hardcoded
    private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");

    // Collection interval, in minutes
    private static final int INTERVAL_MINUTES = 1;

    // Same collection SQL as above (query_id requires PostgreSQL 14 or later)
    private static final String QUERY =
            "SELECT (CASE WHEN leader_pid IS NULL THEN pid ELSE leader_pid END) AS leader_pid, "
          + "(CASE WHEN state_change <= now() AND state != 'active' "
          + "THEN extract(epoch FROM state_change - query_start) "
          + "ELSE extract(epoch FROM now() - query_start) END) AS query_duration, "
          + "(CASE WHEN wait_event_type IS NULL THEN 'CPU' "
          + "ELSE coalesce(wait_event_type || '.' || wait_event, '') END) AS wait_entry, "
          + "query_id, "
          + "(CASE WHEN state = 'active' THEN 'running' ELSE 'finished' END) AS execute_state, "
          + "query, datname, application_name, client_hostname, query_start "
          + "FROM pg_stat_activity "
          + "WHERE usename NOT IN ('aurora', 'replicator') "
          + "AND backend_type IN ('client backend', 'parallel worker')";

    // Columns copied from each result row into the SLS log item
    private static final String[] COLUMNS = {
            "leader_pid", "query_duration", "wait_entry", "query_id", "execute_state",
            "query", "datname", "application_name", "client_hostname", "query_start"
    };

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        Runnable task = () -> {
            try {
                // Take a snapshot of pg_stat_activity
                List<LogItem> logItems = fetchPgStatActivity();
                // Send the snapshot to SLS, skipping empty rounds
                if (!logItems.isEmpty()) {
                    sendLogsToSLS(logItems);
                }
            } catch (Exception e) {
                // Catch everything so that one failed round does not cancel later runs
                e.printStackTrace();
            }
        };
        // Run immediately, then every INTERVAL_MINUTES minutes
        scheduler.scheduleAtFixedRate(task, 0, INTERVAL_MINUTES, TimeUnit.MINUTES);
    }

    private static List<LogItem> fetchPgStatActivity() throws SQLException {
        List<LogItem> logItems = new ArrayList<>();
        // try-with-resources closes the result set, statement, and connection even on errors
        try (Connection conn = DriverManager.getConnection(PG_URL, PG_USER, PG_PASSWORD);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(QUERY)) {
            while (rs.next()) {
                LogItem logItem = new LogItem();
                for (String column : COLUMNS) {
                    String value = rs.getString(column);
                    // NULL columns (for example client_hostname) are sent as empty strings
                    logItem.PushBack(column, value == null ? "" : value);
                }
                logItems.add(logItem);
            }
        }
        return logItems;
    }

    private static void sendLogsToSLS(List<LogItem> logItems) throws LogException {
        Client client = new Client(LOG_ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
        String topic = "pg_stat_activity";
        String source = "postgresql-monitor";
        PutLogsRequest request = new PutLogsRequest(LOG_PROJECT, LOG_STORE, topic, source, logItems);
        client.PutLogs(request);
    }
}
Replace the following parameters with your actual values. Replace each placeholder in the code entirely, including the angle brackets.
Parameter | Description | Example |
PG_URL | JDBC connection string of the instance, including the name of the database to monitor. | jdbc:postgresql://pgm-bp1c82mky1avip****.pg.rds.aliyuncs.com:5432/testdb01 |
PG_USER | Database account. | testdbuser |
PG_PASSWORD | Password of the database account. | **** |
LOG_ENDPOINT | SLS endpoint. For more information, see Endpoints. | cn-hangzhou.log.aliyuncs.com |
LOG_PROJECT | Name of the SLS project. | rdspg-test |
LOG_STORE | Name of the SLS Logstore. | rdspg-sls |
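The sample program keeps these settings as constants in source code. If you prefer to keep the database password and SLS settings out of the code, the following is a minimal sketch of a hypothetical settings loader that reads them from environment variables and fails fast when one is missing. The class name and the variable names PG_URL, PG_USER, PG_PASSWORD, LOG_ENDPOINT, LOG_PROJECT, and LOG_STORE are assumptions made for this example, not part of any SDK.

```java
import java.util.Map;

// Hypothetical settings loader (not part of any SDK): reads PgMonitor settings from
// environment variables; the variable names are assumptions chosen for this example
final class MonitorSettings {
    final String pgUrl;
    final String pgUser;
    final String pgPassword;
    final String logEndpoint;
    final String logProject;
    final String logStore;

    private MonitorSettings(Map<String, String> env) {
        this.pgUrl = require(env, "PG_URL");
        this.pgUser = require(env, "PG_USER");
        this.pgPassword = require(env, "PG_PASSWORD");
        this.logEndpoint = require(env, "LOG_ENDPOINT");
        this.logProject = require(env, "LOG_PROJECT");
        this.logStore = require(env, "LOG_STORE");
        // Catch a common copy-paste mistake, such as keeping the placeholder's angle brackets
        if (!pgUrl.startsWith("jdbc:postgresql://")) {
            throw new IllegalStateException("PG_URL must start with jdbc:postgresql://");
        }
    }

    // Reads the settings from the current process environment
    static MonitorSettings fromEnvironment() {
        return from(System.getenv());
    }

    // Reads the settings from an arbitrary map, which keeps the logic testable
    static MonitorSettings from(Map<String, String> env) {
        return new MonitorSettings(env);
    }

    // Returns the value of a required variable, failing fast if it is missing or blank
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value;
    }
}
```

In PgMonitor you would then call MonitorSettings.fromEnvironment() once at startup and use its fields in place of the constants, so a misconfiguration surfaces immediately instead of at the first scheduled run.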
After the program runs successfully, you can query the session logs of the instance in the SLS Logstore. For more information, see Query and analyze logs.
Important: You must create indexes for the Logstore before you can query and analyze the log data.
Connect the SLS logs to CloudMonitor.
Log on to the CloudMonitor console and create monitoring metrics for the SLS logs to connect them to CloudMonitor. After the logs are connected, you can create dashboards and view charts of the specified metrics on them. For more information, see Manage metrics for SLS logs.
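A metric built on SLS logs is an aggregate over the collected entries. As an illustration only (plain Java, not the CloudMonitor or SLS API), the following hypothetical helper computes one such aggregate for a single collection round: how many running sessions reported each wait_entry value. A chart of these counts over time shows, for example, whether sessions mostly run on CPU or wait on locks.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of a dashboard-style aggregate; not a CloudMonitor or SLS API
final class WaitEntryCounter {
    // One collected log entry, reduced to the two fields this example uses
    static final class Row {
        final String waitEntry;
        final String executeState;

        Row(String waitEntry, String executeState) {
            this.waitEntry = waitEntry;
            this.executeState = executeState;
        }
    }

    // Counts rows whose execute_state is "running", grouped by wait_entry (sorted by key)
    static Map<String, Integer> countRunningByWaitEntry(List<Row> rows) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Row row : rows) {
            if ("running".equals(row.executeState)) {
                // TreeMap rejects null keys, so a missing wait_entry is counted under ""
                String key = row.waitEntry == null ? "" : row.waitEntry;
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Idle sessions are excluded because their execute_state is "finished", so the counts reflect only work that was in progress when the snapshot was taken.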
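(Optional) Send slow query logs and error logs to SLS by using the RDS OpenAPI.
Monitoring slow query logs and error logs together with session information gives you a fuller view of system state and performance, which helps you troubleshoot database performance problems.
This topic uses the SLS Java SDK together with the RDS operations DescribeSlowLogRecords (query slow log details) and DescribeErrorLogs (query error logs) to periodically retrieve the slow query logs and error logs of an RDS instance and send them to SLS. The sample program is as follows.
Note: You can click the Debug button in the API documentation to open the operation in OpenAPI Explorer, where you can see how to install and use the corresponding SDK.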
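The program collects logs in consecutive time windows whose boundaries are UTC times with minute precision (yyyy-MM-dd'T'HH:mm'Z'), the format the program passes as StartTime and EndTime. As a minimal standalone sketch of that idea, assuming fixed-length windows, the hypothetical LogWindow class below computes a first window and then each following window so that no interval is skipped.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: consecutive, fixed-length UTC query windows with minute precision
final class LogWindow {
    // Minute-precision UTC format, for example 2024-05-01T08:07Z
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'").withZone(ZoneOffset.UTC);

    final Instant start;
    final Instant end;

    private LogWindow(Instant start, Instant end) {
        this.start = start;
        this.end = end;
    }

    // First window: the `minutes` minutes ending at the last minute boundary at or before `now`
    static LogWindow endingAt(Instant now, long minutes) {
        Instant end = now.truncatedTo(ChronoUnit.MINUTES);
        return new LogWindow(end.minus(minutes, ChronoUnit.MINUTES), end);
    }

    // Following window: starts exactly where this one ends, so no interval is skipped
    LogWindow next(long minutes) {
        return new LogWindow(end, end.plus(minutes, ChronoUnit.MINUTES));
    }

    // True once the window has fully elapsed and can be queried
    boolean isComplete(Instant now) {
        return !end.isAfter(now);
    }

    String startParam() { return FORMAT.format(start); }
    String endParam() { return FORMAT.format(end); }
}
```

Checking isComplete before each query keeps the collector from requesting a window that ends in the future, and advancing to next only after a successful send means a failed round is retried with the same window.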
package org.example;

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.LogItem;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PutLogsRequest;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Slf4j
public class PgCollectLogOpt {
    private static final String LOG_ENDPOINT = "your-endpoint";
    private static final String LOG_PROJECT = "your-project";
    private static final String LOG_STORE = "your-log-store";
    // Read the AccessKey pair from environment variables instead of hardcoding it
    private static final String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    private static final String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    // Collection interval and window length, in minutes
    private static final int INTERVAL_MINUTES = 10;
    // Records requested per page from the RDS OpenAPI
    private static final int PAGE_SIZE = 100;
    // UTC time with minute precision, for example 2024-05-01T08:00Z
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm'Z'");
    // End of the last successfully collected window; the next window starts here
    private static volatile String lastEndTime = getCurrentTime();

    public static com.aliyun.rds20140815.Client createClient() throws Exception {
        com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config()
                .setAccessKeyId(ACCESS_KEY_ID)
                .setAccessKeySecret(ACCESS_KEY_SECRET);
        config.endpoint = "rds.aliyuncs.com";
        return new com.aliyun.rds20140815.Client(config);
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(() -> {
            try {
                collectAndSendLogs();
            } catch (Exception e) {
                log.error("Error during log collection and sending: ", e);
            }
        }, 0, INTERVAL_MINUTES, TimeUnit.MINUTES);
    }

    private static void collectAndSendLogs() throws Exception {
        com.aliyun.rds20140815.Client client = createClient();
        String instanceId = "<your-instance-id>";
        String startTime = lastEndTime;
        String endTime = getNextEndTime(startTime);
        try {
            // Separate topics keep slow logs and error logs distinguishable in the Logstore
            List<LogItem> slowLogs = getLogs(client, instanceId, startTime, endTime, LogType.SLOW);
            if (!slowLogs.isEmpty()) sendLogsToSLS(slowLogs, "rds_slow_log");
            List<LogItem> errorLogs = getLogs(client, instanceId, startTime, endTime, LogType.ERROR);
            if (!errorLogs.isEmpty()) sendLogsToSLS(errorLogs, "rds_error_log");
            // Advance the window only after both log types were collected and sent
            lastEndTime = endTime;
        } catch (Exception e) {
            log.error("Log collection error: ", e);
        }
    }

    private static List<LogItem> getLogs(com.aliyun.rds20140815.Client client, String instanceId,
            String startTime, String endTime, LogType logType) throws Exception {
        List<LogItem> logItems = new ArrayList<>();
        int pageNumber = 1;
        int totalPage;
        do {
            totalPage = fetchAndProcessLogs(client, instanceId, startTime, endTime, logType, pageNumber, logItems);
            pageNumber++;
        } while (pageNumber <= totalPage);
        return logItems;
    }

    // Fetches one page, appends its records to logItems, and returns the total number of pages
    private static int fetchAndProcessLogs(com.aliyun.rds20140815.Client client, String instanceId,
            String startTime, String endTime, LogType logType, int pageNumber,
            List<LogItem> logItems) throws Exception {
        com.aliyun.teautil.models.RuntimeOptions runtime = new com.aliyun.teautil.models.RuntimeOptions();
        if (logType == LogType.SLOW) {
            var request = new com.aliyun.rds20140815.models.DescribeSlowLogRecordsRequest()
                    .setDBInstanceId(instanceId)
                    .setStartTime(startTime)
                    .setEndTime(endTime)
                    .setPageNumber(pageNumber)
                    .setPageSize(PAGE_SIZE);
            var body = client.describeSlowLogRecordsWithOptions(request, runtime).getBody();
            if (body.getItems() != null && body.getItems().getSQLSlowRecord() != null) {
                for (var item : body.getItems().getSQLSlowRecord()) {
                    LogItem logItem = newLogItem(item.getExecutionStartTime());
                    logItem.PushBack("sql", str(item.getSQLText()));
                    logItem.PushBack("hostAddress", str(item.getHostAddress()));
                    logItem.PushBack("dbName", str(item.getDBName()));
                    logItem.PushBack("queryTimes", str(item.getQueryTimes()));
                    logItem.PushBack("lockTimes", str(item.getLockTimes()));
                    logItems.add(logItem);
                }
            }
            return totalPages(body.getTotalRecordCount());
        } else {
            var request = new com.aliyun.rds20140815.models.DescribeErrorLogsRequest()
                    .setDBInstanceId(instanceId)
                    .setStartTime(startTime)
                    .setEndTime(endTime)
                    .setPageNumber(pageNumber)
                    .setPageSize(PAGE_SIZE);
            var body = client.describeErrorLogsWithOptions(request, runtime).getBody();
            if (body.getItems() != null && body.getItems().getErrorLog() != null) {
                for (var item : body.getItems().getErrorLog()) {
                    LogItem logItem = newLogItem(item.getCreateTime());
                    // Error text gets its own field name instead of being stored under "sql"
                    logItem.PushBack("errorInfo", str(item.getErrorInfo()));
                    logItems.add(logItem);
                }
            }
            return totalPages(body.getTotalRecordCount());
        }
    }

    // Page count from the total record count and the requested page size
    // (PageRecordCount only counts the records on the current page)
    private static int totalPages(Number totalRecordCount) {
        long total = totalRecordCount == null ? 0 : totalRecordCount.longValue();
        return (int) ((total + PAGE_SIZE - 1) / PAGE_SIZE);
    }

    // Creates a log item whose collectTime field is the record time converted to Beijing time
    private static LogItem newLogItem(String utcTime) {
        LogItem logItem = new LogItem();
        logItem.PushBack("collectTime", convertToBeijingTime(utcTime));
        return logItem;
    }

    // Converts null to an empty string so that every field has a value
    private static String str(Object value) {
        return value == null ? "" : value.toString();
    }

    private static String convertToBeijingTime(String utcTime) {
        ZonedDateTime beijingDateTime = Instant.parse(utcTime).atZone(ZoneId.of("Asia/Shanghai"));
        return DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssXXX").format(beijingDateTime);
    }

    private static void sendLogsToSLS(List<LogItem> logItems, String topic) throws LogException {
        Client slsClient = new Client(LOG_ENDPOINT, ACCESS_KEY_ID, ACCESS_KEY_SECRET);
        PutLogsRequest request = new PutLogsRequest(LOG_PROJECT, LOG_STORE, topic, "postgresql-monitor", logItems);
        slsClient.PutLogs(request);
    }

    // Start of the first window: INTERVAL_MINUTES ago, truncated to the minute
    private static String getCurrentTime() {
        return formatToMinute(Instant.now().minus(Duration.ofMinutes(INTERVAL_MINUTES)).atZone(ZoneId.of("UTC")));
    }

    // End of a window: its start plus INTERVAL_MINUTES
    private static String getNextEndTime(String startTime) {
        // Append seconds so that Instant.parse accepts the minute-precision string
        Instant startInstant = Instant.parse(startTime.replace("Z", ":00Z"));
        return formatToMinute(startInstant.atZone(ZoneId.of("UTC")).plusMinutes(INTERVAL_MINUTES));
    }

    private static String formatToMinute(ZonedDateTime zdt) {
        return DATE_FORMATTER.format(zdt.truncatedTo(ChronoUnit.MINUTES));
    }

    private enum LogType { SLOW, ERROR }
}
Replace the following parameters with your actual values.
Parameter | Description | Example |
LOG_ENDPOINT | SLS endpoint. For more information, see Endpoints. | cn-hangzhou.log.aliyuncs.com |
LOG_PROJECT | Name of the SLS project. | rdspg-test |
LOG_STORE | Name of the SLS Logstore. | rdspg-sls |
instanceId | ID of the database instance. | pgm-bp1c82mky1av**** |
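The paging loop in the program keeps requesting pages until it has read every record in the window. As a standalone sketch, assuming a fixed page size, the number of pages follows from TotalRecordCount by ceiling division; it should not be derived from PageRecordCount, which only counts the records returned on the current page. The Paging class below is hypothetical.

```java
// Hypothetical helper: number of pages needed to read all records at a fixed page size
final class Paging {
    private Paging() {}

    static int totalPages(long totalRecordCount, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        if (totalRecordCount <= 0) {
            return 0;
        }
        // Ceiling division without floating point
        return (int) ((totalRecordCount + pageSize - 1) / pageSize);
    }
}
```

For example, with a page size of 100 and a TotalRecordCount of 250, pages 1 to 3 cover every record.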
In CloudMonitor, create alert rules based on the metrics. When a metric meets an alert condition, CloudMonitor automatically sends you an alert notification. For more information, see Create an alert rule.