本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
當您的業務數據存儲在PolarDB-X(原DRDS升級版)中,需要進行全文檢索和語義分析時,可借助阿里云Elasticsearch實現。本文介紹如何通過阿里云Logstash,將PolarDB-X中的數據實時同步至阿里云Elasticsearch。
背景信息
阿里云Logstash是一款強大的數據收集和處理工具,提供了數據采集、轉換、優化和輸出的能力。通過Logstash的logstash-input-jdbc插件(默認已安裝,不可卸載),可批量查詢PolarDB-X中的數據并同步到Elasticsearch中。同時,logstash-input-jdbc插件會定期對PolarDB-X中的數據進行輪詢查詢,并將自上次輪詢以來插入或更改的記錄同步到Elasticsearch。本方案適用于同步全量數據并接受秒級延遲的場景、批量查詢數據然后進行同步的場景。
前提條件
已創建PolarDB-X 1.0實例及數據庫、阿里云Elasticsearch實例、Logstash實例。建議您在同一專有網絡下創建相關實例。
創建PolarDB-X 1.0實例及數據庫。具體操作,請參見PolarDB-X 1.0快速入門。
創建阿里云Elasticsearch實例。具體操作請參見創建阿里云Elasticsearch實例。本文創建的實例的版本為通用商業版6.7。
創建阿里云Logstash實例。具體操作參見創建阿里云Logstash實例。
說明您也可以使用公網環境的服務,前提是需要通過配置NAT網關實現與公網的連通。詳細信息,請參見配置NAT公網數據傳輸。
使用限制
Elasticsearch中的_id字段必須與數據庫中的id字段相同。
該條件可以確保當您將數據庫中的記錄寫入Elasticsearch時,同步任務可在數據庫記錄與Elasticsearch文檔之間建立一個直接映射的關系。例如當您在數據庫中更新了某條記錄時,同步任務會覆蓋Elasticsearch中與更新記錄具有相同ID的文檔。
說明根據Elasticsearch內部原理,更新操作的本質是刪除舊文檔然后對新文檔進行索引,因此在Elasticsearch中覆蓋文檔的效率與更新操作的效率一樣高。
當您在數據庫中插入或者更新數據時,對應記錄必須有一個包含更新或插入時間的字段。
Logstash每次對數據庫進行輪詢時,都會保存其從數據庫中所讀取的最后一條記錄的更新或插入時間。在讀取數據時,Logstash僅讀取符合條件的記錄,即該記錄的更新或插入時間需要晚于上一次輪詢中最后一條記錄的更新或插入時間。
重要logstash-input-jdbc插件無法實現同步刪除。如果您要刪除Elasticsearch中的數據,需要在Elasticsearch中執行相關命令,手動刪除。
操作步驟
步驟一:環境準備
在PolarDB-X 1.0實例中創建表,并準備測試數據。
本文使用的建表語句如下。
CREATE table food( id int PRIMARY key AUTO_INCREMENT, name VARCHAR (32), insert_time DATETIME, update_time DATETIME );
插入測試數據語句如下。
INSERT INTO food values(null,'巧克力',now(),now()); INSERT INTO food values(null,'酸奶',now(),now()); INSERT INTO food values(null,'火腿腸',now(),now());
在阿里云Elasticsearch實例中開啟自動創建索引功能。具體操作,請參見快速訪問與配置。
在阿里云Logstash實例中上傳與PolarDB-X數據庫版本兼容的SQL JDBC驅動。具體操作,請參見配置擴展文件。本文使用mysql-connector-java-5.1.35.jar驅動。
說明本文使用MySQL JDBC驅動連接PolarDB-X數據庫。您也可以使用PolarDB JDBC驅動,但對于一些高版本PolarDB-X數據庫,使用PolarDB JDBC驅動會有問題,建議您使用MySQL JDBC驅動。
在PolarDB-X數據庫白名單中加入阿里云Logstash節點的IP地址(可在基本信息頁面獲取)。具體操作,請參見設置白名單。
步驟二:配置Logstash管道
- 進入阿里云Elasticsearch控制臺的Logstash頁面。
- 進入目標實例。
- 在頂部菜單欄處,選擇地域。
- 在Logstash實例中單擊目標實例ID。
- 單擊左側導航欄的管道管理。
單擊創建管道。
在創建管道任務頁面,輸入管道ID,并進行Config配置。
本文使用的Config配置如下。
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/ssd/1/share/<Logstash實例ID>/logstash/current/config/custom/mysql-connector-java-5.1.35.jar" jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<數據庫名稱>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false" jdbc_user => "db_user" jdbc_password => "db_password" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select * from food where update_time >= :sql_last_value" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "/ssd/1/<Logstash實例ID>/logstash/data/last_run_metadata_update_time.txt" clean_run => false tracking_column_type => "timestamp" use_column_value => true tracking_column => "update_time" } } filter { } output { elasticsearch { hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200" user => "elastic" password => "es_password" index => "drds_test" document_id => "%{id}" } }
說明代碼中
<Logstash實例ID>
需要替換為您創建的Logstash實例的ID。獲取方式,請參見實例列表概覽。表 1. input參數說明 參數
描述
jdbc_driver_class
JDBC Class配置。
jdbc_driver_library
指定JDBC連接MySQL驅動文件。具體操作請參見配置擴展文件。
jdbc_connection_string
配置數據庫連接的域名、端口及數據庫。
jdbc_user
數據庫用戶名。
jdbc_password
數據庫密碼。
jdbc_paging_enabled
是否啟用分頁,默認false。
jdbc_page_size
分頁大小。
statement
指定SQL語句。
schedule
指定定時操作,
"* * * * *"
表示每分鐘定時同步數據。record_last_run
是否記錄上次執行結果。如果為true,則會把上次執行到的tracking_column字段的值記錄下來,保存到last_run_metadata_path指定的文件中。
last_run_metadata_path
指定最后運行時間文件存放的地址。目前后端開放了/ssd/1/<Logstash實例ID>/logstash/data/路徑來保存文件。指定參數路徑后,Logstash會在對應路徑下自動生成文件,但不支持查看文件內容。
clean_run
是否清除last_run_metadata_path的記錄,默認為false。如果為true,那么每次都要從頭開始查詢所有的數據庫記錄。
use_column_value
是否需要記錄某個column的值。
tracking_column_type
跟蹤列的類型,默認是numeric。
tracking_column
指定跟蹤列,該列必須是遞增的,一般是表的主鍵。
表 2. output參數說明 參數
說明
hosts
阿里云Elasticsearch實例的訪問地址,格式為http://<實例的私網地址>:9200。其中實例的私網地址可在基本信息頁面獲取,詳細信息請參見查看實例的基本信息。
user
訪問阿里云Elasticsearch實例的用戶名,默認為elastic。
password
對應用戶的密碼。elastic用戶的密碼在創建實例時設定,如果忘記可重置。重置密碼的注意事項和操作,請參見重置實例訪問密碼。
index
索引名稱。
document_id
文檔ID。設置為%{id},表示與源數據庫中的ID字段保持一致。
重要以上配置按照測試數據配置,在實際業務中,請按照業務需求進行合理配置。input插件支持的其他配置選項,請參見官方Logstash Jdbc input plugin。
如果配置中有類似
last_run_metadata_path
的參數,需要阿里云Logstash服務提供文件路徑。目前后端開放了/ssd/1/<Logstash實例ID>/logstash/data/
路徑供您測試使用,且該目錄下的數據不會被刪除。因此在使用時,請確保磁盤有充足的使用空間。指定參數路徑后,Logstash會在對應路徑下自動生成文件,但不支持查看文件內容。為了提升安全性,如果在配置管道時使用了JDBC驅動,需要在
jdbc_connection_string
參數后面添加allowLoadLocalInfile=false&autoDeserialize=false
,否則當您在添加Logstash配置文件時,調度系統會拋出校驗失敗的提示,例如jdbc_connection_string => "jdbc:mysql://drdshbga51x6****.drds.aliyuncs.com:3306/<數據庫名稱>?allowLoadLocalInfile=false&autoDeserialize=false"
。
更多Config配置,請參見Logstash配置文件說明。
單擊下一步,配置管道參數。
參數
說明
管道工作線程
并行執行管道的Filter和Output的工作線程數量。當事件出現積壓或CPU未飽和時,請考慮增大線程數,更好地使用CPU處理能力。默認值:實例的CPU核數。
管道批大小
單個工作線程在嘗試執行Filter和Output前,可以從Input收集的最大事件數目。較大的管道批大小可能會帶來較大的內存開銷。您可以設置LS_HEAP_SIZE變量,來增大JVM堆大小,從而有效使用該值。默認值:125。
管道批延遲
創建管道事件批時,將過小的批分派給管道工作線程之前,要等候每個事件的時長,單位為毫秒。默認值:50ms。
隊列類型
用于事件緩沖的內部排隊模型。可選值:
MEMORY:默認值。基于內存的傳統隊列。
PERSISTED:基于磁盤的ACKed隊列(持久隊列)。
隊列最大字節數
請確保該值小于您的磁盤總容量。默認值:1024 MB。
隊列檢查點寫入數
啟用持久性隊列時,在強制執行檢查點之前已寫入事件的最大數目。設置為0,表示無限制。默認值:1024。
警告配置完成后,需要保存并部署才能生效。保存并部署操作會觸發實例重啟,請在不影響業務的前提下,繼續執行以下步驟。
單擊保存或者保存并部署。
保存:將管道信息保存在Logstash里并觸發實例變更,配置不會生效。保存后,系統會返回管道管理頁面。可在管道列表區域,單擊操作列下的立即部署,觸發實例重啟,使配置生效。
保存并部署:保存并且部署后,會觸發實例重啟,使配置生效。
步驟三:驗證結果
登錄目標阿里云Elasticsearch實例的Kibana控制臺。
具體操作,請參見登錄Kibana控制臺。
在左側導航欄,單擊Dev Tools(開發工具)。
在Console中,執行以下命令,查看同步成功的索引數量。
GET drds_test/_count { "query": {"match_all": {}} }
運行成功后,結果如下。
{ "count" : 3, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 } }
在表中更新并插入數據。
UPDATE food SET name='Chocolates',update_time=now() where id = 1; INSERT INTO food values(null,'雞蛋',now(),now());
在Kibana控制臺,查看更新后的數據。
查詢name為Chocolates的數據。
GET drds_test/_search { "query": { "match": { "name": "Chocolates" }} }
返回結果如下。
查詢所有數據。
GET drds_test/_search { "query": { "match_all": {} } }
返回結果如下。