使用NimoShake將Amazon DynamoDB遷移至阿里云
NimoShake(又名DynamoShake)是阿里云研發(fā)的數(shù)據(jù)同步工具,您可以借助該工具將Amazon DynamoDB數(shù)據(jù)庫遷移至阿里云。
前提條件
已經(jīng)創(chuàng)建阿里云MongoDB實例,詳情請參見創(chuàng)建副本集實例或創(chuàng)建分片集群實例。
背景信息
本文檔主要介紹NimoShake工具及其使用方法。
NimoShake主要用于從DynamoDB進行遷移,目的端支持MongoDB和兼容DynamoDB協(xié)議的MongoDB實例。更多詳情請參見NimoShake介紹。
注意事項
在執(zhí)行全量數(shù)據(jù)遷移時將占用源庫和目標庫一定的資源,可能會導致數(shù)據(jù)庫服務器負載上升。如果數(shù)據(jù)庫業(yè)務量較大或服務器規(guī)格較低,可能會加重數(shù)據(jù)庫壓力,甚至導致數(shù)據(jù)庫服務不可用。建議您在執(zhí)行數(shù)據(jù)遷移前謹慎評估,在業(yè)務低峰期執(zhí)行數(shù)據(jù)遷移。
名詞解釋
斷點續(xù)傳:斷點續(xù)傳是指將一個任務分成多個部分進行傳輸,當遇到網(wǎng)絡故障或者其他原因造成的傳輸中斷,可以延續(xù)之前傳輸?shù)牟糠掷^續(xù)傳輸,而不用從頭開始。
說明全量同步不支持斷點續(xù)傳功能,增量同步支持斷點續(xù)傳,如果增量同步過程中連接斷開了,在一定時間內(nèi)恢復連接是可以繼續(xù)進行增量同步的。但在某些情況下,比如斷開的時間過久,或者之前位點的丟失,都會導致重新觸發(fā)全量同步。
位點:增量的斷點續(xù)傳是根據(jù)位點來實現(xiàn)的,默認的位點是寫入到目的端MongoDB中,庫名是dynamo-shake-checkpoint。每個表都會記錄一個checkpoint的表,同樣還會有一個status_table表記錄當前是全量同步還是增量同步。
NimoShake功能特性
NimoShake目前支持全量和增量分離的同步機制,即先同步全量數(shù)據(jù),再同步增量數(shù)據(jù)。
全量同步:包含數(shù)據(jù)同步和索引同步兩個部分,基本架構如下:
數(shù)據(jù)同步:NimoShake使用多個并發(fā)線程拉取源端數(shù)據(jù),如下圖所示。
線程名稱
說明
Fetcher
調(diào)用Amazon提供的協(xié)議轉(zhuǎn)換驅(qū)動批量抓取源表的數(shù)據(jù)并放入隊列中,直至抓取完源表的所有數(shù)據(jù)。
說明目前只提供一個Fetcher線程。
Parser
從隊列中讀取數(shù)據(jù),并解析成BSON結構。Parser解析完成后,將數(shù)據(jù)按條寫入Executor隊列。Parser線程可以啟動多個,默認為2個,您可以通過
FullDocumentParser
參數(shù)調(diào)整Parser的個數(shù)。Executor
從隊列中拉取數(shù)據(jù),并將數(shù)據(jù)進行聚合后寫入目的端MongoDB(聚合上限16MB,總條數(shù)1024)。Executor線程可以啟動多個,默認為4個,您可以通過
FullDocumentConcurrency
參數(shù)調(diào)整Executor的個數(shù)。索引同步:NimoShake會在完成數(shù)據(jù)同步之后寫入索引。索引分為自帶索引和用戶索引兩部分:
自帶索引:如您有分區(qū)鍵(Partition key)和排序鍵(Sort key),NimoShake將會創(chuàng)建一個聯(lián)合唯一索引寫入MongoDB,除此之外,NimoShake還會針對分區(qū)鍵創(chuàng)建一個哈希(Hash)索引同時寫入。如果您只有一個分區(qū)鍵,那么最終寫入到MongoDB的將會是一個哈希索引和一個唯一索引。
用戶索引:如果您有自建的索引,NimoShake將會根據(jù)主鍵(Primary key)創(chuàng)建一個哈希索引寫入MongoDB。
增量同步:增量同步只同步數(shù)據(jù),不同步增量同步過程中產(chǎn)生的索引。其基本架構如下:
線程名稱
說明
Fetcher
感知流(Stream)中分片(shard)的變化。
Manager
進行消息的通知,或者創(chuàng)建新的Dispatcher處理消息,一個shard對應一個Dispatcher。
Dispatcher
從源端拉取增量數(shù)據(jù)。如果是斷點續(xù)傳,則會從上一次的Checkpoint位點開始拉取,而不是從頭拉取。
Batcher
對Dispatcher線程拉取的增量數(shù)據(jù)進行數(shù)據(jù)解析和打包與整合。
Executor
將整合后的數(shù)據(jù)寫入到MongoDB,同時更新checkpoint位點。
將Amazon DynamoDB遷移至阿里云
本步驟以Ubuntu系統(tǒng)為例,介紹如何使用NimoShake將Amazon Dynamo數(shù)據(jù)庫遷移到阿里云數(shù)據(jù)庫。
在系統(tǒng)中執(zhí)行如下命令下載NimoShake包,等待下載完成。
wget https://github.com/alibaba/NimoShake/releases/download/release-v1.0.13-20220411/nimo-shake-v1.0.13.tar.gz
說明建議下載最新版本的NimoShake包,下載地址請參見NimoShake。
執(zhí)行如下命令解壓下載的NimoShake包。
tar zxvf nimo-shake-v1.0.13.tar.gz
解壓完成后,輸入
cd nimo
命令進入nimo文件夾。輸入
vi nimo-shake.conf
命令打開NimoShake的配置文件。配置NimoShake,各配置項說明如下:
參數(shù)
說明
示例值
id
遷移任務的ID,可自定義,用于輸出pid文件等信息,如本次任務的日志名稱、斷點續(xù)傳(checkpoint)位點信息存儲的數(shù)據(jù)庫名稱、同步到目的端的數(shù)據(jù)庫名稱。
id = nimo-shake
log.file
日志文件路徑,不配置將打印到stdout。
log.file = nimo-shake.log
log.level
日志的等級,取值:
none:不收集日志。
error:包含錯誤級別信息的日志。
warn:包含警告級別信息的日志。
info:反饋當前系統(tǒng)狀態(tài)的日志。
debug:包含調(diào)試信息的日志。
默認值:info。
log.level = info
log.buffer
是否啟用日志緩沖區(qū)。取值:
true:啟用。啟用后能保證性能,但退出時可能會丟失最后幾條日志。
false:不啟用。不啟用將會降低性能但保證退出時每條日志都被打印。
默認值:true。
log.buffer = true
system_profile
PPROF端口,作調(diào)試用,打印堆棧協(xié)程信息。
system_profile = 9330
http_profile
HTTP端口,開放該端口可通過外網(wǎng)查看NimoShake的當前狀態(tài)。
http_profile = 9340
sync_mode
同步的類型,取值如下:
all:執(zhí)行全量數(shù)據(jù)同步和增量數(shù)據(jù)同步。
full:僅執(zhí)行全量同步。
默認值:all。
sync_mode = all
source.access_key_id
DynamoDB端的AccessKey ID。
source.access_key_id = xxxxxxxxxxx
source.secret_access_key
DynamoDB端的AccessKey。
source.secret_access_key = xxxxxxxxxx
source.session_token
DynamoDB端的臨時密鑰,如沒有可以不配置。
source.session_token = xxxxxxxxxx
source.region
DynamoDB所屬的地域,如沒有可以不配置。
source.region = us-east-2
source.session.max_retries
會話失敗后的最大重試次數(shù)。
source.session.max_retries = 3
source.session.timeout
會話超時時間,0為不啟用。單位:毫秒。
source.session.timeout = 3000
filter.collection.white
數(shù)據(jù)同步的白名單,設置允許通過的表名。如
filter.collection.white = c1;c2
表示允許C1和C2表通過,剩下的表全部過濾。說明不能同時指定filter.collection.white和filter.collection.black參數(shù),否則表示全部表通過。
filter.collection.white = c1;c2
filter.collection.black
數(shù)據(jù)同步的黑名單,設置需要過濾的表名。如
filter.collection.black = c1;c2
表示過濾掉C1和C2表,剩下的表全部通過。說明不能同時指定filter.collection.white和filter.collection.black參數(shù),否則表示全部表通過。
filter.collection.black = c1;c2
qps.full
全量同步階段,限制
Scan
命令對表執(zhí)行的頻率,表示每秒鐘最多調(diào)用多少次Scan
。默認值:1000。qps.full = 1000
qps.full.batch_num
全量同步階段,每秒拉取多少條數(shù)據(jù)。默認值:128。
qps.full.batch_num = 128
qps.incr
增量同步階段,限制
GetRecords
命令對表執(zhí)行的頻率,表示每秒鐘最多調(diào)用多少次GetRecords
。默認值:1000。qps.incr = 1000
qps.incr.batch_num
增量同步階段,每秒拉取多少條數(shù)據(jù)。默認值:128。
qps.incr.batch_num = 128
target.type
目的端數(shù)據(jù)庫類型,取值:
mongodb:目的端為MongoDB數(shù)據(jù)庫。
aliyun_dynamo_proxy:目的端為兼容DynamoDB協(xié)議的MongoDB數(shù)據(jù)庫。
target.type = mongodb
target.mongodb.type
目的端MongoDB數(shù)據(jù)庫的類型,取值:
replica:副本集
sharding:分片集群
target.mongodb.type = sharding
target.address
目的端數(shù)據(jù)庫的連接地址,支持MongoDB的連接串地址和DynamoDB兼容連接地址。
獲取MongoDB的地址信息,請參見副本集實例連接說明或分片集群實例連接說明。
獲取DynamoDB兼容連接地址信息,請參見獲取DynamoDB協(xié)議兼容版實例的連接地址。
target.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717
target.db.exist
目的端重名表的處理方式,取值:
rename:對目的端已存在的重名表進行重命名,添加時間戳后綴,比如c1變?yōu)?span id="z68uejxpaoma" class="help-letter-space">c1.2019-07-01Z12:10:11。
警告此操作會修改目的端的表名稱,可能會對業(yè)務產(chǎn)生影響,請務必提前做好遷移的準備工作。
drop:刪除目的端的重名表。
如不配置則不作處理,此時如果目的端中有重名表會報錯退出,遷移終止。
target.db.exist = drop
full.concurrency
全量同步階段的表級別并發(fā)度,表示一次最多同步多少個表。默認值:4。
full.concurrency = 4
full.document.concurrency
全量同步階段參數(shù)。表內(nèi)document的并發(fā)度,表示使用多少個線程并發(fā)將一個表內(nèi)的內(nèi)容寫入目的端。默認值:4。
full.document.concurrency = 4
full.document.parser
全量同步階段參數(shù)。表內(nèi)解析線程個數(shù),表示使用多少個線程并發(fā)將Dynamo協(xié)議轉(zhuǎn)換到目的端對應協(xié)議。默認值:2。
full.document.parser = 2
full.enable_index.user
全量同步階段參數(shù)。是否同步用戶自建的索引。取值:
true:是
false:否
full.enable_index.user = true
full.executor.insert_on_dup_update
全量同步階段參數(shù)。在目的端碰到相同key的情況下,是否將
INSERT
操作改為UPDATE
。取值:true:是
false:否
full.executor.insert_on_dup_update = true
increase.executor.insert_on_dup_update
增量同步階段參數(shù)。在目的端碰到相同key的情況下,是否將
INSERT
操作改為UPDATE
。取值:true:是
false:否
increase.executor.insert_on_dup_update = true
increase.executor.upsert
增量同步階段參數(shù)。如果目的端不存在key的情況下,是否將
UPDATE
操作改為UPSERT
。取值:true:是
false:否
說明UPSERT
操作會判斷目標key是否存在,如果存在則執(zhí)行UPDATE
操作,如果不存在則執(zhí)行INSERT
操作。increase.executor.upsert = true
convert.type
增量同步階段參數(shù)。是否對Dynamo協(xié)議進行轉(zhuǎn)換。取值:
raw:不對Dynamo協(xié)議進行轉(zhuǎn)換,直接寫入。
change:對Dynamo協(xié)議進行轉(zhuǎn)換。如:將
{"hello":"1"}
轉(zhuǎn)換為{"hello": 1}
。
convert.type = change
increase.concurrency
增量同步階段參數(shù)。一次最多并發(fā)抓取多少個分片(shard)。默認值:16。
increase.concurrency = 16
checkpoint.type
斷點續(xù)傳(Checkpoint)位點信息的存儲類型。取值:
mongodb:斷點續(xù)傳(Checkpoint)位點信息存儲于MongoDB數(shù)據(jù)庫中,僅在
target.type
參數(shù)為mongodb
時可用。file:斷點續(xù)傳(Checkpoint)位點信息存儲于本地計算機中。
checkpoint.type = mongodb
checkpoint.address
存儲斷點續(xù)傳(Checkpoint)位點信息的地址。
checkpoint.type
參數(shù)為mongodb
:輸入MongoDB數(shù)據(jù)庫的連接地址。如不配置則默認存儲到目的端的MongoDB庫中。查看MongoDB的地址信息,請參見副本集實例連接說明或分片集群實例連接說明。checkpoint.type
參數(shù)為file
:輸入以nimo-shake運行文件所在路徑為基準的相對路徑,如:checkpoint。如不配置則默認存儲到checkpoint文件夾。
checkpoint.address = mongodb://username:password@s-*****-pub.mongodb.rds.aliyuncs.com:3717
checkpoint.db
存儲斷點續(xù)傳(Checkpoint)位點信息的MongoDB數(shù)據(jù)庫名,如不配置則數(shù)據(jù)庫名的格式默認為
<任務ID>-checkpoint
,示例:nimoshake-checkpoint。checkpoint.db = nimoshake-checkpoint
執(zhí)行如下命令使用配置好的nimo-shake.conf文件啟動遷移。
./nimo-shake.linux -conf=nimo-shake.conf
說明全量同步完成后,屏幕上會打印出
full sync done!
。如果中途出錯導致同步終止,程序會自動關閉并在屏幕上打印對應的錯誤信息,便于您定位錯誤原因。