日志服務(wù)SLS
本文為您介紹如何使用日志服務(wù)SLS連接器。
背景信息
日志服務(wù)是針對(duì)日志類數(shù)據(jù)的一站式服務(wù)。日志服務(wù)可以幫助您快捷地完成數(shù)據(jù)采集、消費(fèi)、投遞以及查詢分析,提升運(yùn)維和運(yùn)營(yíng)效率,建立海量日志處理能力。
SLS連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表和結(jié)果表 |
運(yùn)行模式 | 僅支持流模式 |
特有監(jiān)控指標(biāo) | 暫不適用 |
數(shù)據(jù)格式 | 暫無(wú) |
API種類 | SQL |
是否支持更新或刪除結(jié)果表數(shù)據(jù) | 不支持更新和刪除結(jié)果表數(shù)據(jù),只支持插入數(shù)據(jù)。 |
特色功能
SLS連接器源表支持直接讀取消息的屬性字段,支持的屬性字段如下。
字段名 | 字段類型 | 字段說明 |
__source__ | STRING METADATA VIRTUAL | 消息源。 |
__topic__ | STRING METADATA VIRTUAL | 消息主題。 |
__timestamp__ | BIGINT METADATA VIRTUAL | 日志時(shí)間。 |
__tag__ | MAP<VARCHAR, VARCHAR> METADATA VIRTUAL | 消息TAG。 對(duì)于屬性 |
前提條件
已創(chuàng)建日志服務(wù)Project和Logstore,詳情請(qǐng)參見創(chuàng)建Project和Logstore。
使用限制
僅實(shí)時(shí)計(jì)算引擎VVR 2.0.0及以上版本支持日志服務(wù)SLS連接器。
SLS連接器僅保證At-Least-Once語(yǔ)義。
僅實(shí)時(shí)計(jì)算引擎VVR 4.0.13及以上版本支持Shard數(shù)目變化觸發(fā)自動(dòng)Failover功能。
強(qiáng)烈建議不要設(shè)置Source并發(fā)度大于Shard個(gè)數(shù),不僅會(huì)造成資源浪費(fèi),且在8.0.5及更低版本中,如果后續(xù)Shard數(shù)目發(fā)生變化,自動(dòng)Failover功能可能會(huì)失效,造成部分Shard不被消費(fèi)。
語(yǔ)法結(jié)構(gòu)
CREATE TABLE sls_table(
a INT,
b INT,
c VARCHAR
) WITH (
'connector' = 'sls',
'endPoint' = '<yourEndPoint>',
'project' = '<yourProjectName>',
'logStore' = '<yourLogStoreName>',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}'
);
WITH參數(shù)
通用
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
connector
表類型。
String
是
無(wú)
固定值sls。
endPoint
EndPoint地址。
String
是
無(wú)
請(qǐng)?zhí)顚慡LS的私網(wǎng)服務(wù)地址,詳情請(qǐng)參見服務(wù)接入點(diǎn)。
說明實(shí)時(shí)計(jì)算Flink版默認(rèn)不具備訪問公網(wǎng)的能力,但阿里云提供的NAT網(wǎng)關(guān)可以實(shí)現(xiàn)VPC網(wǎng)絡(luò)與公網(wǎng)網(wǎng)絡(luò)互通,詳情請(qǐng)參見控制臺(tái)操作。
不建議跨公網(wǎng)訪問SLS。如確有需要,請(qǐng)使用HTTPS網(wǎng)絡(luò)傳輸協(xié)議并且開啟SLS全球加速服務(wù),詳情請(qǐng)參見管理傳輸加速。
project
SLS項(xiàng)目名稱。
String
是
無(wú)
無(wú)。
logStore
SLS LogStore或metricstore名稱。
String
是
無(wú)
logStore和metricstore是相同的消費(fèi)方式。
accessId
阿里云賬號(hào)的AccessKey ID。
String
是
無(wú)
詳情請(qǐng)參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請(qǐng)參見變量管理。
accessKey
阿里云賬號(hào)的AccessKey Secret。
String
是
無(wú)
詳情請(qǐng)參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請(qǐng)參見變量管理。
源表獨(dú)有
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
enableNewSource
是否啟用實(shí)現(xiàn)了FLIP-27接口的新數(shù)據(jù)源。
Boolean
否
false
新數(shù)據(jù)源可以自動(dòng)適應(yīng)shard變化,同時(shí)盡可能保證shard在所有的source并發(fā)上分布均勻。
說明僅實(shí)時(shí)計(jì)算引擎VVR 8.0.9及以上版本支持該參數(shù)。
重要作業(yè)在該配置項(xiàng)發(fā)生變化后無(wú)法從狀態(tài)恢復(fù)。
可通過先設(shè)置配置項(xiàng)consumerGroup啟動(dòng)作業(yè),將消費(fèi)進(jìn)度記錄到SLS消費(fèi)組中,再將配置項(xiàng)consumeFromCheckpoint設(shè)為true后無(wú)狀態(tài)啟動(dòng)作業(yè),從而實(shí)現(xiàn)從歷史進(jìn)度繼續(xù)消費(fèi)。
shardDiscoveryIntervalMs
動(dòng)態(tài)檢測(cè)shard變化時(shí)間間隔,單位為毫秒。
Long
否
60000
設(shè)置為負(fù)值時(shí)可以關(guān)閉動(dòng)態(tài)檢測(cè)。
說明僅當(dāng)配置項(xiàng)enableNewSource為true時(shí)生效。
僅實(shí)時(shí)計(jì)算引擎VVR 8.0.9及以上版本支持該參數(shù)。
startupMode
源表啟動(dòng)模式。
String
否
timestamp
參數(shù)取值如下:
timestamp(默認(rèn)):從指定的起始時(shí)間開始消費(fèi)日志。
latest:從最新位點(diǎn)開始消費(fèi)日志。
earliest:從最早位點(diǎn)開始消費(fèi)日志。
說明若將配置項(xiàng)consumeFromCheckpoint設(shè)為true,則會(huì)從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志,此處的啟動(dòng)模式將不會(huì)生效。
startTime
消費(fèi)日志的開始時(shí)間。
String
否
當(dāng)前時(shí)間
格式為yyyy-MM-dd hh:mm:ss。
僅當(dāng)startupMode設(shè)為timestamp時(shí)生效。
說明startTime和stopTime基于SLS中的__receive_time__屬性,而非__timestamp__屬性。
stopTime
消費(fèi)日志的結(jié)束時(shí)間。
String
否
無(wú)
格式為yyyy-MM-dd hh:mm:ss。
說明如期望日志消費(fèi)到結(jié)尾時(shí)退出Flink程序,需要同時(shí)設(shè)置exitAfterFinish=true.
consumerGroup
消費(fèi)組名稱。
String
否
無(wú)
消費(fèi)組用于記錄消費(fèi)進(jìn)度。您可以自定義消費(fèi)組名,無(wú)固定格式。
說明不支持通過相同的消費(fèi)組進(jìn)行多作業(yè)的協(xié)同消費(fèi)。不同的Flink作業(yè)應(yīng)該設(shè)置不同的消費(fèi)組。如果不同的Flink作業(yè)使用相同的消費(fèi)組,它們將會(huì)消費(fèi)全部數(shù)據(jù)。這是因?yàn)樵贔link消費(fèi)SLS的數(shù)據(jù)時(shí),并不會(huì)經(jīng)過SLS消費(fèi)組進(jìn)行分區(qū)分配,因此導(dǎo)致各個(gè)消費(fèi)者獨(dú)立消費(fèi)各自的消息,即使消費(fèi)組是相同的。
consumeFromCheckpoint
是否從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志。
String
否
false
參數(shù)取值如下:
true:必須同時(shí)指定消費(fèi)組,F(xiàn)link程序會(huì)從消費(fèi)組中保存的Checkpoint開始消費(fèi)日志,如果該消費(fèi)組沒有對(duì)應(yīng)的Checkpoint,則從startTime配置值開始消費(fèi)。
false(默認(rèn)值):不從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志。
說明僅實(shí)時(shí)計(jì)算引擎VVR 6.0.5及以上版本支持該參數(shù)。
maxRetries
讀取SLS失敗后重試次數(shù)。
String
否
3
無(wú)。
batchGetSize
單次請(qǐng)求讀取logGroup的個(gè)數(shù)。
String
否
100
batchGetSize設(shè)置不能超過1000,否則會(huì)報(bào)錯(cuò)。
exitAfterFinish
在數(shù)據(jù)消費(fèi)完成后,F(xiàn)link程序是否退出。
String
否
false
參數(shù)取值如下:
true:數(shù)據(jù)消費(fèi)完后,F(xiàn)link程序退出。
false(默認(rèn)):數(shù)據(jù)消費(fèi)完后,F(xiàn)link程序不退出。
說明僅實(shí)時(shí)計(jì)算引擎VVR 4.0.13及以上版本支持該參數(shù)。
query
SLS消費(fèi)預(yù)處理語(yǔ)句。
String
否
無(wú)
通過使用query參數(shù),您可以在消費(fèi)SLS數(shù)據(jù)之前對(duì)其進(jìn)行過濾,以避免將所有數(shù)據(jù)都消費(fèi)到Flink中,從而實(shí)現(xiàn)節(jié)約成本和提高處理速度的目的。
例如
'query' = '*| where request_method = ''GET'''
表示在Flink讀取SLS數(shù)據(jù)前,先匹配出request_method字段值等于get的數(shù)據(jù)。說明query需使用日志服務(wù)SPL語(yǔ)言,請(qǐng)參見SPL概述。
重要僅實(shí)時(shí)計(jì)算引擎VVR 8.0.1及以上版本支持該參數(shù)。
日志服務(wù)SLS支持該功能的地域請(qǐng)參見基于規(guī)則消費(fèi)日志。
公測(cè)階段免費(fèi),后續(xù)可能會(huì)在產(chǎn)生日志服務(wù)SLS費(fèi)用,詳情請(qǐng)參見費(fèi)用說明。
結(jié)果表獨(dú)有
參數(shù)
說明
數(shù)據(jù)類型
是否必填
默認(rèn)值
備注
topicField
指定字段名,該字段的值會(huì)覆蓋__topic__屬性字段的值,表示日志的主題。
String
否
無(wú)
該參數(shù)值是表中已存在的字段之一。
timeField
指定字段名,該字段的值會(huì)覆蓋__timestamp__屬性字段的值,表示日志寫入時(shí)間。
String
否
當(dāng)前時(shí)間
該參數(shù)值是表中已存在的字段之一,且字段類型必須為INT。如果未指定,則默認(rèn)填充當(dāng)前時(shí)間。
sourceField
指定字段名,該字段的值會(huì)覆蓋__source__屬性字段的值,表示日志的來源地,例如產(chǎn)生該日志機(jī)器的IP地址。
String
否
無(wú)
該參數(shù)值是表中已存在的字段之一。
partitionField
指定字段名,數(shù)據(jù)寫入時(shí)會(huì)根據(jù)該列值計(jì)算Hash值,Hash值相同的數(shù)據(jù)會(huì)寫入同一個(gè)shard。
String
否
無(wú)
如果未指定,則每條數(shù)據(jù)會(huì)隨機(jī)寫入當(dāng)前可用的Shard中。
buckets
當(dāng)指定partitionField時(shí),根據(jù)Hash值重新分組的個(gè)數(shù)。
String
否
64
該參數(shù)的取值范圍是[1, 256],且必須是2的整數(shù)次冪。同時(shí),buckets個(gè)數(shù)應(yīng)當(dāng)大于等于Shard個(gè)數(shù),否則會(huì)出現(xiàn)部分Shard沒有數(shù)據(jù)寫入的情況。
說明僅實(shí)時(shí)計(jì)算引擎VVR 6.0.5及以上版本支持該參數(shù)。
flushIntervalMs
觸發(fā)數(shù)據(jù)寫入的時(shí)間周期。
String
否
2000
單位為毫秒。
writeNullProperties
是否將null值作為空字符串寫入SLS。
Boolean
否
true
參數(shù)取值如下:
true(默認(rèn)值):將null值作為空字符串寫入日志。
false:計(jì)算結(jié)果為null的字段不會(huì)寫入到日志中。
說明僅實(shí)時(shí)計(jì)算引擎VVR 8.0.6及以上版本支持該參數(shù)。
類型映射
Flink字段類型 | SLS字段類型 |
BOOLEAN | STRING |
VARBINARY | |
VARCHAR | |
TINYINT | |
INTEGER | |
BIGINT | |
FLOAT | |
DOUBLE | |
DECIMAL |
代碼示例
CREATE TEMPORARY TABLE sls_input(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING METADATA VIRTUAL,
`__source__` STRING METADATA VIRTUAL,
`__timestamp__` STRING METADATA VIRTUAL,
__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
proctime as PROCTIME()
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${secret_values.ak_id}',
'accessKey' = '${secret_values.ak_secret}',
'starttime' = '2023-08-30 00:00:00',
'project' ='sls-test',
'logstore' ='sls-input'
);
CREATE TEMPORARY TABLE sls_sink(
`time` BIGINT,
url STRING,
dt STRING,
float_field FLOAT,
double_field DOUBLE,
boolean_field BOOLEAN,
`__topic__` STRING,
`__source__` STRING,
`__timestamp__` BIGINT ,
receive_time BIGINT
) WITH (
'connector' = 'sls',
'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
'accessId' = '${ak_id}',
'accessKey' = '${ak_secret}',
'project' ='sls-test',
'logstore' ='sls-output'
);
INSERT INTO sls_sink
SELECT
`time`,
url,
dt,
float_field,
double_field,
boolean_field,
`__topic__` ,
`__source__` ,
`__timestamp__` ,
cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input;
DataStream API
通過DataStream的方式讀寫數(shù)據(jù)時(shí),則需要使用對(duì)應(yīng)的DataStream連接器連接Flink,DataStream連接器設(shè)置方法請(qǐng)參見DataStream連接器使用方法。
讀取SLS
實(shí)時(shí)計(jì)算引擎VVR提供SourceFunction的實(shí)現(xiàn)類SlsSourceFunction,用于讀取SLS,讀取SLS的示例如下。
public class SlsDataStreamSource {
public static void main(String[] args) throws Exception {
// Sets up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Creates and adds SLS source and sink.
env.addSource(createSlsSource())
.map(SlsDataStreamSource::convertMessages)
.print();
env.execute("SLS Stream Source");
}
private static SlsSourceFunction createSlsSource() {
SLSAccessInfo accessInfo = new SLSAccessInfo();
accessInfo.setEndpoint("yourEndpoint");
accessInfo.setProjectName("yourProject");
accessInfo.setLogstore("yourLogStore");
accessInfo.setAccessId("yourAccessId");
accessInfo.setAccessKey("yourAccessKey");
// The batch get size must be given.
accessInfo.setBatchGetSize(10);
// Optional parameters
accessInfo.setConsumerGroup("yourConsumerGroup");
accessInfo.setMaxRetries(3);
// time to start consuming, set to current time.
int startInSec = (int) (new Date().getTime() / 1000);
// time to stop consuming, -1 means never stop.
int stopInSec = -1;
return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
}
private static List<String> convertMessages(SourceRecord input) {
List<String> res = new ArrayList<>();
for (FastLogGroup logGroup : input.getLogGroups()) {
int logsCount = logGroup.getLogsCount();
for (int i = 0; i < logsCount; i++) {
FastLog log = logGroup.getLogs(i);
int fieldCount = log.getContentsCount();
for (int idx = 0; idx < fieldCount; idx++) {
FastLogContent f = log.getContents(idx);
res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
}
}
}
return res;
}
}
寫入SLS
提供OutputFormat的實(shí)現(xiàn)類SLSOutputFormat,用于寫入SLS。寫入SLS的示例如下。
public class SlsDataStreamSink {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSequence(0, 100)
.map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
.addSink(createSlsSink())
.name(SlsDataStreamSink.class.getSimpleName());
env.execute("SLS Stream Sink");
}
private static OutputFormatSinkFunction createSlsSink() {
Configuration conf = new Configuration();
conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
conf.setString(SLSOptions.PROJECT, "yourProject");
conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
return new OutputFormatSinkFunction<>(outputFormat);
}
private static SinkRecord getSinkRecord(Long seed) {
SinkRecord record = new SinkRecord();
LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
logItem.PushBack("level", "info");
logItem.PushBack("name", String.valueOf(seed));
logItem.PushBack("message", "it's a test message for " + seed.toString());
record.setContent(logItem);
return record;
}
}
XML
Maven中央庫(kù)中已經(jīng)放置了SLS DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-sls</artifactId>
<version>${vvr-version}</version>
</dependency>