當您的業務數據存儲在MongoDB中,并且需要進行語義分析和大圖展示時,可借助阿里云Elasticsearch實現全文搜索、語義分析、可視化展示等。本文介紹如何通過Monstache將MongoDB數據實時同步至阿里云Elasticsearch,并對數據進行分析及展示。
背景信息
本文以解析及統計熱門電影數據為例,提供的解決方案可以幫助您完成以下需求:
通過Monstache快速同步及訂閱全量或增量數據。
將MongoDB數據實時同步至高版本Elasticsearch。
解讀Monstache常用配置參數,應用于更多的業務場景。
方案優勢
MongoDB、阿里云Elasticsearch及Monstache服務部署在專有網絡VPC(Virtual Private Cloud)內,所有數據私網通信,高速且安全。
Monstache基于MongoDB的oplog實現實時數據同步及訂閱,支持MongoDB與高版本Elasticsearch之間的數據同步,同時支持MongoDB的變更流和聚合管道功能,并且擁有豐富的特性。
Monstache不僅支持軟刪除和硬刪除,還支持數據庫刪除和集合刪除,能夠確保Elasticsearch端實時與源端數據保持一致。
操作流程
準備同一專有網絡下的阿里云MongoDB實例、阿里云Elasticsearch實例和ECS實例。其中ECS實例用來安裝Monstache。
說明請準備版本兼容的Monstache工具、阿里云Elasticsearch和MongoDB實例,版本兼容性詳情請參見Monstache version。
在ECS實例中安裝Monstache,用來將MongoDB中的數據同步至阿里云Elasticsearch。安裝前需要先配置Go環境變量。
修改默認的Monstache配置文件,在配置文件中指定MongoDB和Elasticsearch的訪問地址、待同步的集合、Elasticsearch的用戶名和密碼等參數。配置完成后,運行Monstache服務,即可將MongoDB中的數據實時同步至阿里云Elasticsearch中。
分別在MongoDB數據庫中添加、更新、刪除數據,驗證數據是否實時同步。
在Kibana控制臺中,分析數據并使用Pie圖展示分析結果。
步驟一:環境準備
創建阿里云Elasticsearch實例,并開啟實例的自動創建索引功能。
具體操作步驟請參見創建阿里云Elasticsearch實例和配置YML參數。本文使用的實例版本為通用商業版6.7。
創建阿里云MongoDB實例,并準備測試數據。
具體操作步驟請參見MongoDB快速入門。本文以4.2版本的副本集MongoDB實例為例,部分數據如下。
重要MongoDB實例必須是副本集或分片集架構,不支持單節點架構。
創建ECS實例。
具體操作步驟請參見自定義購買實例。該ECS實例用來安裝Monstache,需要與阿里云Elasticsearch實例在同一專有網絡下,且選擇Linux操作系統。
步驟二:搭建Monstache環境
連接ECS實例。
具體操作請參見通過密碼或密鑰認證登錄Linux實例。
說明本文檔以普通用戶權限為例。
安裝Go,并配置環境變量。
說明由于Monstache數據同步依賴于Go語言,因此需要先在ECS中準備Go環境。
下載Go安裝包并解壓。
wget https://dl.google.com/go/go1.14.4.linux-amd64.tar.gz tar -xzf go1.14.4.linux-amd64.tar.gz
配置環境變量。
使用
vim ~/.bash_profile
命令打開環境變量配置文件,并將如下內容寫入該文件中。其中GOPROXY
用來指定阿里云Go模塊代理。export GOROOT=/home/test1/go export GOPATH=/home/go/ export PATH=$PATH:$GOROOT/bin:$GOPATH/bin export GOPROXY=https://mirrors.aliyun.com/goproxy/
應用環境變量配置。
source ~/.bash_profile
安裝Monstache。
從Git庫中下載安裝包。
git clone https://github.com/rwynn/monstache.git
說明如果出現
git: command not found
的錯誤提示,需要先執行sudo yum install -y git
命令安裝Git。進入monstache目錄。
cd monstache
切換版本。
本文以rel5版本為例。
git checkout rel5
安裝Monstache。
sudo go install
查看Monstache版本。
monstache -v
執行成功后,預期結果如下。
5.5.5
步驟三:配置實時同步任務
Monstache配置使用TOML格式,默認情況下,Monstache會使用默認端口連接本地主機上的Elasticsearch和MongoDB,并追蹤MongoDB oplog。在Monstache運行期間,MongoDB的任何更改都會同步到Elasticsearch中。
由于本文使用阿里云MongoDB和Elasticsearch,并且需要指定同步對象(mydb數據庫中的hotmovies和col集合),因此要修改默認的Monstache配置文件。修改方式如下:
進入Monstache安裝目錄,創建并編輯配置文件。
cd monstache vim config.toml
參考以下示例,修改配置文件。
簡單的配置示例如下,詳細配置請參見Monstache Usage。
# connection settings # connect to MongoDB using the following URL mongo-url = "mongodb://<your_mongodb_user>:<your_mongodb_password>@dds-bp1aadcc629******.mongodb.rds.aliyuncs.com:3717" # connect to the Elasticsearch REST API at the following node URLs elasticsearch-urls = ["http://es-cn-mp91kzb8m00******.elasticsearch.aliyuncs.com:9200"] # frequently required settings # if you need to seed an index from a collection and not just listen and sync changes events # you can copy entire collections or views from MongoDB to Elasticsearch direct-read-namespaces = ["mydb.hotmovies","mydb.col"] # if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces # change streams require at least MongoDB API 3.6+ # if you have MongoDB 4+ you can listen for changes to an entire database or entire deployment # in this case you usually don't need regexes in your config to filter collections unless you target the deployment. # to listen to an entire db use only the database name. For a deployment use an empty string. #change-stream-namespaces = ["mydb.col"] # additional settings # if you don't want to listen for changes to all collections in MongoDB but only a few # e.g. only listen for inserts, updates, deletes, and drops from mydb.mycollection # this setting does not initiate a copy, it is only a filter on the change event listener #namespace-regex = '^mydb\.col$' # compress requests to Elasticsearch #gzip = true # generate indexing statistics #stats = true # index statistics into Elasticsearch #index-stats = true # use the following PEM file for connections to MongoDB #mongo-pem-file = "/path/to/mongoCert.pem" # disable PEM validation #mongo-validate-pem-file = false # use the following user name for Elasticsearch basic auth elasticsearch-user = "elastic" # use the following password for Elasticsearch basic auth elasticsearch-password = "<your_es_password>" # use 4 go routines concurrently pushing documents to Elasticsearch elasticsearch-max-conns = 4 # use the following PEM file to connections to Elasticsearch #elasticsearch-pem-file = "/path/to/elasticCert.pem" # validate connections to Elasticsearch #elastic-validate-pem-file = true # propagate dropped collections in MongoDB as index deletes in Elasticsearch dropped-collections = true # propagate dropped databases in MongoDB as index deletes in Elasticsearch dropped-databases = true # do not start processing at the beginning of the MongoDB oplog # if you set the replay to true you may see version conflict messages # in the log if you had synced previously. This just means that you are replaying old docs which are already # in Elasticsearch with a newer version. Elasticsearch is preventing the old docs from overwriting new ones. #replay = false # resume processing from a timestamp saved in a previous run resume = true # do not validate that progress timestamps have been saved #resume-write-unsafe = false # override the name under which resume state is saved #resume-name = "default" # use a custom resume strategy (tokens) instead of the default strategy (timestamps) # tokens work with MongoDB API 3.6+ while timestamps work only with MongoDB API 4.0+ resume-strategy = 0 # exclude documents whose namespace matches the following pattern #namespace-exclude-regex = '^mydb\.ignorecollection$' # turn on indexing of GridFS file content #index-files = true # turn on search result highlighting of GridFS content #file-highlighting = true # index GridFS files inserted into the following collections #file-namespaces = ["users.fs.files"] # print detailed information including request traces verbose = true # enable clustering mode cluster-name = 'es-cn-mp91kzb8m00******' # do not exit after full-sync, rather continue tailing the oplog #exit-after-direct-reads = false [[mapping]] namespace = "mydb.hotmovies" index = "hotmovies" type = "movies" [[mapping]] namespace = "mydb.col" index = "mydbcol" type = "collection"
參數
說明
mongo-url
MongoDB實例的主節點訪問地址,可在實例的基本信息頁面獲取。其中
<your_mongodb_user>
為您使用的MongoDB實例的數據庫賬號,<your_mongodb_password>
為對應數據庫賬號的密碼。獲取前需配置MongoDB實例的白名單,即在白名單中添加安裝Monstache的ECS實例的內網IP地址,詳情請參見設置白名單。
elasticsearch-urls
阿里云Elasticsearch實例的訪問地址,格式為
http://<阿里云Elasticsearch實例的私網地址>:9200
。阿里云Elasticsearch實例的私網地址可在實例的基本信息頁面獲取,詳情請參見查看實例的基本信息。direct-read-namespaces
指定待同步的集合,詳情請參見direct-read-namespaces。本文同步的數據集為mydb數據庫下的hotmovies和col集合。
change-stream-namespaces
如果要使用MongoDB變更流功能,需要指定此參數。啟用此參數后,oplog追蹤會被設置為無效,詳情請參見change-stream-namespaces。
namespace-regex
通過正則表達式指定需要監聽的集合。此設置可以用來監控符合正則表達式的集合中數據的變化。
elasticsearch-user
訪問阿里云Elasticsearch實例的用戶名,默認為elastic。
重要實際業務中不建議使用elastic用戶,這樣會降低系統安全性。建議使用自建用戶,并給予自建用戶分配相應的角色和權限,詳情請參見通過Elasticsearch X-Pack角色管理實現用戶權限管控。
elasticsearch-password
對應用戶的密碼。elastic用戶的密碼在創建實例時指定,如果忘記可進行重置,重置密碼的注意事項和操作步驟請參見重置實例訪問密碼。
elasticsearch-max-conns
定義連接Elasticsearch的線程數。默認為4,即使用4個Go線程同時將數據同步到Elasticsearch。
dropped-collections
默認為true,表示當刪除MongoDB集合時,會同時刪除Elasticsearch中對應的索引。
dropped-databases
默認為true,表示當刪除MongoDB數據庫時,會同時刪除Elasticsearch中對應的索引。
resume
默認為false。設置為true,Monstache會將已成功同步到Elasticsearch的MongoDB操作的時間戳寫入monstache.monstache集合中。當Monstache因為意外停止時,可通過該時間戳恢復同步任務,避免數據丟失。如果指定了cluster-name,該參數將自動開啟,詳情請參見resume。
resume-strategy
指定恢復策略。僅當resume為true時生效,詳情請參見resume-strategy。
verbose
默認為false,表示不啟用調試日志。
cluster-name
指定集群名稱。指定后,Monstache將進入高可用模式,集群名稱相同的進程將進行協調,詳情請參見cluster-name。
mapping
指定Elasticsearch索引映射。默認情況下,數據從MongoDB同步到Elasticsearch時,索引會自動映射為
數據庫名.集合名
。如果需要修改索引名稱,可通過該參數設置,詳情請參見Index Mapping。說明Monstache支持豐富的參數配置,以上配置僅使用了部分參數完成數據實時同步,如果您有更復雜的同步需求,請參見Monstache config和Advanced進行配置。
運行Monstache。
monstache -f config.toml
說明通過-f參數,您可以顯式運行Monstache,系統會打印所有調試日志(包括對Elasticsearch的請求追蹤)。
步驟四:驗證數據同步結果
分別進入MongoDB的DMS控制臺和阿里云Elasticsearch實例的Kibana控制臺,查看同步前后對應文檔的數量。
說明登錄DMS控制臺的方法請參見通過DMS連接MongoDB副本集實例。
登錄Kibana控制臺的方法請參見登錄Kibana控制臺。
MongoDB
db.hotmovies.find().count()
預期結果如下。
[ 10000 ]
阿里云Elasticsearch
GET hotmovies/_count
預期結果如下。通過以下結果可以看到同步前后的文檔的數量都為10000條。
{ "count" : 10000, "_shards" : { "total" : 5, "successful" : 5, "skipped" : 0, "failed" : 0 } }
在MongoDB數據庫中插入數據,查看該數據是否同步到阿里云Elasticsearch實例中。
MongoDB
db.hotmovies.insert({id: 11003,title: "乘風破浪的程序媛",overview: "一群IT高智商女人,如何打破傳統逆序IT精英",original_language:"cn",release_date:"2020-06-17",popularity:67.654,vote_count:65487,vote_average:9.9}) db.hotmovies.insert({id: 11004,title: "英姿颯爽的程序猿",overview: "一群IT高智商man,如何打破傳統逆序IT精英",original_language:"cn",release_date:"2020-06-15",popularity:77.654,vote_count:85487,vote_average:11.9})
阿里云Elasticsearch
GET hotmovies/_search { "query": { "bool": { "should": [ {"term":{"id":"11003"}}, {"term":{"id":"11004"}} ] } } }
預期結果如下。
在MongoDB數據庫中更新數據,查看阿里云Elasticsearch實例中對應的數據是否會同步更新。
MongoDB
db.hotmovies.update({'title':'乘風破浪的程序媛'},{$set:{'title':'美女小姐姐'}})
阿里云Elasticsearch
GET hotmovies/_search { "query": { "match": { "id":"11003" } } }
預期結果如下。
在MongoDB數據庫中刪除數據,查看阿里云Elasticsearch實例中對應的數據是否會同步刪除。
MongoDB
db.hotmovies.remove({id: 11003}) db.hotmovies.remove({id: 11004})
阿里云Elasticsearch
GET hotmovies/_search { "query": { "bool": { "should": [ {"term":{"id":"11003"}}, {"term":{"id":"11004"}} ] } } }
預期結果如下。
步驟五:通過Kibana分析并展示數據
- 登錄目標阿里云Elasticsearch實例的Kibana控制臺,根據頁面提示進入Kibana主頁。登錄Kibana控制臺的具體操作,請參見登錄Kibana控制臺。說明 本文以阿里云Elasticsearch 6.7.0版本為例,其他版本操作可能略有差別,請以實際界面為準。
創建索引模式。
- 在左側導航欄,單擊Management。
- 在Kibana區域,單擊Index Patterns。
- 單擊Create index pattern。
- 輸入Index pattern名稱,單擊Next step。
從Time Filter field name中,選擇時間過濾器字段名(本文選擇I don't want to use the Time Filter)。
- 單擊Create index pattern。
配置Kibana大圖。
本文以配置最受歡迎的Top10電影的Pie圖為例,操作步驟如下:
在左側導航欄,單擊Visualize。
在搜索框右側,單擊+。
在New Visualization對話框中,單擊Pie。
單擊hotmovies索引模式。
按照下圖配置Metrics和Buckets。
單擊圖標,應用配置,查看數據展示結果。
常見問題
問題
阿里云Elasticsearch實例開啟高可用、高并發功能后,數據有丟失現象,如何排查?
解決方案
查看阿里云Elasticsearch集群的整體情況是否正常:
正常:需要排查Monstache服務的問題,詳細信息請參見Monstache官網。
不正常:參見常見問題,排查阿里云Elasticsearch集群的問題。同時降低并發數,觀察數據是否正常。