本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
如果您需要將RDS MySQL中的數據同步到阿里云Elasticsearch中,可使用阿里云Logstash的logstash-input-jdbc插件(默認已安裝,不可卸載),通過管道配置將全量或增量數據實時同步至阿里云Elasticsearch中。本文介紹具體的實現方法。
背景信息
阿里云Logstash是一款強大的數據收集和處理工具,提供了數據采集、轉換、優化和輸出的能力。通過Logstash的logstash-input-jdbc插件(默認已安裝,不可卸載),可批量查詢RDS MySQL中的數據并同步到Elasticsearch中。同時,logstash-input-jdbc插件會定期對RDS中的數據進行輪詢查詢,并將自上次輪詢以來插入或更改的記錄同步到Elasticsearch。更多詳細信息,請參見官方文檔中的如何使用Logstash和JDBC確保Elasticsearch與關系型數據庫保持同步。本方案適用于同步全量數據且接受秒級延遲的場景或批量查詢特定條件的數據然后進行同步的場景。
前提條件
建議您在同一專有網絡下創建以下實例:
您也可以使用公網環境的服務,前提是需要配置SNAT、打開RDS MySQL的公網地址并取消白名單限制。SNAT的具體配置方法,請參見配置NAT公網數據傳輸。設置白名單操作,請參見設置IP白名單。
創建RDS MySQL實例。具體操作請參見創建RDS MySQL實例。本文以MySQL 5.7版本為例。
創建阿里云Elasticsearch實例。具體操作請參見創建阿里云Elasticsearch實例。本文以7.10版本ES實例為例。
創建阿里云Logstash實例。具體操作參見創建阿里云Logstash實例。
使用限制
確保MySQL實例、Logstash實例、阿里云Elasticsearch實例在同一時區,否則當同步與時間相關的數據時,同步前后的數據可能存在時區差。
Elasticsearch中的_id字段必須與MySQL中的id字段相同。
該條件可以確保當您將MySQL中的記錄寫入Elasticsearch時,同步任務可在MySQL記錄與Elasticsearch文檔之間建立一個直接映射的關系。例如當您在MySQL中更新了某條記錄時,同步任務會覆蓋Elasticsearch中與更新記錄具有相同ID的文檔。
說明根據Elasticsearch內部原理,更新操作的本質是刪除舊文檔然后對新文檔進行索引,因此在Elasticsearch中覆蓋文檔的效率與更新操作的效率一樣高。
當您在MySQL中插入或者更新數據時,對應記錄必須有一個包含更新或插入時間的字段。
Logstash每次對MySQL進行輪詢時,都會保存其從MySQL所讀取的最后一條記錄的更新或插入時間。在讀取數據時,Logstash僅讀取符合條件的記錄,即該記錄的更新或插入時間需要晚于上一次輪詢中最后一條記錄的更新或插入時間。
重要logstash-input-jdbc插件無法實現同步刪除,需要在Elasticsearch中執行相關命令手動刪除。
操作步驟
步驟一:環境準備
在阿里云Elasticsearch實例中開啟自動創建索引功能。具體操作,請參見快速訪問與配置。
在Logstash實例中上傳與RDS MySQL版本兼容的SQL JDBC驅動(本文使用mysql-connector-java-5.1.48.jar)。具體操作,請參見配置擴展文件。
準備測試數據,本文使用的建表語句如下。
CREATE table food ( id int PRIMARY key AUTO_INCREMENT, name VARCHAR (32), insert_time DATETIME, update_time DATETIME );
插入數據語句如下。
INSERT INTO food values(null,'巧克力',now(),now()); INSERT INTO food values(null,'酸奶',now(),now()); INSERT INTO food values(null,'火腿腸',now(),now());
在RDS MySQL的白名單中加入阿里云Logstash節點的IP地址(可在Logstash實例的基本信息頁面獲取)。
步驟二:配置Logstash管道
- 進入阿里云Elasticsearch控制臺的Logstash頁面。
- 進入目標實例。
- 在頂部菜單欄處,選擇地域。
- 在Logstash實例中單擊目標實例ID。
在左側導航欄,單擊管道管理。
單擊創建管道。
在創建管道任務頁面,輸入管道ID,并進行Config配置。
本文使用的Config配置如下。
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/ssd/1/share/<Logstash實例ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar" jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<數據庫名稱>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false" jdbc_user => "xxxxx" jdbc_password => "xxxx" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select * from food where update_time >= :sql_last_value" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "/ssd/1/<Logstash實例ID>/logstash/data/last_run_metadata_update_time.txt" clean_run => false tracking_column_type => "timestamp" use_column_value => true tracking_column => "update_time" } } filter { } output { elasticsearch { hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200" index => "rds_es_dxhtest_datetime" user => "elastic" password => "xxxxxxx" document_id => "%{id}" } }
說明代碼中
<Logstash實例ID>
需要替換為您創建的Logstash實例的ID。獲取方式,請參見查看實例的基本信息。表 1. Config配置說明
配置
說明
input
指定輸入數據源。支持的數據源類型,請參見Input plugins。本文使用JDBC數據源,具體參數說明請參見input參數說明。
filter
指定對輸入數據進行過濾的插件。支持的插件類型,請參見Filter plugins。
output
指定目標數據源類型。支持的數據源類型,請參見Output plugins。本文需要將MySQL中的數據同步至Elasticsearch中,因此output中需要指定目標Elasticsearch的信息。具體參數說明,請參見步驟三:創建并運行管道任務。
重要如果output中使用了file_extend參數,需要先安裝logstash-output-file_extend插件。具體操作,請參見安裝或卸載插件。
表 2. input參數說明
參數
描述
jdbc_driver_class
JDBC Class配置。
jdbc_driver_library
指定JDBC連接MySQL驅動文件,格式為/ssd/1/share/<Logstash實例ID>/logstash/current/config/custom/<驅動文件名稱>。您需要提前在控制臺中上傳驅動文件,阿里云Logstash支持的驅動文件及其上傳方法,請參見配置擴展文件。
jdbc_connection_string
配置數據庫連接的域名、端口及數據庫,格式為
jdbc:mysql://<MySQL的連接地址>:<端口>/<數據庫名稱>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false
。<MySQL的連接地址>:配置MySQL的內網地址。
<端口>:需要與MySQL的出方向端口保持一致,一般為3306。
說明如果使用外網地址,需要為Logstash配置NAT網關,將
jdbc:mysql://<MySQL的連接地址>:<端口>
配置為公網域名,實現公網數據傳輸。具體操作,請參見配置NAT公網數據傳輸。jdbc_user
數據庫用戶名。
jdbc_password
數據庫密碼。
jdbc_paging_enabled
是否啟用分頁,默認false。
jdbc_page_size
分頁大小。
statement
指定SQL語句,多表查詢可使用join語句。
說明sql_last_value用于計算要查詢哪一行,在運行任何查詢之前,此值設置為1970年1月1日星期四。詳細信息,請參見Jdbc input plugin。
schedule
指定定時操作,"* * * * *"表示每分鐘定時同步數據。該參數使用的是Rufus版的Cron表達式。
record_last_run
是否記錄上次執行結果。如果為true,則會把上次執行到的tracking_column字段的值記錄下來,保存到last_run_metadata_path指定的文件中。
last_run_metadata_path
指定最后運行時間文件存放的地址。目前后端開放了
/ssd/1/<Logstash實例ID>/logstash/data/
路徑來保存文件。指定參數路徑后,Logstash會在對應路徑下自動生成文件,但不支持查看文件內容。說明配置Logstash管道時,建議按照
/ssd/1/<Logstash實例ID>/logstash/data/
路徑配置此參數。如果不按照該路徑配置,會導致同步的條件記錄因為權限不足而無法存放在last_run_metadata_path路徑下的配置文件中。clean_run
是否清除last_run_metadata_path的記錄,默認為false。如果為true,那么每次都要從頭開始查詢所有的數據庫記錄。
use_column_value
是否需要記錄某個column的值。當該值設置成true時,系統會記錄tracking_column參數所指定的列的最新的值,并在下一次管道執行時通過該列的值來判斷需要更新的記錄。
tracking_column_type
跟蹤列的類型,默認是numeric。
tracking_column
指定跟蹤列,該列必須是遞增的,一般是MySQL主鍵。
重要以上配置按照測試數據配置,在實際業務中,請按照業務需求進行合理配置。input插件支持的其他配置選項,請參見官方Logstash Jdbc input plugin文檔。
如果配置中有類似last_run_metadata_path的參數,那么需要阿里云Logstash服務提供文件路徑。目前后端開放了
/ssd/1/<Logstash實例ID>/logstash/data/
路徑供您測試使用,且該目錄下的數據不會被刪除。因此在使用時,請確保磁盤有充足的使用空間。指定參數路徑后,Logstash會在對應路徑下自動生成文件,但不支持查看文件內容。為了提升安全性,如果在配置管道時使用了JDBC驅動,需要在jdbc_connection_string參數后面添加allowLoadLocalInfile=false&autoDeserialize=false,否則當您在添加Logstash配置文件時,調度系統會拋出校驗失敗的提示,例如
jdbc_connection_string => "jdbc:mysql://xxx.drds.aliyuncs.com:3306/<數據庫名稱>?allowLoadLocalInfile=false&autoDeserialize=false"
。
更多Config配置,請參見Logstash配置文件說明。
單擊下一步,配置管道參數。
參數
說明
管道工作線程
并行執行管道的Filter和Output的工作線程數量。當事件出現積壓或CPU未飽和時,請考慮增大線程數,更好地使用CPU處理能力。默認值:實例的CPU核數。
管道批大小
單個工作線程在嘗試執行Filter和Output前,可以從Input收集的最大事件數目。較大的管道批大小可能會帶來較大的內存開銷。您可以設置LS_HEAP_SIZE變量,來增大JVM堆大小,從而有效使用該值。默認值:125。
管道批延遲
創建管道事件批時,將過小的批分派給管道工作線程之前,要等候每個事件的時長,單位為毫秒。默認值:50ms。
隊列類型
用于事件緩沖的內部排隊模型。可選值:
MEMORY:默認值。基于內存的傳統隊列。
PERSISTED:基于磁盤的ACKed隊列(持久隊列)。
隊列最大字節數
請確保該值小于您的磁盤總容量。默認值:1024 MB。
隊列檢查點寫入數
啟用持久性隊列時,在強制執行檢查點之前已寫入事件的最大數目。設置為0,表示無限制。默認值:1024。
警告配置完成后,需要保存并部署才能生效。保存并部署操作會觸發實例重啟,請在不影響業務的前提下,繼續執行以下步驟。
單擊保存或者保存并部署。
保存:將管道信息保存在Logstash里并觸發實例變更,配置不會生效。保存后,系統會返回管道管理頁面。可在管道列表區域,單擊操作列下的立即部署,觸發實例重啟,使配置生效。
保存并部署:保存并且部署后,會觸發實例重啟,使配置生效。
步驟三:驗證結果
登錄阿里云Elasticsearch實例的Kibana控制臺。
具體操作,請參見登錄Kibana控制臺。
在Kibana頁面的左上角,選擇
在Console中,執行如下命令,查看同步成功的索引數量。
GET rds_es_dxhtest_datetime/_count { "query": {"match_all": {}} }
預期結果如下。
{ "count" : 3, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 } }
更新MySQL表數據并插入表數據。
UPDATE food SET name='Chocolates',update_time=now() where id = 1; INSERT INTO food values(null,'雞蛋',now(),now());
在Kibana控制臺,查看更新后的數據。
查詢name為Chocolates的數據。
GET rds_es_dxhtest_datetime/_search { "query": { "match": { "name": "Chocolates" }} }
預期結果如下。
查詢所有數據。
GET rds_es_dxhtest_datetime/_search { "query": { "match_all": {} } }
預期結果如下。
常見問題
Q:同步任務失敗(例如管道一直在生效中、前后數據不一致、數據庫連接不成功),如何解決?
A:查看Logstash實例的主日志是否有報錯,根據報錯判斷原因,具體操作請參見查詢日志。常見的原因及解決方法如下。
說明執行以下操作時,如果集群正在變更中,可參見查看實例任務進度詳情先中斷變更,操作完成后再觸發重啟恢復。
原因
解決方法
MySQL白名單中沒有加入Logstash的IP地址。
參見通過客戶端、命令行連接RDS MySQL實例,在MySQL白名單中加入Logstash節點的IP地址。
說明獲取Logstash的IP地址的具體操作,請參見查看實例的基本信息。
Logstash的IP地址沒有添加到對應ECS服務器的安全組中(ECS自建MySQL)。
參見添加安全組規則,在ECS安全組中添加Logstash的IP地址和端口號。
說明獲取Logstash的IP地址和端口號的具體操作,請參見查看實例的基本信息。
Logstash和Elasticsearch不在同一VPC下。
選擇以下任意一種方式處理:
參見創建阿里云Elasticsearch實例,重新購買同一VPC下的Elasticsearch實例。購買后,修改現有管道配置。
參見配置NAT公網數據傳輸,配置NAT網關實現公網數據傳輸。
MySQL地址不正確,端口不是3306。
參見查看和管理實例連接地址和端口,獲取正確的地址和端口。使用正確的地址和端口,按照腳本格式替換管道配置中的jdbc_connection_string參數值。
重要<MySQL的連接地址>:需要配置MySQL的內網地址。如果使用外網地址,需要為Logstash配置NAT網關實現公網數據傳輸,具體操作請參見配置NAT公網數據傳輸。
Elasticsearch未開啟自動創建索引。
參見配置YML參數,開啟Elasticsearch實例的自動創建索引功能。
Elasticsearch或Logstash的負載太高。
參見升配集群,升級實例規格。
說明Elasticsearch負載情況可參見指標含義與異常處理建議,通過控制臺監控指標查看。Logstash負載情況可參見配置X-Pack監控,通過Kibana X-Pack監控查看。
沒有上傳JDBC連接MySQL的驅動文件。
參見配置擴展文件,下載并上傳驅動文件。
管道配置中包含了file_extend,但沒有安裝logstash-output-file_extend插件。
選擇以下任意一種方式處理:
參見安裝或卸載插件,安裝logstash-output-file_extend插件。
在管道配置中,去掉file_extend配置。
更多問題原因及解決方法,請參見Logstash數據寫入問題排查方案。
Q:管道input配置中,如何在一個管道中配置多個源端JDBC?
A:您可以在管道input配置中定義多個jdbc數據源,并在statement中指定對應表的查詢語句,實現一個管道中配置多個源端JDBC,參考示例如下。
input { jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/ssd/1/share/<Logstash實例ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar" jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<數據庫名稱>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false" jdbc_user => "xxxxx" jdbc_password => "xxxx" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select * from tableA where update_time >= :sql_last_value" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "/ssd/1/<Logstash實例ID>/logstash/data/last_run_metadata_update_time.txt" clean_run => false tracking_column_type => "timestamp" use_column_value => true tracking_column => "update_time" type => "A" } jdbc { jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_driver_library => "/ssd/1/share/<Logstash實例ID>/logstash/current/config/custom/mysql-connector-java-5.1.48.jar" jdbc_connection_string => "jdbc:mysql://rm-bp1xxxxx.mysql.rds.aliyuncs.com:3306/<數據庫名稱>?useUnicode=true&characterEncoding=utf-8&useSSL=false&allowLoadLocalInfile=false&autoDeserialize=false" jdbc_user => "xxxxx" jdbc_password => "xxxx" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select * from tableB where update_time >= :sql_last_value" schedule => "* * * * *" record_last_run => true last_run_metadata_path => "/ssd/1/<Logstash實例ID>/logstash/data/last_run_metadata_update_time.txt" clean_run => false tracking_column_type => "timestamp" use_column_value => true tracking_column => "update_time" type => "B" } } output { if[type] == "A" { elasticsearch { hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200" index => "rds_es_dxhtest_datetime_A" user => "elastic" password => "xxxxxxx" document_id => "%{id}" } } if[type] == "B" { elasticsearch { hosts => "http://es-cn-0h****dd0hcbnl.elasticsearch.aliyuncs.com:9200" index => "rds_es_dxhtest_datetime_B" user => "elastic" password => "xxxxxxx" document_id => "%{id}" } } }
以上示例在jdbc中,新增了一個屬性type,用來在output中進行判斷,將不同表的數據同步至不同的索引中。