日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

日志服務(wù)SLS

本文為您介紹如何使用日志服務(wù)SLS連接器。

背景信息

日志服務(wù)是針對日志類數(shù)據(jù)的一站式服務(wù)。日志服務(wù)可以幫助您快捷地完成數(shù)據(jù)采集、消費(fèi)、投遞以及查詢分析,提升運(yùn)維和運(yùn)營效率,建立海量日志處理能力。

SLS連接器支持的信息如下。

類別

詳情

支持類型

源表和結(jié)果表

運(yùn)行模式

僅支持流模式

特有監(jiān)控指標(biāo)

暫不適用

數(shù)據(jù)格式

暫無

API種類

SQL

是否支持更新或刪除結(jié)果表數(shù)據(jù)

不支持更新和刪除結(jié)果表數(shù)據(jù),只支持插入數(shù)據(jù)。

特色功能

SLS連接器源表支持直接讀取消息的屬性字段,支持的屬性字段如下。

字段名

字段類型

字段說明

__source__

STRING METADATA VIRTUAL

消息源。

__topic__

STRING METADATA VIRTUAL

消息主題。

__timestamp__

BIGINT METADATA VIRTUAL

日志時間。

__tag__

MAP<VARCHAR, VARCHAR> METADATA VIRTUAL

消息TAG。

對于屬性"__tag__:__receive_time__":"1616742274"'__receive_time__'和'1616742274'會被作為KV對,記錄在Map中,在SQL中通過__tag__['__receive_time__']的方式訪問。

前提條件

已創(chuàng)建日志服務(wù)Project和Logstore,詳情請參見創(chuàng)建Project和Logstore

使用限制

  • 僅實(shí)時計(jì)算引擎VVR 2.0.0及以上版本支持日志服務(wù)SLS連接器。

  • SLS連接器僅保證At-Least-Once語義。

  • 僅實(shí)時計(jì)算引擎VVR 4.0.13及以上版本支持Shard數(shù)目變化觸發(fā)自動Failover功能。

  • 強(qiáng)烈建議不要設(shè)置Source并發(fā)度大于Shard個數(shù),不僅會造成資源浪費(fèi),且在8.0.5及更低版本中,如果后續(xù)Shard數(shù)目發(fā)生變化,自動Failover功能可能會失效,造成部分Shard不被消費(fèi)。

語法結(jié)構(gòu)

CREATE TABLE sls_table(
  a INT,
  b INT,
  c VARCHAR
) WITH (
  'connector' = 'sls',
  'endPoint' = '<yourEndPoint>',
  'project' = '<yourProjectName>',
  'logStore' = '<yourLogStoreName>',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}'
);

