日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Elasticsearch

本文為您介紹如何使用Elasticsearch連接器。

背景信息

阿里云Elasticsearch兼容開源Elasticsearch的功能,以及Security、Machine Learning、Graph、APM等商業功能,致力于數據分析、數據搜索等場景服務。為您提供企業級權限管控、安全監控告警、自動報表生成等場景服務。

Elasticsearch連接器支持的信息如下:

類別

詳情

支持類型

源表、維表和結果表

運行模式

批模式和流模式

數據格式

JSON

特有監控指標

  • 源表

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • 維表

  • 結果表 ( VVR 6.0.6及以上 )

    • numRecordsOut

    • numRecordsOutPerSecond

說明

指標含義詳情,請參見監控指標說明

API種類

Datastream和SQL

是否支持更新或刪除結果表數據

前提條件

使用限制

  • 源表和維表支持大于等于6.8.x,但小于8.x版本的Elasticsearch。

  • 結果表僅支持Elasticsearch 6.x、7.x和8.x版本。

  • 僅Flink計算引擎VVR 2.0.0及以上版本支持Elasticsearch連接器。

  • 僅支持全量Elasticsearch源表,不支持增量Elasticsearch源表。

語法結構

  • 源表

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • 維表

    CREATE TABLE es_dim(
      field1 STRING, --作為JOIN時的Key,必須為STRING類型。
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    說明
    • 如果指定主鍵,則維表JOIN時的Key(字段)有且只能有一個,且必須為Elasticsearch對應索引的文檔ID。

    • 如果不指定主鍵,則維表JOIN時的Key可以有一個或多個,需要為Elasticsearch對應索引的文檔中的字段。

    • 對于String類型,為了保持兼容性,默認會對表中字段名增加.keyword后綴。如果因此無法匹配到Elasticsearch中的Text字段,可以將配置項ignoreKeywordSuffix配置為true。

  • 結果表

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- 如果是Elasticsearch 6.x版本,填寫elasticsearch-6
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    說明
    • Elasticsearch結果表會根據是否定義了主鍵,確定是在upsert模式或append模式下工作。

      • 如果定義了主鍵,則主鍵必須為文檔ID,Elasticsearch結果表將在upsert模式下工作,該模式可以處理包含UPDATE和DELETE的消息。

      • 如果未定義主鍵,Elasticsearch將自動生成隨機的文檔ID,Elasticsearch結果表將在append模式工作,該模式只能消費INSERT消息。

    • 某些類型(例如BYTES、ROW、ARRAY和MAP等)由于沒有對應的字符串表示形式,所以不允許其作為主鍵字段。

    • DDL中的字段均對應Elasticsearch文檔中的字段,不支持將文檔ID等Meta信息寫入Elasticsearch結果表中,因為文檔ID等Meta信息由Elasticsearch實例側維護。

WITH參數

  • 源表

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    源表類型。

    String

    固定值為elasticsearch。

    endPoint

    Server地址。

    String

    例如:http://127.0.0.1:XXXX

    indexName

    索引名稱。

    String

    無。

    accessId

    Elasticsearch實例的用戶名。

    String

    默認為空,不進行權限驗證。如果定義了accessId,則必須定義非空的accessKey

    重要

    為了避免您的用戶名和密碼信息泄露,建議您通過密鑰管理的方式填寫用戶名和密碼取值,詳情請參見變量管理

    accessKey

    Elasticsearch實例的密碼。

    String

    typeNames

    Type名稱。

    String

    _doc

    Elasticsearch 7.0以上版本不建議設置該參數。

    batchSize

    每個scroll請求從Elasticsearch集群獲取的最大文檔數。

    Int

    2000

    無。

    keepScrollAliveSecs

    scroll上下文保留的最長時間。

    Int

    3600

    單位為秒。

  • 結果表

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    結果表類型。

    String

    固定值為elasticsearch-6elasticsearch-7 elasticsearch-8

    說明

    僅實時計算引擎VVR 8.0.5及以上版本支持配置為elasticsearch-8

    hosts

    Server地址。

    String

    例如:127.0.0.1:XXXX

    index

    索引名稱。

    String

    Elasticsearch結果表同時支持靜態索引和動態索引。在使用靜態和動態索引時,您需要注意以下幾點:

    • 如果使用靜態索引,則索引選項值應為純字符串,例如myusers,所有記錄都將被寫入myusers索引。

    • 如果使用動態索引,可以使用{field_name}引用記錄中的字段值以動態生成目標索引。您還可以使用{field_name|date_format_string}將TIMESTAMP、DATE和TIME類型的字段值轉換為date_format_string指定的格式。date_format_string與Java的DateTimeFormatter兼容。例如,如果設置為myusers-{log_ts|yyyy-MM-dd},則log_ts字段值為2020-03-27 12:25:55的記錄將被寫入myusers-2020-03-27索引。

    document-type

    文檔類型。

    String

    • elasticsearch-6:必填

    • elasticsearch-7:不支持

    當連接器類型為elasticsearch-6時,此處參數取值需要和Elasticsearch側的type參數取值保持一致。

    username

    用戶名。

    String

    默認為空,不進行權限驗證。如果定義了username,則必須定義非空的password。

    重要

    為了避免您的用戶名和密碼信息泄露,建議您通過密鑰管理的方式填寫用戶名和密碼取值,詳情請參見變量管理

    password

    密碼。

    String

    document-id.key-delimiter

    文檔ID的分隔符。

    String

    _

    在Elasticsearch結果表中,主鍵用于計算Elasticsearch的文檔ID。Elasticsearch結果表通過使用document-id.key-delimiter指定的鍵分隔符,按照DDL中定義的順序連接所有主鍵字段,從而為每一行生成一個文檔ID字符串。

    說明

    文檔ID為最多512個字節但不包含空格的字符串。

    failure-handler

    Elasticsearch請求失敗時的故障處理策略。

    String

    fail

    可選策略如下:

    • fail(默認值):如果請求失敗,則作業失敗。

    • ignore:忽略失敗并刪除請求。

    • retry-rejected:重新添加由于隊列容量滿而失敗的請求。

    • custom class name:用于使用ActionRequestFailureHandler子類進行故障處理。

    sink.flush-on-checkpoint

    是否在checkpoint時執行flush。

    Boolean

    true

    • true:默認值。

    • false:禁用該功能后,在Elasticsearch進行Checkpoint時,連接器將不等待確認所有pending請求是否已完成,故連接器不會為請求提供At-least-once保證。

    sink.bulk-flush.backoff.strategy

    如果由于臨時請求錯誤導致flush操作失敗,則設置sink.bulk-flush.backoff.strategy指定重試策略。

    Enum

    DISABLED

    • DISABLED(默認值):不執行重試,即第一次請求錯誤后失敗。

    • CONSTANT:常量回退,即每次回退等待時間相同。

    • EXPONENTIAL:指數回退,即每次回退等待時間指數遞增。

    sink.bulk-flush.backoff.max-retries

    最大回退重試次數。

    Int

    無。

    sink.bulk-flush.backoff.delay

    每次回退嘗試之間的延遲。

    Duration

    • 對于CONSTANT回退策略:該值為每次重試之間的延遲。

    • 對于EXPONENTIAL回退策略:該值為初始基準延遲。

    sink.bulk-flush.max-actions

    每個批量請求的最大緩沖操作數。

    Int

    1000

    0表示禁用該功能。

    sink.bulk-flush.max-size

    存放請求的緩沖區內存最大值。

    String

    2 MB

    單位為MB,默認值為2 MB,0 MB表示禁用該功能。

    sink.bulk-flush.interval

    flush的間隔。

    Duration

    1s

    單位為秒,默認值為1s,0s表示禁用該功能。

    connection.path-prefix

    要添加到每個REST通信中的前綴字符串。

    String

    無。

    retry-on-conflict

    更新操作中,允許因版本沖突異常而重試的最大次數。超過該次數后將拋出異常導致作業失敗。

    Int

    0

    說明
    • 僅實時計算引擎VVR 4.0.13及以上版本支持該參數。

    • 該參數僅在定義了主鍵的情況下生效。

    routing-fields

    指定一個或多個ES字段名稱,用來將文檔路由到Elasticsearch的指定分片中。

    String

    多個字段名以分號(;)進行分割。如果某個字段數據為空,則該字段會被置為null。

    說明

    僅實時計算引擎VVR 8.0.6及以上版本,且elasticsearch-7和elasticsearch-8支持該參數。

    sink.delete-strategy

    用來配置收到回撤(-D/-U)類型消息時的行為

    Enum

    DELETE_ROW_ON_PK

    可選行為如下:

    • DELETE_ROW_ON_PK(默認值):忽略-U類型的消息,但是在收到-D類型的消息時刪除主鍵對應的行(文檔)。

    • IGNORE_DELETE:忽略-U和-D 類型的消息,Elasticsearch Sink不發生回撤。

    • NON_PK_FIELD_TO_NULL:忽略 -U類型的消息,但是在收到-D類型的消息時,會修改主鍵對應的行(文檔),主鍵值保持不變,表 Schema中其他非主鍵值均置為 NULL。主要用在多個Sink同時寫入同一張Elasticsearch表時部分更新的場景。

    • CHANGELOG_STANDARD:和 DELETE_ROW_ON_PK類似,唯一的區別是該模式收到-U類型的消息時也會刪除主鍵對應的行(文檔)。

      說明

      僅實時計算引擎VVR 8.0.8及以上版本支持該參數。

  • 維表

    參數

    說明

    數據類型

    是否必填

    默認值

    備注

    connector

    維表類型。

    String

    固定值為elasticsearch。

    endPoint

    Server地址。

    String

    例如:http://127.0.0.1:XXXX

    indexName

    索引名稱。

    String

    無。

    accessId

    Elasticsearch實例的用戶名。

    String

    默認為空,不進行權限驗證。如果定義了accessId,則必須定義非空的accessKey

    重要

    為了避免您的用戶名和密碼信息泄露,建議您通過密鑰管理的方式填寫用戶名和密碼取值,詳情請參見變量管理

    accessKey

    Elasticsearch實例的密碼。

    String

    typeNames

    Type名稱。

    String

    _doc

    Elasticsearch 7.0以上版本不建議設置該參數。

    maxJoinRows

    單行數據Join的最多行數。

    Integer

    1024

    無。

    cache

    緩存策略。

    String

    ALL

    支持以下三種緩存策略:

    • ALL:緩存維表里的所有數據。在Job運行前,系統會將維表中的所有數據加載到Cache中,之后所有的維表查找數據都會通過Cache進行。如果在Cache中無法找到數據,則KEY不存在,并在Cache過期后重新加載一遍全量Cache。

    • LRU:緩存維表里的部分數據。源表的每條數據都會觸發系統先在Cache中查找數據。如果沒有找到,則去物理維表中查找。

    • None:無緩存。

    cacheSize

    緩存大小,即緩存多少行數據。

    Long

    100000

    僅當cache選擇LRU緩存策略時,cacheSize參數生效。

    cacheTTLMs

    緩存失效的超時時間。

    Long

    Long.MAX_VALUE

    單位為毫秒。cacheTTLMs配置和cache配置有關:

    • 如果cache配置為LRU,則cacheTTLMs為緩存失效的超時時間,默認不過期。

    • 如果cache配置為ALL,則cacheTTLMs為設置緩存重新加載的間隔時間,默認不重新加載。

    ignoreKeywordSuffix

    是否忽略自動為String字段添加的.keyword后綴。

    Boolean

    false

    為了保證兼容性,Flink將Elasticsearch中的Text類型轉換為String,并默認在String類型字段名后增加.keyword后綴。

    參數取值如下:

    • true:忽略。

      如果因此無法匹配到Elasticsearch中的Text類型字段,需要將該參數配置為true。

    • false:不忽略。

    cacheEmpty

    是否緩存物理維表中查找結果為空的結果。

    Boolean

    true

    僅當cache選擇LRU緩存策略時,cacheEmpty參數生效。

    queryMaxDocs

    非主鍵維表的輸入端每條數據到來后,查詢ElasticSearch Server時返回的最大文檔條數。

    Integer

    10000

    默認值10000是ElasticSearch Server返回文檔條數的最大限制,該配置項的取值不能超過這個上限。

    說明
    • 僅實時計算引擎VVR 8.0.8及以上版本支持該參數。

    • 該參數僅對非主鍵維表生效,因為主鍵表中數據是唯一的。

    • 為了查詢的正確性,默認值給的比較大。但是該值會增大查詢Elasticsearch時的內存占用,確實遇到內存問題后,可以適當降低該值來優化內存使用。

類型映射

Flink以JSON來解析Elasticsearch數據,詳情請參見數據類型映射關系

使用示例

  • 源表示例

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • 維表示例

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • 結果表示例1

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • 結果表示例2

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- 主鍵可選,如果定義了主鍵,則作為文檔ID,否則文檔ID將為隨機值。
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;