日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

日志服務(wù)SLS

更新時(shí)間:

本文為您介紹如何使用日志服務(wù)SLS連接器。

背景信息

日志服務(wù)是針對(duì)日志類數(shù)據(jù)的一站式服務(wù)。日志服務(wù)可以幫助您快捷地完成數(shù)據(jù)采集、消費(fèi)、投遞以及查詢分析,提升運(yùn)維和運(yùn)營(yíng)效率,建立海量日志處理能力。

SLS連接器支持的信息如下。

類別

詳情

支持類型

源表和結(jié)果表

運(yùn)行模式

僅支持流模式

特有監(jiān)控指標(biāo)

暫不適用

數(shù)據(jù)格式

暫無(wú)

API種類

SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

不支持更新和刪除結(jié)果表數(shù)據(jù),只支持插入數(shù)據(jù)。

特色功能

SLS連接器源表支持直接讀取消息的屬性字段,支持的屬性字段如下。

字段名

字段類型

字段說明

__source__

STRING METADATA VIRTUAL

消息源。

__topic__

STRING METADATA VIRTUAL

消息主題。

__timestamp__

BIGINT METADATA VIRTUAL

日志時(shí)間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

消息TAG。

對(duì)于屬性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'會(huì)被作為KV對(duì),記錄在Map中,在SQL中通過__tag__['__receive_time__']的方式訪問。

前提條件

已創(chuàng)建日志服務(wù)Project和Logstore,詳情請(qǐng)參見創(chuàng)建Project和Logstore

使用限制

  • 僅實(shí)時(shí)計(jì)算引擎VVR 2.0.0及以上版本支持日志服務(wù)SLS連接器。

  • SLS連接器僅保證At-Least-Once語(yǔ)義。

  • 僅實(shí)時(shí)計(jì)算引擎VVR 4.0.13及以上版本支持Shard數(shù)目變化觸發(fā)自動(dòng)Failover功能。

  • 強(qiáng)烈建議不要設(shè)置Source并發(fā)度大于Shard個(gè)數(shù),不僅會(huì)造成資源浪費(fèi),且在8.0.5及更低版本中,如果后續(xù)Shard數(shù)目發(fā)生變化,自動(dòng)Failover功能可能會(huì)失效,造成部分Shard不被消費(fèi)。

