使用通道服務、DataWorks或者DataX將表格存儲數據表中的數據同步到另一個數據表。
前提條件
已創建目標數據表,目標數據表的列必須與源數據表中待遷移的列一一對應。具體操作,請參見創建數據表。
如果要實現跨賬號、跨地域數據遷移,請使用DataX工具通過互聯網或者通過云企業網連通VPC進行操作。關于使用云企業網的具體操作,請參見云企業網快速入門。
使用通道服務遷移同步
創建源數據表的通道后,使用SDK進行遷移同步。遷移過程中可以自定義業務處理邏輯對數據進行處理。
前提條件
已確定要使用的Endpoint。具體操作,請參見初始化OTSClient。
已配置密鑰。具體操作,請參見初始化OTSClient。
已將密鑰配置到環境變量中。具體操作,請參見初始化OTSClient。
表格存儲使用OTS_AK_ENV環境變量名表示阿里云賬號或者RAM用戶的AccessKey ID,使用OTS_SK_ENV環境變量名表示對應AccessKey Secret,請根據實際配置。
操作步驟
使用表格存儲控制臺或SDK創建源數據表的通道并記錄通道ID,具體操作請分別參見快速入門或通過SDK使用通道服務。
使用SDK遷移數據。
示例代碼如下:
public class TunnelTest { public static void main(String[] args){ String accessKeyId = System.getenv("OTS_AK_ENV"); String accessKeySecret = System.getenv("OTS_SK_ENV"); TunnelClient tunnelClient = new TunnelClient("endpoint", accessKeyId,accessKeySecret,"instanceName"); TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor()); //tunnelId可以在表格存儲控制臺通道管理頁面查看或者調用describeTunnelRequest查詢。 TunnelWorker worker = new TunnelWorker("tunnelId", tunnelClient, config); try { worker.connectAndWorking(); } catch (Exception e) { e.printStackTrace(); worker.shutdown(); tunnelClient.shutdown(); } } public static class SimpleProcessor implements IChannelProcessor{ //目標tablestore連接對象。 TunnelClient tunnelClient = new TunnelClient("endpoint", "accessKeyId","accessKeySecret","instanceName"); @Override public void process(ProcessRecordsInput processRecordsInput) { //ProcessRecordsInput中返回了增量或全量數據。 List<StreamRecord> list = processRecordsInput.getRecords(); for(StreamRecord streamRecord : list){ switch (streamRecord.getRecordType()){ case PUT: //自定義業務處理邏輯。 //putRow break; case UPDATE: //updateRow break; case DELETE: //deleteRow break; } System.out.println(streamRecord.toString()); } } @Override public void shutdown() { } } }
使用DataWorks或者DataX遷移同步
通過DataWorks或者DataX實現表格存儲數據的遷移同步,此處以DataWorks為例介紹遷移操作。
步驟一:新增表格存儲數據源
分別以源數據表和目標數據表所在實例新增表格存儲數據源。
進入數據集成頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據集成。
在左側導航欄,單擊數據源。
在數據源列表頁面,單擊新增數據源。
在新增數據源對話框,找到Tablestore區塊,單擊Tablestore。
在新增OTS數據源對話框,根據下表配置數據源參數。
參數
說明
數據源名稱
數據源名稱必須以字母、數字、下劃線(_)組合,且不能以數字和下劃線(_)開頭。
數據源描述
對數據源進行簡單描述,不得超過80個字符。
Endpoint
Tablestore實例的服務地址。更多信息,請參見服務地址。
如果Tablestore實例和目標數據源的資源在同一個地域,填寫VPC地址;如果Tablestore實例和目標數據源的資源不在同一個地域,填寫公網地址。
Table Store實例名稱
Tablestore實例的名稱。更多信息,請參見實例。
AccessKey ID
阿里云賬號或者RAM用戶的AccessKey ID和AccessKey Secret。獲取方式請參見創建AccessKey。
AccessKey Secret
測試資源組連通性。
創建數據源時,您需要測試資源組的連通性,以保證同步任務使用的資源組能夠與數據源連通,否則將無法正常執行數據同步任務。
重要數據同步時,一個任務只能使用一種資源組。資源組列表默認顯示僅數據集成公共資源組。為確保數據同步的穩定性和性能要求,推薦使用獨享數據集成資源組。
單擊前往購買進行全新創建并綁定資源組到工作空間或單擊綁定已購資源組為工作空間綁定已有資源組。具體操作,請參見新增和使用獨享數據集成資源組。
待資源組啟動成功后,單擊相應資源組連通狀態(生產環境)列的測試連通性。
當連通狀態顯示為可連通時,表示連通成功。
測試連通性通過后,單擊完成。
在數據源列表中,可以查看新建的數據源。
步驟二:新建同步任務節點
進入數據開發頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
在DataStudio控制臺的數據開發頁面,單擊業務流程節點下的目標業務流程。
如果需要新建業務流程,請參見創建業務流程。
在數據集成節點上右鍵選擇新建節點 > 離線同步。
在新建節點對話框,選擇路徑并填寫節點名稱。
單擊確認。
在數據集成節點下會顯示新建的離線同步節點。
步驟三:配置離線同步任務并啟動
在數據集成節點下,雙擊打開新建的離線同步任務節點。
配置同步網絡鏈接。
選擇離線同步任務的數據來源、數據去向以及用于執行同步任務的資源組,并測試連通性。
重要數據同步任務的執行必須經過資源組來實現,請選擇資源組并保證資源組與讀寫兩端的數據源能聯通訪問。
在網絡與資源配置步驟,選擇數據來源為Tablestore,并選擇數據源名稱為新增的源數據源。
選擇資源組。
選擇資源組后,系統會顯示資源組的地域、規格等信息以及自動測試資源組與所選數據源之間連通性。
重要請與新增數據源時選擇的資源組保持一致。
選擇數據去向為Tablestore,并選擇數據源名稱為新增的目標數據源。
系統會自動測試資源組與所選數據源之間連通性。
測試可連通后,單擊下一步。
在提示對話框,單擊確認使用腳本模式。
重要表格存儲僅支持腳本模式。當存在不支持向導模式的數據源時,如果繼續編輯任務,將強制使用腳本模式進行編輯。
任務轉為腳本模式后,將無法轉為向導模式。
配置任務并保存。
全量數據的同步需要使用到Tablestore Reader與Tablestore Writer插件。腳本配置規則請參見Tablestore數據源。
在配置任務步驟,編輯腳本。
配置Tablestore Reader
Tablestore Reader插件實現了從Tablestore讀取數據,通過您指定的抽取數據范圍,可以方便地實現數據增量抽取的需求。具體操作,請參見附錄一:Reader腳本Demo與參數說明。
配置Tablestore Writer
Tablestore Writer通過Tablestore官方Java SDK連接到Tablestore服務端,并通過SDK寫入Tablestore服務端 。Tablestore Writer本身對于寫入過程進行了諸多優化,包括寫入超時重試、異常寫入重試、批量提交等功能。具體操作,請參見附錄二:Writer腳本Demo與參數說明。
按【Ctrl+S】保存腳本。
說明執行后續操作時,如果未保存腳本,則系統會出現保存確認的提示,單擊確認即可。
執行同步任務。
說明全量數據一般只需要同步一次,無需配置調度屬性。
單擊圖標。
在參數對話框,選擇運行資源組的名稱。
單擊運行。
運行結束后,在同步任務的運行日志頁簽,單擊Detail log url對應的鏈接后。在任務的詳細運行日志頁面,查看
Current task status
對應的狀態。當
Current task status
的值為FINISH時,表示任務運行完成。