通過實(shí)時(shí)計(jì)算處理數(shù)據(jù)并同步到Elasticsearch
當(dāng)您需要構(gòu)建一個(gè)日志檢索系統(tǒng)時(shí),可通過實(shí)時(shí)計(jì)算Flink對(duì)日志數(shù)據(jù)進(jìn)行計(jì)算后,輸出到Elasticsearch進(jìn)行搜索。本文以阿里云日志服務(wù)SLS(Log Service)為例,為您介紹具體的實(shí)現(xiàn)方法。
前提條件
您已完成以下操作:
開通阿里云實(shí)時(shí)計(jì)算服務(wù)并創(chuàng)建項(xiàng)目。
創(chuàng)建阿里云Elasticsearch實(shí)例。
具體操作,請(qǐng)參見創(chuàng)建阿里云Elasticsearch實(shí)例。
開通SLS服務(wù)、創(chuàng)建Project和Logstore。
具體操作,請(qǐng)參見開通阿里云日志服務(wù)、創(chuàng)建Project和創(chuàng)建Logstore。
背景信息
阿里云實(shí)時(shí)計(jì)算Flink是阿里云官方支持的Flink產(chǎn)品,支持包括Kafka、Elasticsearch等多種輸入輸出系統(tǒng)。實(shí)時(shí)計(jì)算Flink與Elasticsearch結(jié)合,能夠滿足典型的日志檢索場景。
Kafka或LOG等系統(tǒng)中的日志,經(jīng)過Flink進(jìn)行簡單或者復(fù)雜計(jì)算之后,輸出到Elasticsearch進(jìn)行搜索。結(jié)合Flink的強(qiáng)大計(jì)算能力與Elasticsearch的強(qiáng)大搜索能力,可為業(yè)務(wù)提供實(shí)時(shí)數(shù)據(jù)加工及查詢,助力業(yè)務(wù)實(shí)時(shí)化轉(zhuǎn)型。
實(shí)時(shí)計(jì)算Flink為您提供了非常簡單的方式來對(duì)接Elasticsearch。例如當(dāng)前業(yè)務(wù)中的日志或者數(shù)據(jù)被寫入了LOG中,并且需要對(duì)LOG中的數(shù)據(jù)進(jìn)行計(jì)算之后再寫到Elasticsearch中進(jìn)行搜索,可通過以下鏈路實(shí)現(xiàn)。
操作步驟
創(chuàng)建實(shí)時(shí)計(jì)算作業(yè)。
具體操作,請(qǐng)參見阿里云實(shí)時(shí)計(jì)算Blink獨(dú)享模式文檔《Blink SQL開發(fā)指南》中的《作業(yè)開發(fā)》 > 《開發(fā)》章節(jié)。
編寫Flink SQL。
創(chuàng)建日志服務(wù)LOG源表。
create table sls_stream( a int, b int, c VARCHAR ) WITH ( type ='sls', endPoint ='<yourEndpoint>', accessId ='<yourAccessId>', accessKey ='<yourAccessKey>', startTime = '<yourStartTime>', project ='<yourProjectName>', logStore ='<yourLogStoreName>', consumerGroup ='<yourConsumerGroupName>' );
WITH參數(shù)說明如下表。
變量
說明
endPoint
阿里云日志服務(wù)的公網(wǎng)服務(wù)入口,即訪問對(duì)應(yīng)LOG項(xiàng)目及其內(nèi)部日志數(shù)據(jù)的URL。詳細(xì)信息,請(qǐng)參見服務(wù)入口。
例如杭州區(qū)域的日志服務(wù)入口為:http://cn-hangzhou.log.aliyuncs.com。需要在對(duì)應(yīng)的服務(wù)入口前加http://。
accessId
您賬號(hào)的AccessKey ID。
accessKey
您賬號(hào)的AccessKey Secret。
startTime
消費(fèi)日志開始的時(shí)間點(diǎn)。運(yùn)行Flink作業(yè)時(shí)所選時(shí)間要大于此處設(shè)置的時(shí)間。
project
LogService的項(xiàng)目名稱。
logStore
LogService項(xiàng)目下具體的LogStore名稱。
consumerGroup
日志服務(wù)的消費(fèi)組名稱。
創(chuàng)建Elasticsearch結(jié)果表。
重要實(shí)時(shí)計(jì)算3.2.2及以上版本增加了Elasticsearch結(jié)果表功能。創(chuàng)建Flink作業(yè)時(shí),請(qǐng)注意所選的版本。
Elasticsearch結(jié)果表的實(shí)現(xiàn)使用了REST API,可以兼容Elasticsearch的各個(gè)版本。
CREATE TABLE es_stream_sink( a int, cnt BIGINT, PRIMARY KEY(a) ) WITH( type ='elasticsearch-7', endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>', accessId = '<yourAccessId>', accessKey = '<yourAccessSecret>', index = '<yourIndex>', typeName = '<yourTypeName>' );
WITH參數(shù)說明如下。
參數(shù)
說明
endPoint
阿里云Elasticsearch實(shí)例的公網(wǎng)地址,格式為http://<instanceid>.public.elasticsearch.aliyuncs.com:9200。可在實(shí)例的基本信息頁面獲取,詳細(xì)信息請(qǐng)參見查看實(shí)例的基本信息。
accessId
訪問阿里云Elasticsearch實(shí)例的用戶名,默認(rèn)為elastic。
accessKey
對(duì)應(yīng)用戶的密碼。elastic用戶的密碼在創(chuàng)建實(shí)例時(shí)設(shè)定,如果忘記可進(jìn)行重置,重置密碼的注意事項(xiàng)和操作步驟,請(qǐng)參見重置實(shí)例訪問密碼。
index
索引名稱。如果您還未創(chuàng)建過索引,需要先創(chuàng)建一個(gè)索引。具體操作,請(qǐng)參見步驟三:創(chuàng)建索引。您也可以開啟自動(dòng)創(chuàng)建索引功能,自動(dòng)創(chuàng)建對(duì)應(yīng)索引。具體操作,請(qǐng)參見配置YML參數(shù)。
typeName
索引類型。7.0及以上版本的Elasticsearch實(shí)例必須為_doc。
說明Elasticsearch支持根據(jù)PRIMARY KEY更新文檔,且
PRIMARY KEY
只能為1個(gè)字段。指定PRIMARY KEY
后,文檔的ID為PRIMARY KEY
字段的值。未指定PRIMARY KEY
,文檔的ID由系統(tǒng)隨機(jī)生成。詳細(xì)信息,請(qǐng)參見Index API。Elasticsearch支持多種更新模式,對(duì)應(yīng)WITH中的參數(shù)為updateMode:
當(dāng)
updateMode=full
時(shí),新增的文檔會(huì)完全覆蓋已存在的文檔。當(dāng)
updateMode=inc
時(shí),Elasticsearch會(huì)根據(jù)輸入的字段值更新對(duì)應(yīng)的字段。
Elasticsearch所有的更新默認(rèn)為UPSERT語義,即INSERT或UPDATE。
處理業(yè)務(wù)邏輯并同步數(shù)據(jù)。
INSERT INTO es_stream_sink SELECT a, count(*) as cnt FROM sls_stream GROUP BY a
上線并啟動(dòng)作業(yè)。
上線并啟動(dòng)作業(yè)后,即可將日志服務(wù)中的數(shù)據(jù)進(jìn)行簡單聚合后寫入阿里云Elasticsearch中。
更多信息
使用實(shí)時(shí)計(jì)算Flink+Elasticsearch,可幫助您快速創(chuàng)建實(shí)時(shí)搜索鏈路。如果您有更復(fù)雜的Elasticsearch寫入需求,可以使用實(shí)時(shí)計(jì)算Flink的自定義Sink功能來實(shí)現(xiàn)。