語(yǔ)法結(jié)構(gòu)

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH參數(shù)

  • 通用

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    connector

    表類型。

    String

    無(wú)

    固定值sls。

    endPoint

    EndPoint地址。

    String

    無(wú)

    請(qǐng)?zhí)顚慡LS的私網(wǎng)服務(wù)地址,詳情請(qǐng)參見服務(wù)接入點(diǎn)。

    說明
    • 實(shí)時(shí)計(jì)算Flink版默認(rèn)不具備訪問公網(wǎng)的能力,但阿里云提供的NAT網(wǎng)關(guān)可以實(shí)現(xiàn)VPC網(wǎng)絡(luò)與公網(wǎng)網(wǎng)絡(luò)互通,詳情請(qǐng)參見控制臺(tái)操作。

    • 不建議跨公網(wǎng)訪問SLS。如確有需要,請(qǐng)使用HTTPS網(wǎng)絡(luò)傳輸協(xié)議并且開啟SLS全球加速服務(wù),詳情請(qǐng)參見管理傳輸加速。

    project

    SLS項(xiàng)目名稱。

    String

    無(wú)

    無(wú)。

    logStore

    SLS LogStore或metricstore名稱。

    String

    無(wú)

    logStore和metricstore是相同的消費(fèi)方式。

    accessId

    阿里云賬號(hào)的AccessKey ID。

    String

    無(wú)

    詳情請(qǐng)參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請(qǐng)參見變量管理。

    accessKey

    阿里云賬號(hào)的AccessKey Secret。

    String

    無(wú)

    詳情請(qǐng)參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請(qǐng)參見變量管理。

  • 源表獨(dú)有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    enableNewSource

    是否啟用實(shí)現(xiàn)了FLIP-27接口的新數(shù)據(jù)源。

    Boolean

    false

    新數(shù)據(jù)源可以自動(dòng)適應(yīng)shard變化,同時(shí)盡可能保證shard在所有的source并發(fā)上分布均勻。

    說明

    僅實(shí)時(shí)計(jì)算引擎VVR 8.0.9及以上版本支持該參數(shù)。

    重要

    作業(yè)在該配置項(xiàng)發(fā)生變化后無(wú)法從狀態(tài)恢復(fù)。

    可通過先設(shè)置配置項(xiàng)consumerGroup啟動(dòng)作業(yè),將消費(fèi)進(jìn)度記錄到SLS消費(fèi)組中,再將配置項(xiàng)consumeFromCheckpoint設(shè)為true后無(wú)狀態(tài)啟動(dòng)作業(yè),從而實(shí)現(xiàn)從歷史進(jìn)度繼續(xù)消費(fèi)。

    shardDiscoveryIntervalMs

    動(dòng)態(tài)檢測(cè)shard變化時(shí)間間隔,單位為毫秒。

    Long

    60000

    設(shè)置為負(fù)值時(shí)可以關(guān)閉動(dòng)態(tài)檢測(cè)。

    說明
    • 僅當(dāng)配置項(xiàng)enableNewSource為true時(shí)生效。

    • 僅實(shí)時(shí)計(jì)算引擎VVR 8.0.9及以上版本支持該參數(shù)。

    startupMode

    源表啟動(dòng)模式。

    String

    timestamp

    參數(shù)取值如下:

    • timestamp(默認(rèn)):從指定的起始時(shí)間開始消費(fèi)日志。

    • latest:從最新位點(diǎn)開始消費(fèi)日志。

    • earliest:從最早位點(diǎn)開始消費(fèi)日志。

    說明

    若將配置項(xiàng)consumeFromCheckpoint設(shè)為true,則會(huì)從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志,此處的啟動(dòng)模式將不會(huì)生效。

    startTime

    消費(fèi)日志的開始時(shí)間。

    String

    當(dāng)前時(shí)間

    格式為yyyy-MM-dd hh:mm:ss。

    僅當(dāng)startupMode設(shè)為timestamp時(shí)生效。

    說明

    startTime和stopTime基于SLS中的__receive_time__屬性,而非__timestamp__屬性。

    stopTime

    消費(fèi)日志的結(jié)束時(shí)間。

    String

    無(wú)

    格式為yyyy-MM-dd hh:mm:ss。

    說明

    如期望日志消費(fèi)到結(jié)尾時(shí)退出Flink程序,需要同時(shí)設(shè)置exitAfterFinish=true.

    consumerGroup

    消費(fèi)組名稱。

    String

    無(wú)

    消費(fèi)組用于記錄消費(fèi)進(jìn)度。您可以自定義消費(fèi)組名,無(wú)固定格式。

    說明

    不支持通過相同的消費(fèi)組進(jìn)行多作業(yè)的協(xié)同消費(fèi)。不同的Flink作業(yè)應(yīng)該設(shè)置不同的消費(fèi)組。如果不同的Flink作業(yè)使用相同的消費(fèi)組,它們將會(huì)消費(fèi)全部數(shù)據(jù)。這是因?yàn)樵贔link消費(fèi)SLS的數(shù)據(jù)時(shí),并不會(huì)經(jīng)過SLS消費(fèi)組進(jìn)行分區(qū)分配,因此導(dǎo)致各個(gè)消費(fèi)者獨(dú)立消費(fèi)各自的消息,即使消費(fèi)組是相同的。

    consumeFromCheckpoint

    是否從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志。

    String

    false

    參數(shù)取值如下:

    • true:必須同時(shí)指定消費(fèi)組,F(xiàn)link程序會(huì)從消費(fèi)組中保存的Checkpoint開始消費(fèi)日志,如果該消費(fèi)組沒有對(duì)應(yīng)的Checkpoint,則從startTime配置值開始消費(fèi)。

    • false(默認(rèn)值):不從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志。

    說明

    僅實(shí)時(shí)計(jì)算引擎VVR 6.0.5及以上版本支持該參數(shù)。

    maxRetries

    讀取SLS失敗后重試次數(shù)。

    String

    3

    無(wú)。

    batchGetSize

    單次請(qǐng)求讀取logGroup的個(gè)數(shù)。

    String

    100

    batchGetSize設(shè)置不能超過1000,否則會(huì)報(bào)錯(cuò)。

    exitAfterFinish

    在數(shù)據(jù)消費(fèi)完成后,F(xiàn)link程序是否退出。

    String

    false

    參數(shù)取值如下:

    • true:數(shù)據(jù)消費(fèi)完后,F(xiàn)link程序退出。

    • false(默認(rèn)):數(shù)據(jù)消費(fèi)完后,F(xiàn)link程序不退出。

    說明

    僅實(shí)時(shí)計(jì)算引擎VVR 4.0.13及以上版本支持該參數(shù)。

    query

    SLS消費(fèi)預(yù)處理語(yǔ)句。

    String

    無(wú)

    通過使用query參數(shù),您可以在消費(fèi)SLS數(shù)據(jù)之前對(duì)其進(jìn)行過濾,以避免將所有數(shù)據(jù)都消費(fèi)到Flink中,從而實(shí)現(xiàn)節(jié)約成本和提高處理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS數(shù)據(jù)前,先匹配出request_method字段值等于get的數(shù)據(jù)。

    說明

    query需使用日志服務(wù)SPL語(yǔ)言,請(qǐng)參見SPL概述

    重要
    • 僅實(shí)時(shí)計(jì)算引擎VVR 8.0.1及以上版本支持該參數(shù)。

    • 日志服務(wù)SLS支持該功能的地域請(qǐng)參見基于規(guī)則消費(fèi)日志。

    • 公測(cè)階段免費(fèi),后續(xù)可能會(huì)在產(chǎn)生日志服務(wù)SLS費(fèi)用,詳情請(qǐng)參見費(fèi)用說明

  • 結(jié)果表獨(dú)有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    topicField

    指定字段名,該字段的值會(huì)覆蓋__topic__屬性字段的值,表示日志的主題。

    String

    無(wú)

    該參數(shù)值是表中已存在的字段之一。

    timeField

    指定字段名,該字段的值會(huì)覆蓋__timestamp__屬性字段的值,表示日志寫入時(shí)間。

    String

    當(dāng)前時(shí)間

    該參數(shù)值是表中已存在的字段之一,且字段類型必須為INT。如果未指定,則默認(rèn)填充當(dāng)前時(shí)間。

    sourceField

    指定字段名,該字段的值會(huì)覆蓋__source__屬性字段的值,表示日志的來源地,例如產(chǎn)生該日志機(jī)器的IP地址。

    String

    無(wú)

    該參數(shù)值是表中已存在的字段之一。

    partitionField

    指定字段名,數(shù)據(jù)寫入時(shí)會(huì)根據(jù)該列值計(jì)算Hash值,Hash值相同的數(shù)據(jù)會(huì)寫入同一個(gè)shard。

    String

    無(wú)

    如果未指定,則每條數(shù)據(jù)會(huì)隨機(jī)寫入當(dāng)前可用的Shard中。

    buckets

    當(dāng)指定partitionField時(shí),根據(jù)Hash值重新分組的個(gè)數(shù)。

    String

    64

    該參數(shù)的取值范圍是[1, 256],且必須是2的整數(shù)次冪。同時(shí),buckets個(gè)數(shù)應(yīng)當(dāng)大于等于Shard個(gè)數(shù),否則會(huì)出現(xiàn)部分Shard沒有數(shù)據(jù)寫入的情況。

    說明

    僅實(shí)時(shí)計(jì)算引擎VVR 6.0.5及以上版本支持該參數(shù)。

    flushIntervalMs

    觸發(fā)數(shù)據(jù)寫入的時(shí)間周期。

    String

    2000

    單位為毫秒。

    writeNullProperties

    是否將null值作為空字符串寫入SLS。

    Boolean

    true

    參數(shù)取值如下:

    • true(默認(rèn)值):將null值作為空字符串寫入日志。

    • false:計(jì)算結(jié)果為null的字段不會(huì)寫入到日志中。

    說明

    僅實(shí)時(shí)計(jì)算引擎VVR 8.0.6及以上版本支持該參數(shù)。

類型映射

Flink字段類型

SLS字段類型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

代碼示例

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` STRING METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
 `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

DataStream API

重要

通過DataStream的方式讀寫數(shù)據(jù)時(shí),則需要使用對(duì)應(yīng)的DataStream連接器連接Flink,DataStream連接器設(shè)置方法請(qǐng)參見DataStream連接器使用方法。

讀取SLS

實(shí)時(shí)計(jì)算引擎VVR提供SourceFunction的實(shí)現(xiàn)類SlsSourceFunction,用于讀取SLS,讀取SLS的示例如下。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

寫入SLS

提供OutputFormat的實(shí)現(xiàn)類SLSOutputFormat,用于寫入SLS。寫入SLS的示例如下。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Maven中央庫(kù)中已經(jīng)放置了SLS DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題

恢復(fù)失敗的Flink程序時(shí),TaskManager發(fā)生OOM,源表報(bào)錯(cuò)java.lang.OutOfMemoryError: Java heap space