WITH參數(shù)

  • 通用

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    connector

    表類型。

    String

    固定值sls。

    endPoint

    EndPoint地址。

    String

    請?zhí)顚慡LS的私網(wǎng)服務(wù)地址,詳情請參見服務(wù)接入點(diǎn)

    說明
    • 實(shí)時計(jì)算Flink版默認(rèn)不具備訪問公網(wǎng)的能力,但阿里云提供的NAT網(wǎng)關(guān)可以實(shí)現(xiàn)VPC網(wǎng)絡(luò)與公網(wǎng)網(wǎng)絡(luò)互通,詳情請參見控制臺操作

    • 不建議跨公網(wǎng)訪問SLS。如確有需要,請使用HTTPS網(wǎng)絡(luò)傳輸協(xié)議并且開啟SLS全球加速服務(wù),詳情請參見管理傳輸加速

    project

    SLS項(xiàng)目名稱。

    String

    無。

    logStore

    SLS LogStore或metricstore名稱。

    String

    logStore和metricstore是相同的消費(fèi)方式。

    accessId

    阿里云賬號的AccessKey ID。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理

    accessKey

    阿里云賬號的AccessKey Secret。

    String

    詳情請參見如何查看AccessKey ID和AccessKey Secret信息?

    重要

    為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理

  • 源表獨(dú)有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    enableNewSource

    是否啟用實(shí)現(xiàn)了FLIP-27接口的新數(shù)據(jù)源。

    Boolean

    false

    新數(shù)據(jù)源可以自動適應(yīng)shard變化,同時盡可能保證shard在所有的source并發(fā)上分布均勻。

    說明

    僅實(shí)時計(jì)算引擎VVR 8.0.9及以上版本支持該參數(shù)。

    重要

    作業(yè)在該配置項(xiàng)發(fā)生變化后無法從狀態(tài)恢復(fù)。

    可通過先設(shè)置配置項(xiàng)consumerGroup啟動作業(yè),將消費(fèi)進(jìn)度記錄到SLS消費(fèi)組中,再將配置項(xiàng)consumeFromCheckpoint設(shè)為true后無狀態(tài)啟動作業(yè),從而實(shí)現(xiàn)從歷史進(jìn)度繼續(xù)消費(fèi)。

    shardDiscoveryIntervalMs

    動態(tài)檢測shard變化時間間隔,單位為毫秒。

    Long

    60000

    設(shè)置為負(fù)值時可以關(guān)閉動態(tài)檢測。

    說明
    • 僅當(dāng)配置項(xiàng)enableNewSource為true時生效。

    • 僅實(shí)時計(jì)算引擎VVR 8.0.9及以上版本支持該參數(shù)。

    startupMode

    源表啟動模式。

    String

    timestamp

    參數(shù)取值如下:

    • timestamp(默認(rèn)):從指定的起始時間開始消費(fèi)日志。

    • latest:從最新位點(diǎn)開始消費(fèi)日志。

    • earliest:從最早位點(diǎn)開始消費(fèi)日志。

    說明

    若將配置項(xiàng)consumeFromCheckpoint設(shè)為true,則會從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志,此處的啟動模式將不會生效。

    startTime

    消費(fèi)日志的開始時間。

    String

    當(dāng)前時間

    格式為yyyy-MM-dd hh:mm:ss。

    僅當(dāng)startupMode設(shè)為timestamp時生效。

    說明

    startTime和stopTime基于SLS中的__receive_time__屬性,而非__timestamp__屬性。

    stopTime

    消費(fèi)日志的結(jié)束時間。

    String

    格式為yyyy-MM-dd hh:mm:ss。

    說明

    如期望日志消費(fèi)到結(jié)尾時退出Flink程序,需要同時設(shè)置exitAfterFinish=true.

    consumerGroup

    消費(fèi)組名稱。

    String

    消費(fèi)組用于記錄消費(fèi)進(jìn)度。您可以自定義消費(fèi)組名,無固定格式。

    說明

    不支持通過相同的消費(fèi)組進(jìn)行多作業(yè)的協(xié)同消費(fèi)。不同的Flink作業(yè)應(yīng)該設(shè)置不同的消費(fèi)組。如果不同的Flink作業(yè)使用相同的消費(fèi)組,它們將會消費(fèi)全部數(shù)據(jù)。這是因?yàn)樵贔link消費(fèi)SLS的數(shù)據(jù)時,并不會經(jīng)過SLS消費(fèi)組進(jìn)行分區(qū)分配,因此導(dǎo)致各個消費(fèi)者獨(dú)立消費(fèi)各自的消息,即使消費(fèi)組是相同的。

    consumeFromCheckpoint

    是否從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志。

    String

    false

    參數(shù)取值如下:

    • true:必須同時指定消費(fèi)組,F(xiàn)link程序會從消費(fèi)組中保存的Checkpoint開始消費(fèi)日志,如果該消費(fèi)組沒有對應(yīng)的Checkpoint,則從startTime配置值開始消費(fèi)。

    • false(默認(rèn)值):不從指定的消費(fèi)組中保存的Checkpoint開始消費(fèi)日志。

    說明

    僅實(shí)時計(jì)算引擎VVR 6.0.5及以上版本支持該參數(shù)。

    maxRetries

    讀取SLS失敗后重試次數(shù)。

    String

    3

    無。

    batchGetSize

    單次請求讀取logGroup的個數(shù)。

    String

    100

    batchGetSize設(shè)置不能超過1000,否則會報(bào)錯。

    exitAfterFinish

    在數(shù)據(jù)消費(fèi)完成后,F(xiàn)link程序是否退出。

    String

    false

    參數(shù)取值如下:

    • true:數(shù)據(jù)消費(fèi)完后,F(xiàn)link程序退出。

    • false(默認(rèn)):數(shù)據(jù)消費(fèi)完后,F(xiàn)link程序不退出。

    說明

    僅實(shí)時計(jì)算引擎VVR 4.0.13及以上版本支持該參數(shù)。

    query

    SLS消費(fèi)預(yù)處理語句。

    String

    通過使用query參數(shù),您可以在消費(fèi)SLS數(shù)據(jù)之前對其進(jìn)行過濾,以避免將所有數(shù)據(jù)都消費(fèi)到Flink中,從而實(shí)現(xiàn)節(jié)約成本和提高處理速度的目的。

    例如 'query' = '*| where request_method = ''GET'''表示在Flink讀取SLS數(shù)據(jù)前,先匹配出request_method字段值等于get的數(shù)據(jù)。

    說明

    query需使用日志服務(wù)SPL語言,請參見SPL概述

    重要
    • 僅實(shí)時計(jì)算引擎VVR 8.0.1及以上版本支持該參數(shù)。

    • 日志服務(wù)SLS支持該功能的地域請參見基于規(guī)則消費(fèi)日志

    • 公測階段免費(fèi),后續(xù)可能會在產(chǎn)生日志服務(wù)SLS費(fèi)用,詳情請參見費(fèi)用說明

  • 結(jié)果表獨(dú)有

    參數(shù)

    說明

    數(shù)據(jù)類型

    是否必填

    默認(rèn)值

    備注

    topicField

    指定字段名,該字段的值會覆蓋__topic__屬性字段的值,表示日志的主題。

    String

    該參數(shù)值是表中已存在的字段之一。

    timeField

    指定字段名,該字段的值會覆蓋__timestamp__屬性字段的值,表示日志寫入時間。

    String

    當(dāng)前時間

    該參數(shù)值是表中已存在的字段之一,且字段類型必須為INT。如果未指定,則默認(rèn)填充當(dāng)前時間。

    sourceField

    指定字段名,該字段的值會覆蓋__source__屬性字段的值,表示日志的來源地,例如產(chǎn)生該日志機(jī)器的IP地址。

    String

    該參數(shù)值是表中已存在的字段之一。

    partitionField

    指定字段名,數(shù)據(jù)寫入時會根據(jù)該列值計(jì)算Hash值,Hash值相同的數(shù)據(jù)會寫入同一個shard。

    String

    如果未指定,則每條數(shù)據(jù)會隨機(jī)寫入當(dāng)前可用的Shard中。

    buckets

    當(dāng)指定partitionField時,根據(jù)Hash值重新分組的個數(shù)。

    String

    64

    該參數(shù)的取值范圍是[1, 256],且必須是2的整數(shù)次冪。同時,buckets個數(shù)應(yīng)當(dāng)大于等于Shard個數(shù),否則會出現(xiàn)部分Shard沒有數(shù)據(jù)寫入的情況。

    說明

    僅實(shí)時計(jì)算引擎VVR 6.0.5及以上版本支持該參數(shù)。

    flushIntervalMs

    觸發(fā)數(shù)據(jù)寫入的時間周期。

    String

    2000

    單位為毫秒。

    writeNullProperties

    是否將null值作為空字符串寫入SLS。

    Boolean

    true

    參數(shù)取值如下:

    • true(默認(rèn)值):將null值作為空字符串寫入日志。

    • false:計(jì)算結(jié)果為null的字段不會寫入到日志中。

    說明

    僅實(shí)時計(jì)算引擎VVR 8.0.6及以上版本支持該參數(shù)。

