日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過實(shí)時(shí)計(jì)算處理數(shù)據(jù)并同步到Elasticsearch

更新時(shí)間:

當(dāng)您需要構(gòu)建一個(gè)日志檢索系統(tǒng)時(shí),可通過實(shí)時(shí)計(jì)算Flink對(duì)日志數(shù)據(jù)進(jìn)行計(jì)算后,輸出到Elasticsearch進(jìn)行搜索。本文以阿里云日志服務(wù)SLS(Log Service)為例,為您介紹具體的實(shí)現(xiàn)方法。

前提條件

您已完成以下操作:

背景信息

阿里云實(shí)時(shí)計(jì)算Flink是阿里云官方支持的Flink產(chǎn)品,支持包括Kafka、Elasticsearch等多種輸入輸出系統(tǒng)。實(shí)時(shí)計(jì)算Flink與Elasticsearch結(jié)合,能夠滿足典型的日志檢索場景。

Kafka或LOG等系統(tǒng)中的日志,經(jīng)過Flink進(jìn)行簡單或者復(fù)雜計(jì)算之后,輸出到Elasticsearch進(jìn)行搜索。結(jié)合Flink的強(qiáng)大計(jì)算能力與Elasticsearch的強(qiáng)大搜索能力,可為業(yè)務(wù)提供實(shí)時(shí)數(shù)據(jù)加工及查詢,助力業(yè)務(wù)實(shí)時(shí)化轉(zhuǎn)型。

實(shí)時(shí)計(jì)算Flink為您提供了非常簡單的方式來對(duì)接Elasticsearch。例如當(dāng)前業(yè)務(wù)中的日志或者數(shù)據(jù)被寫入了LOG中,并且需要對(duì)LOG中的數(shù)據(jù)進(jìn)行計(jì)算之后再寫到Elasticsearch中進(jìn)行搜索,可通過以下鏈路實(shí)現(xiàn)。Flink+ES數(shù)據(jù)鏈路

操作步驟

  1. 登錄實(shí)時(shí)計(jì)算控制臺(tái)

  2. 創(chuàng)建實(shí)時(shí)計(jì)算作業(yè)。

    具體操作,請(qǐng)參見阿里云實(shí)時(shí)計(jì)算Blink獨(dú)享模式文檔《Blink SQL開發(fā)指南》中的《作業(yè)開發(fā)》 > 《開發(fā)》章節(jié)。

  3. 編寫Flink SQL。

    1. 創(chuàng)建日志服務(wù)LOG源表。

      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );

      WITH參數(shù)說明如下表。

      變量

      說明

      endPoint

      阿里云日志服務(wù)的公網(wǎng)服務(wù)入口,即訪問對(duì)應(yīng)LOG項(xiàng)目及其內(nèi)部日志數(shù)據(jù)的URL。詳細(xì)信息,請(qǐng)參見服務(wù)入口

      例如杭州區(qū)域的日志服務(wù)入口為:http://cn-hangzhou.log.aliyuncs.com。需要在對(duì)應(yīng)的服務(wù)入口前加http://

      accessId

      您賬號(hào)的AccessKey ID。

      accessKey

      您賬號(hào)的AccessKey Secret。

      startTime

      消費(fèi)日志開始的時(shí)間點(diǎn)。運(yùn)行Flink作業(yè)時(shí)所選時(shí)間要大于此處設(shè)置的時(shí)間。

      project

      LogService的項(xiàng)目名稱。

      logStore

      LogService項(xiàng)目下具體的LogStore名稱。

      consumerGroup

      日志服務(wù)的消費(fèi)組名稱。

    2. 創(chuàng)建Elasticsearch結(jié)果表。

      重要
      • 實(shí)時(shí)計(jì)算3.2.2及以上版本增加了Elasticsearch結(jié)果表功能。創(chuàng)建Flink作業(yè)時(shí),請(qǐng)注意所選的版本。

      • Elasticsearch結(jié)果表的實(shí)現(xiàn)使用了REST API,可以兼容Elasticsearch的各個(gè)版本。

      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch-7',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );

      WITH參數(shù)說明如下。

      參數(shù)

      說明

      endPoint

      阿里云Elasticsearch實(shí)例的公網(wǎng)地址,格式為http://<instanceid>.public.elasticsearch.aliyuncs.com:9200。可在實(shí)例的基本信息頁面獲取,詳細(xì)信息請(qǐng)參見查看實(shí)例的基本信息

      accessId

      訪問阿里云Elasticsearch實(shí)例的用戶名,默認(rèn)為elastic。

      accessKey

      對(duì)應(yīng)用戶的密碼。elastic用戶的密碼在創(chuàng)建實(shí)例時(shí)設(shè)定,如果忘記可進(jìn)行重置,重置密碼的注意事項(xiàng)和操作步驟,請(qǐng)參見重置實(shí)例訪問密碼

      index

      索引名稱。如果您還未創(chuàng)建過索引,需要先創(chuàng)建一個(gè)索引。具體操作,請(qǐng)參見步驟三:創(chuàng)建索引。您也可以開啟自動(dòng)創(chuàng)建索引功能,自動(dòng)創(chuàng)建對(duì)應(yīng)索引。具體操作,請(qǐng)參見配置YML參數(shù)

      typeName

      索引類型。7.0及以上版本的Elasticsearch實(shí)例必須為_doc

      說明
      • Elasticsearch支持根據(jù)PRIMARY KEY更新文檔,且PRIMARY KEY只能為1個(gè)字段。指定PRIMARY KEY后,文檔的ID為PRIMARY KEY字段的值。未指定PRIMARY KEY,文檔的ID由系統(tǒng)隨機(jī)生成。詳細(xì)信息,請(qǐng)參見Index API

      • Elasticsearch支持多種更新模式,對(duì)應(yīng)WITH中的參數(shù)為updateMode

        • 當(dāng)updateMode=full時(shí),新增的文檔會(huì)完全覆蓋已存在的文檔。

        • 當(dāng)updateMode=inc時(shí),Elasticsearch會(huì)根據(jù)輸入的字段值更新對(duì)應(yīng)的字段。

      • Elasticsearch所有的更新默認(rèn)為UPSERT語義,即INSERT或UPDATE。

    3. 處理業(yè)務(wù)邏輯并同步數(shù)據(jù)。

      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a
  4. 上線并啟動(dòng)作業(yè)。

    上線并啟動(dòng)作業(yè)后,即可將日志服務(wù)中的數(shù)據(jù)進(jìn)行簡單聚合后寫入阿里云Elasticsearch中。

更多信息

使用實(shí)時(shí)計(jì)算Flink+Elasticsearch,可幫助您快速創(chuàng)建實(shí)時(shí)搜索鏈路。如果您有更復(fù)雜的Elasticsearch寫入需求,可以使用實(shí)時(shí)計(jì)算Flink的自定義Sink功能來實(shí)現(xiàn)。