準備工作
1. 創建ES index
DataHub?持將數據同步到Elasticsearch對應的index中,目前支持ES5、ES6和ES7的實例。
?前DataHub僅?持將TUPLE類型Topic的數據同步到Elasticsearch中。開始同步任務之前請保證已經在ES中創建index或者允許自動創建index,否則同步任務會失敗。
2.準備同步任務賬號并授權
新建同步ES任務時,需要用戶手動填寫ES的endpoint、index和賬號密碼等信息。請確保所填入信息賬號信息真實有效,否則會導致創建同步任務失敗。
創建同步任務
一次進入
項目列表/project詳情/Topic詳情
頁面點擊右上角的
+同步
選擇
Elasticsearch
類型作業,如下圖所指示:
配置說明
1. Endpoint
ElasticSearch服務地址,需要填寫內網地址和和內網端口,格式為內網地
址:
內網端口
。
例如:內網地址為 es-cn-xxx.elasticsearch.aliyuncs.com
,內網端口為9200
,則填入es-cn-xxx.elasticsearch.aliyuncs.com:9200
。
2. Index
目前支持兩種index的指定方式:靜態index和動態index。
靜態index
用戶預先創建好一個index或者允許自動創建index,所有的數據都會寫入該index。
動態index
用戶需要允許自動創建index,否則會寫入失敗。用戶可以指定一個時間周期或者指定某列作為index的生成方式。如果采用數據列生成index,那么最多可以選擇1列。
支持配置的時間格式
年 | 月 | 日 | 周 |
%Y | %m | %d | %U |
示例一:每天凌晨生成一個新的index配置index為test_${%Y-%m-%d}
,如果當前日期為2021年3月31日,那么最終寫入的index為test_2021-03-31
示例二:根據數據列生成新的index數據列中包含有一列col1
,配置index為test_${col1}
,如果有兩條數據,這兩條數據的col1分別為AAA
和BBB
,那么這兩條數據寫入的index分別為test_AAA
和test_BBB
。
當ES中的index數量增多時,寫入數據會變慢,過多可能會導致DataHub寫入超時,因此用戶使用動態index時,需要盡量避免生成的index數量過多
3. User/Password
訪問ES的用戶名密碼。
4. Type屬性列
針對不同版本,DataHub同步ES的生成的Type也不一樣。在ES5中,用戶可以在一個index中創建多個type,但是ES6中,用戶只能在一個index中創建一個type,因此,DataHub同步ES的行為也有所改變。Type不可以為空。
對于ES5,DataHub同步數據時,將會以用戶選擇作為Type的列的值作為一條數據的type,如果選擇多列,則多列的值會以 “|” 分割作為一條數據的type。選擇作為Type屬性列的字段不能為null。
對于ES6,DataHub同步數據時,將會以用戶選擇的列的列名作為一條數據的type,如果選擇多列,則多列的列名會以“|”分割作為一條數據的type,并且ES6支持以任意名稱作為type。
例如:
DataHub Schema : f1 string, f2 string, f3 string, f4 string
數據 : ["test1","test2","test3",null]
type屬性列 | ES5 type | ES6 type |
f1 | test1 | f1 |
f1,f3 | test1 | test3 |
ff | 創建失敗 | ff |
f1,ff | 創建失敗 | f1|ff |
f4 | 創建成功,但同步失敗,臟數據 | 創建成功,并成功同步 |
備注:
目前在頁面上創建同步ES6任務時無法自定義type名稱,如果想要自定義用戶可以使用SDK來創建。
ES7中所有的type均采用默認type,所以 ES7不需要選擇type屬性列。
5. ID屬性列
用戶可以根據寫入DataHub的數據來生成寫入ES的數據id,也可以不選擇任何列,由ES將會為每條數據生成一個唯一的id。DataHub同步ES時,將會以用戶選擇的列的值作為一條數據的id,如果選擇多列,則多列的值會以 “|” 分割作為一條數據的id。選擇作為ID屬性列的字段不能為null。
例如:
DataHub Schema : f1 string, f2 string, f3 string, f4 string
數據 : ["test1","test2","test3",null]
ID屬性列 | 數據id |
ES自動生成唯一ID | |
f1 | test1 |
f1,f3 | test1|test3 |
ff | 創建失敗 |
f4 | 創建成功,但是同步失敗,臟數據 |
6. Router屬性列
用戶可以根據寫入DataHub的數據來生成寫入ES的router,也可以不選擇任何列,不選擇時將不使用ES的Router功能。DataHub同步ES時,將會以用戶選擇的列的值作為一條數據的router,如果選擇多列,則多列的值會以 “|” 分割作為一條數據的id。選擇作為Router屬性列的字段不能為null。
示例可參考ID屬性列
。
7. 導入字段
DataHub需要導入到ES的字段,對于未選擇的字段,DataHub不會同步到ES中。對于作為ID的字段和ES5中作為Type的字段,不會再放到數據中。因為導入字段并不能完全決定最后生成的數據,因此不再給出示例,下文會給出完整的數據同步示例。
8. 網絡類型
需要根據ES示例的類型選擇,因為目前公有云上的ES實例均為VPC實例,所以DataHub公有云同步ES任務只支持VPC網絡類型。使用VPC網絡類型時,需要填寫VPC ID和實例ID等信息,查看方式如下圖。
注意:填入實例ID時需要注意加上-worker
,例如實例ID為es-cn-xxx
,則實例ID填寫es-cn-xxx-worker
。
寫入數據示例
這里的示例是指創建同步ES任務成功之后,如果創建同步ES任務失敗,則參考上述的配置進行修改。
DataHub Schema為:
字段名稱 | 字段類型 |
f1 | BIGINT |
f2 | STRING |
f3 | BOOLEAN |
f4 | DOUBLE |
f5 | TIMESTAMP |
f6 | DECIMAL |
示例1:
Type屬性列 = f1(ES7無Type屬性列)
ID屬性列 = f2
導入字段 = f1,f2,f3,f4,f5,f6
數據 = v1,v2,v3,v4,v5,v6
ES版本 | type | id | data |
ES5 | v1 | v2 |
|
ES6 | f1 | v2 |
|
ES7 | - | v2 |
|
數據 = null,v2,v3,v4,v5,v6
ES版本 | type | id | data |
ES5 | - | - | type屬性列為null,臟數據 |
ES6 | f1 | v2 |
|
ES7 | - | v2 |
|
數據 = v1,null,v3,v4,v5,v6
ES版本 | type | id | data |
ES5 | - | - | id屬性列為null,臟數據 |
ES6 | - | - | id屬性列為null,臟數據 |
ES7 | - | - | id屬性列為null,臟數據 |
示例2:
Type屬性列 = f1,f2(ES7無Type屬性列)
ID屬性列 = f3,f4
導入字段 = f1,f2,f3,f4,f5,f6
數據 = v1,v2,v3,v4,v5,v6
ES版本 | type | id | data |
ES5 | v1|v2 | v3|v4 |
|
ES6 | f1|f2 | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
ES7 | - | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
數據 = v1,null,v3,v4,v5,v6
ES版本 | type | id | data |
ES5 | - | - | type屬性列為null,臟數據 |
ES6 | f1|f2 | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
ES7 | - | v3|v4 | {f1:v1,f2:v2,f5:v5,f6:v6} |
數據 = v1,v2,null,v4,v5,v6
ES版本 | type | id | data |
ES5 | - | - | id屬性列為null,臟數據 |
ES6 | - | - | id屬性列為null,臟數據 |
ES7 | - | - | id屬性列為null,臟數據 |
示例3:
Type屬性列 = f1(ES7無Type屬性列)
ID屬性列 = f2
Router屬性列 = f3
導入字段 = f1,f2,f3,f4,f5,f6
數據 = v1,v2,v3,v4,v5,v6
ES版本 | type | id | router | data |
ES5 | v1 | v2 | v3 |
|
ES6 | f1 | v2 | v3 |
|
ES7 | - | v2 | v3 |
|
數據 = null,v2,v3,v4,v5,v6
ES版本 | type | id | data |
ES5 | - | - | type屬性列為null,臟數據 |
ES6 | f1 | v2 |
|
ES7 | - | v2 |
|
數據 = v1,null,v3,v4,v5,v6
ES版本 | type | id | data |
ES5 | - | - | id屬性列為null,臟數據 |
ES6 | - | - | id屬性列為null,臟數據 |
ES7 | - | - | id屬性列為null,臟數據 |
數據 = v1,v2,null,v4,v5,v6
ES版本 | type | id | data |
ES5 | - | - | router屬性列為null,臟數據 |
ES6 | - | - | router屬性列為null,臟數據 |
ES7 | - | - | router屬性列為null,臟數據 |
查看同步任務
可以點擊對應connector的詳情??查看同步任務的運?狀態和點位等信息, 包含同步點位、同步狀態以及重啟和停?等操作,如下圖所示:
備注:需要先停止任務才可以進行充值點位操作
同步示例
本示例以阿里云ES6.7為例,展示DataHub同步到ES的完整操作。其中ES相關的操作均使用Kibana Dev Tools
執行,其他操作方法請參考ES官方文檔。
1. 創建ES index
一般情況下可以ES默認可以自動創建index,因此可以忽略這一步。如果設置了不允許自動創建,那需要手動創建一下index,具體創建命令可參考ES官方文檔。
2. 建立DataHub Topic
備注:目前僅支持Tuple類型的Topic建立同步ES任務。
創建DataHub的過程可以參考創建Topic。
創建完成之后查看創建的Topic。
3. 建立同步ES任務
這里創建同步ES任務時,將f1和f2作為Type屬性列,將f3和f4作為ID屬性列,并將所有字段作為導入字段。
4. 向DataHub Topic寫?數據
可以使?DataHub-SDK或者DataHub插件進?數據寫?。
寫入一條數據之后,在頁面上進行抽樣,查看一下寫入的數據。
5. 確認同步數據
首先查看一下ES同步任務的點位。
這里可以看到同步任務的點位和同步時間發生了變化,同步時間就是剛剛數據寫入DataHub的時間,同步點位變成了1(點位從0開始計算,點位為0表示第一條數據已經寫入)。
然后在ES查看一下數據的同步情況,通過Kibana可以看到數據已經同步成功。