類型映射

Flink字段類型

SLS字段類型

BOOLEAN

STRING

VARBINARY

VARCHAR

TINYINT

INTEGER

BIGINT

FLOAT

DOUBLE

DECIMAL

代碼示例

CREATE TEMPORARY TABLE sls_input(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` STRING METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${secret_values.ak_id}',
  'accessKey' = '${secret_values.ak_secret}',
  'starttime' = '2023-08-30 00:00:00',
  'project' ='sls-test',
  'logstore' ='sls-input'
);

CREATE TEMPORARY TABLE sls_sink(
  `time` BIGINT,
  url STRING,
  dt STRING,
  float_field FLOAT,
  double_field DOUBLE,
  boolean_field BOOLEAN,
  `__topic__` STRING,
  `__source__` STRING,
  `__timestamp__` BIGINT ,
  receive_time BIGINT
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-hangzhou-intranet.log.aliyuncs.com',
  'accessId' = '${ak_id}',
  'accessKey' = '${ak_secret}',
  'project' ='sls-test',
  'logstore' ='sls-output'
);

INSERT INTO sls_sink
SELECT 
 `time`,
  url,
  dt,
  float_field,
  double_field,
  boolean_field,
  `__topic__` ,
  `__source__` ,
  `__timestamp__` ,
  cast(__tag__['__receive_time__'] as bigint) as receive_time
FROM sls_input; 

DataStream API

重要

通過DataStream的方式讀寫數(shù)據(jù)時,則需要使用對應(yīng)的DataStream連接器連接Flink,DataStream連接器設(shè)置方法請參見DataStream連接器使用方法

讀取SLS

實(shí)時計(jì)算引擎VVR提供SourceFunction的實(shí)現(xiàn)類SlsSourceFunction,用于讀取SLS,讀取SLS的示例如下。

public class SlsDataStreamSource {

    public static void main(String[] args) throws Exception {
        // Sets up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Creates and adds SLS source and sink.
        env.addSource(createSlsSource())
                .map(SlsDataStreamSource::convertMessages)
                .print();
        env.execute("SLS Stream Source");
    }

    private static SlsSourceFunction createSlsSource() {
        SLSAccessInfo accessInfo = new SLSAccessInfo();
        accessInfo.setEndpoint("yourEndpoint");
        accessInfo.setProjectName("yourProject");
        accessInfo.setLogstore("yourLogStore");
        accessInfo.setAccessId("yourAccessId");
        accessInfo.setAccessKey("yourAccessKey");

        // The batch get size must be given.
        accessInfo.setBatchGetSize(10);

        // Optional parameters
        accessInfo.setConsumerGroup("yourConsumerGroup");
        accessInfo.setMaxRetries(3);

        // time to start consuming, set to current time.
        int startInSec = (int) (new Date().getTime() / 1000);

        // time to stop consuming, -1 means never stop.
        int stopInSec = -1;

        return new SlsSourceFunction(accessInfo, startInSec, stopInSec);
    }

    private static List<String> convertMessages(SourceRecord input) {
        List<String> res = new ArrayList<>();
        for (FastLogGroup logGroup : input.getLogGroups()) {
            int logsCount = logGroup.getLogsCount();
            for (int i = 0; i < logsCount; i++) {
                FastLog log = logGroup.getLogs(i);
                int fieldCount = log.getContentsCount();
                for (int idx = 0; idx < fieldCount; idx++) {
                    FastLogContent f = log.getContents(idx);
                    res.add(String.format("key: %s, value: %s", f.getKey(), f.getValue()));
                }
            }
        }
        return res;
    }
}

寫入SLS

提供OutputFormat的實(shí)現(xiàn)類SLSOutputFormat,用于寫入SLS。寫入SLS的示例如下。

public class SlsDataStreamSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromSequence(0, 100)
                .map((MapFunction<Long, SinkRecord>) aLong -> getSinkRecord(aLong))
                .addSink(createSlsSink())
                .name(SlsDataStreamSink.class.getSimpleName());
        env.execute("SLS Stream Sink");
    }

    private static OutputFormatSinkFunction createSlsSink() {
        Configuration conf = new Configuration();
        conf.setString(SLSOptions.ENDPOINT, "yourEndpoint");
        conf.setString(SLSOptions.PROJECT, "yourProject");
        conf.setString(SLSOptions.LOGSTORE, "yourLogStore");
        conf.setString(SLSOptions.ACCESS_ID, "yourAccessId");
        conf.setString(SLSOptions.ACCESS_KEY, "yourAccessKey");
        SLSOutputFormat outputFormat = new SLSOutputFormat(conf);
        return new OutputFormatSinkFunction<>(outputFormat);
    }

    private static SinkRecord getSinkRecord(Long seed) {
        SinkRecord record = new SinkRecord();
        LogItem logItem = new LogItem((int) (System.currentTimeMillis() / 1000));
        logItem.PushBack("level", "info");
        logItem.PushBack("name", String.valueOf(seed));
        logItem.PushBack("message", "it's a test message for " + seed.toString());
        record.setContent(logItem);
        return record;
    }

}

XML

Maven中央庫中已經(jīng)放置了SLS DataStream連接器

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-sls</artifactId>
    <version>${vvr-version}</version>
</dependency>

常見問題

恢復(fù)失敗的Flink程序時,TaskManager發(fā)生OOM,源表報(bào)錯java.lang.OutOfMemoryError: Java heap space