本文為您介紹如何使用Elasticsearch連接器。
背景信息
阿里云Elasticsearch兼容開源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商業功能,致力于數據分析、數據搜索等場景服務。為您提供企業級權限管控、安全監控告警、自動報表生成等場景服務。
Elasticsearch連接器支持的信息如下:
類別 | 詳情 |
支持類型 | 源表、維表和結果表 |
運行模式 | 批模式和流模式 |
數據格式 | JSON |
特有監控指標 |
說明 指標含義詳情,請參見監控指標說明。 |
API種類 | Datastream和SQL |
是否支持更新或刪除結果表數據 | 是 |
前提條件
已創建Elasticsearch索引,詳情請參見創建示例。
已配置Elasticsearch公網或私網訪問白名單,詳情請參見配置實例公網或私網訪問白名單。
使用限制
源表和維表支持大于等于6.8.x,但小于8.x版本的Elasticsearch。
結果表僅支持Elasticsearch 6.x、7.x和8.x版本。
僅Flink計算引擎VVR 2.0.0及以上版本支持Elasticsearch連接器。
僅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。
語法結構
源表
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );
維表
CREATE TABLE es_dim( field1 STRING, --作為JOIN時的Key,必須為STRING類型。 field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );
說明如果指定主鍵,則維表JOIN時的Key(字段)有且只能有一個,且必須為Elasticsearch對應索引的文檔ID。
如果不指定主鍵,則維表JOIN時的Key可以有一個或多個,需要為Elasticsearch對應索引的文檔中的字段。
對于String類型,為了保持兼容性,默認會對表中字段名增加.keyword后綴。如果因此無法匹配到Elasticsearch中的Text字段,可以將配置項ignoreKeywordSuffix配置為true。
結果表
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填寫elasticsearch-6 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );
說明Elasticsearch結果表會根據是否定義了主鍵,確定是在upsert模式或append模式下工作。
如果定義了主鍵,則主鍵必須為文檔ID,Elasticsearch結果表將在upsert模式下工作,該模式可以處理包含UPDATE和DELETE的消息。
如果未定義主鍵,Elasticsearch將自動生成隨機的文檔ID,Elasticsearch結果表將在append模式工作,該模式只能消費INSERT消息。
某些類型(例如BYTES、ROW、ARRAY和MAP等)由于沒有對應的字符串表示形式,所以不允許其作為主鍵字段。
DDL中的字段均對應Elasticsearch文檔中的字段,不支持將文檔ID等Meta信息寫入Elasticsearch結果表中,因為文檔ID等Meta信息由Elasticsearch實例側維護。
WITH參數
源表
參數
說明
數據類型
是否必填
默認值
備注
connector
源表類型。
String
是
無
固定值為elasticsearch。
endPoint
Server地址。
String
是
無
例如:
http://127.0.0.1:XXXX
。indexName
索引名稱。
String
是
無
無。
accessId
Elasticsearch實例的用戶名。
String
否
無
默認為空,不進行權限驗證。如果定義了accessId,則必須定義非空的accessKey。
重要為了避免您的用戶名和密碼信息泄露,建議您通過密鑰管理的方式填寫用戶名和密碼取值,詳情請參見變量管理。
accessKey
Elasticsearch實例的密碼。
String
否
無
typeNames
Type名稱。
String
否
_doc
Elasticsearch 7.0以上版本不建議設置該參數。
batchSize
每個scroll請求從Elasticsearch集群獲取的最大文檔數。
Int
否
2000
無。
keepScrollAliveSecs
scroll上下文保留的最長時間。
Int
否
3600
單位為秒。
結果表
參數
說明
數據類型
是否必填
默認值
備注
connector
結果表類型。
String
是
無
固定值為
elasticsearch-6
、elasticsearch-7
或elasticsearch-8
。說明僅實時計算引擎VVR 8.0.5及以上版本支持配置為
elasticsearch-8
。hosts
Server地址。
String
是
無
例如:
127.0.0.1:XXXX
。index
索引名稱。
String
是
無
Elasticsearch結果表同時支持靜態索引和動態索引。在使用靜態和動態索引時,您需要注意以下幾點:
如果使用靜態索引,則索引選項值應為純字符串,例如
myusers
,所有記錄都將被寫入myusers
索引。如果使用動態索引,可以使用
{field_name}
引用記錄中的字段值以動態生成目標索引。您還可以使用{field_name|date_format_string}
將TIMESTAMP、DATE和TIME類型的字段值轉換為date_format_string
指定的格式。date_format_string
與Java的DateTimeFormatter兼容。例如,如果設置為myusers-{log_ts|yyyy-MM-dd}
,則log_ts字段值為2020-03-27 12:25:55
的記錄將被寫入myusers-2020-03-27
索引。
document-type
文檔類型。
String
elasticsearch-6:必填
elasticsearch-7:不支持
無
當連接器類型為
elasticsearch-6
時,此處參數取值需要和Elasticsearch側的type參數取值保持一致。username
用戶名。
String
否
空
默認為空,不進行權限驗證。如果定義了username,則必須定義非空的password。
重要為了避免您的用戶名和密碼信息泄露,建議您通過密鑰管理的方式填寫用戶名和密碼取值,詳情請參見變量管理。
password
密碼。
String
否
空
document-id.key-delimiter
文檔ID的分隔符。
String
否
_
在Elasticsearch結果表中,主鍵用于計算Elasticsearch的文檔ID。Elasticsearch結果表通過使用document-id.key-delimiter指定的鍵分隔符,按照DDL中定義的順序連接所有主鍵字段,從而為每一行生成一個文檔ID字符串。
說明文檔ID為最多512個字節但不包含空格的字符串。
failure-handler
Elasticsearch請求失敗時的故障處理策略。
String
否
fail
可選策略如下:
fail(默認值):如果請求失敗,則作業失敗。
ignore:忽略失敗并刪除請求。
retry-rejected:重新添加由于隊列容量滿而失敗的請求。
custom class name:用于使用ActionRequestFailureHandler子類進行故障處理。
sink.flush-on-checkpoint
是否在checkpoint時執行flush。
Boolean
否
true
true:默認值。
false:禁用該功能后,在Elasticsearch進行Checkpoint時,連接器將不等待確認所有pending請求是否已完成,故連接器不會為請求提供At-least-once保證。
sink.bulk-flush.backoff.strategy
如果由于臨時請求錯誤導致flush操作失敗,則設置sink.bulk-flush.backoff.strategy指定重試策略。
Enum
否
DISABLED
DISABLED(默認值):不執行重試,即第一次請求錯誤后失敗。
CONSTANT:常量回退,即每次回退等待時間相同。
EXPONENTIAL:指數回退,即每次回退等待時間指數遞增。
sink.bulk-flush.backoff.max-retries
最大回退重試次數。
Int
否
無
無。
sink.bulk-flush.backoff.delay
每次回退嘗試之間的延遲。
Duration
否
無
對于CONSTANT回退策略:該值為每次重試之間的延遲。
對于EXPONENTIAL回退策略:該值為初始基準延遲。
sink.bulk-flush.max-actions
每個批量請求的最大緩沖操作數。
Int
否
1000
0表示禁用該功能。
sink.bulk-flush.max-size
存放請求的緩沖區內存最大值。
String
否
2 MB
單位為MB,默認值為2 MB,0 MB表示禁用該功能。
sink.bulk-flush.interval
flush的間隔。
Duration
否
1s
單位為秒,默認值為1s,0s表示禁用該功能。
connection.path-prefix
要添加到每個REST通信中的前綴字符串。
String
否
空
無。
retry-on-conflict
更新操作中,允許因版本沖突異常而重試的最大次數。超過該次數后將拋出異常導致作業失敗。
Int
否
0
說明僅實時計算引擎VVR 4.0.13及以上版本支持該參數。
該參數僅在定義了主鍵的情況下生效。
routing-fields
指定一個或多個ES字段名稱,用來將文檔路由到Elasticsearch的指定分片中。
String
否
無
多個字段名以分號(;)進行分割。如果某個字段數據為空,則該字段會被置為null。
說明僅實時計算引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支持該參數。
sink.delete-strategy
用來配置收到回撤(-D/-U)類型消息時的行為
Enum
否
DELETE_ROW_ON_PK
可選行為如下:
DELETE_ROW_ON_PK(默認值):忽略-U類型的消息,但是在收到-D類型的消息時刪除主鍵對應的行(文檔)。
IGNORE_DELETE:忽略-U和-D 類型的消息,Elasticsearch Sink不發生回撤。
NON_PK_FIELD_TO_NULL:忽略 -U類型的消息,但是在收到-D類型的消息時,會修改主鍵對應的行(文檔),主鍵值保持不變,表 Schema中其他非主鍵值均置為 NULL。主要用在多個Sink同時寫入同一張Elasticsearch表時部分更新的場景。
CHANGELOG_STANDARD:和 DELETE_ROW_ON_PK類似,唯一的區別是該模式收到-U類型的消息時也會刪除主鍵對應的行(文檔)。
說明僅實時計算引擎VVR 8.0.8及以上版本支持該參數。
維表
參數
說明
數據類型
是否必填
默認值
備注
connector
維表類型。
String
是
無
固定值為elasticsearch。
endPoint
Server地址。
String
是
無
例如:
http://127.0.0.1:XXXX
。indexName
索引名稱。
String
是
無
無。
accessId
Elasticsearch實例的用戶名。
String
否
無
默認為空,不進行權限驗證。如果定義了accessId,則必須定義非空的accessKey。
重要為了避免您的用戶名和密碼信息泄露,建議您通過密鑰管理的方式填寫用戶名和密碼取值,詳情請參見變量管理。
accessKey
Elasticsearch實例的密碼。
String
否
無
typeNames
Type名稱。
String
否
_doc
Elasticsearch 7.0以上版本不建議設置該參數。
maxJoinRows
單行數據Join的最多行數。
Integer
否
1024
無。
cache
緩存策略。
String
否
ALL
支持以下三種緩存策略:
ALL:緩存維表里的所有數據。在Job運行前,系統會將維表中的所有數據加載到Cache中,之后所有的維表查找數據都會通過Cache進行。如果在Cache中無法找到數據,則KEY不存在,并在Cache過期后重新加載一遍全量Cache。
LRU:緩存維表里的部分數據。源表的每條數據都會觸發系統先在Cache中查找數據。如果沒有找到,則去物理維表中查找。
None:無緩存。
cacheSize
緩存大小,即緩存多少行數據。
Long
否
100000
僅當cache選擇LRU緩存策略時,cacheSize參數生效。
cacheTTLMs
緩存失效的超時時間。
Long
否
Long.MAX_VALUE
單位為毫秒。cacheTTLMs配置和cache配置有關:
如果cache配置為LRU,則cacheTTLMs為緩存失效的超時時間,默認不過期。
如果cache配置為ALL,則cacheTTLMs為設置緩存重新加載的間隔時間,默認不重新加載。
ignoreKeywordSuffix
是否忽略自動為String字段添加的.keyword后綴。
Boolean
否
false
為了保證兼容性,Flink將Elasticsearch中的Text類型轉換為String,并默認在String類型字段名后增加.keyword后綴。
參數取值如下:
true:忽略。
如果因此無法匹配到Elasticsearch中的Text類型字段,需要將該參數配置為true。
false:不忽略。
cacheEmpty
是否緩存物理維表中查找結果為空的結果。
Boolean
否
true
僅當cache選擇LRU緩存策略時,cacheEmpty參數生效。
queryMaxDocs
非主鍵維表的輸入端每條數據到來后,查詢ElasticSearch Server時返回的最大文檔條數。
Integer
否
10000
默認值10000是ElasticSearch Server返回文檔條數的最大限制,該配置項的取值不能超過這個上限。
說明僅實時計算引擎VVR 8.0.8及以上版本支持該參數。
該參數僅對非主鍵維表生效,因為主鍵表中數據是唯一的。
為了查詢的正確性,默認值給的比較大。但是該值會增大查詢Elasticsearch時的內存占用,確實遇到內存問題后,可以適當降低該值來優化內存使用。
類型映射
Flink以JSON來解析Elasticsearch數據,詳情請參見數據類型映射關系。
使用示例
源表示例
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;
維表示例
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;
結果表示例1
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;
結果表示例2
CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。 ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;