Tablestore Stream插件主要用于導出Tablestore增量數據,本文將為您介紹如何通過Tablestore Stream配置同步任務。
背景信息
Tablestore Stream插件與全量導出插件不同,增量導出插件僅支持多版本模式,且不支持指定列。增量數據可以看作操作日志,除數據本身外還附有操作信息。詳情請參見Tablestore Stream數據源。
Tablestore Stream配置同步任務時,請注意以下問題:
如果配置任務為日調度,您可以讀取當前時間24小時以內的數據,但會丟失當前時間前5分鐘的數據。建議您配置任務為小時調度。
設置的結束時間不能超過系統顯示的時間,即您設置的結束時間要比運行時間早5分鐘。
配置日調度會出現數據丟失的情況。
不可以配置周期調度和月調度。
開始時間和結束時間需要包含操作Table Store表的時間。例如,20171019162000您向Table Store插入2條數據,則開始時間設置為20171019161000,結束時間設置為20171019162600。
新增數據源
進入數據集成頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據集成。
單擊左側導航欄中的數據源,進入數據源列表。
單擊新增數據源。
在新增數據源對話框中,選擇數據源類型為Tablestore 。
填寫Tablestore數據源的各配置項。
參數
描述
數據源名稱
數據源名稱必須以字母、數字、下劃線組合,且不能以數字和下劃線開頭。
數據源描述
對數據源進行簡單描述,不得超過80個字符。
Endpoint
Table Store服務對應的Endpoint。
Table Store實例名稱
Table Store服務對應的實例名稱。
AccessKey ID
訪問密鑰中的AccessKey ID,您可以進入用戶信息管理頁面進行復制。
AccessKey Secret
訪問密鑰中的AccessKey Secret,相當于登錄密碼。
單擊測試連通性。
測試連通性通過后,單擊完成。
通過向導模式配置同步任務
進入數據開發頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
在目標業務流程中,右鍵單擊數據集成,選擇 。
在新建節點對話框中,輸入名稱并選擇路徑,單擊確認。
選擇離線同步任務的數據來源Table Stream和數據去向MaxCompute(ODPS),以及用于執行同步任務的資源組,并測試連通性。
配置數據來源與去向。
類別
參數
描述
數據來源
表
導出增量數據的表的名稱。該表需要開啟Stream,您可以在建表時開啟。
開始時間
增量數據的時間范圍(左閉右開)的左邊界,格式為yyyymmddhh24miss,單位為毫秒。
結束時間
增量數據的時間范圍(左閉右開)的右邊界,格式為yyyymmddhh24miss,單位為毫秒。
狀態表
用于記錄狀態的表的名稱。
最大重試次數
從TableStore中讀增量數據時,每次請求的最大重試次數,默認是30。
導出時序信息
是否導出時序信息,包括數據的寫入時間等信息。
數據去向
表
選擇需要寫入的表。
分區信息
此處需同步的表是非分區表,所以無分區信息。
清理規則
寫入前清理已有數據:導數據之前,清空表或者分區的所有數據,相當于
insert overwrite
。寫入前保留已有數據:導數據之前,不清理任何數據,每次運行數據都是追加進去的,相當于
insert into
。
空字符串作為null
默認值為否。
配置字段映射關系。
左側的源頭表字段和右側的目標表字段為一一對應的關系。單擊添加一行可以增加單個字段,鼠標放至需要刪除的字段上,即可單擊刪除圖標進行刪除。
配置通道。
單擊工具欄中的保存圖標。
單擊工具欄中的運行圖標,運行之前需要配置自定義參數。
通過腳本模式配置同步任務
如果您需要通過腳本模式配置此任務,單擊工具欄中的轉換腳本,選擇確認即可進入腳本模式。
您可以根據自身進行配置,示例腳本如下。
{
"type": "job",
"version": "1.0",
"configuration": {
"reader": {
"plugin": "Tablestore",
"parameter": {
"datasource": "Tablestore",//數據源名,需要與您添加的數據源名稱保持一致。
"dataTable": "person",//導出增量數據的表的名稱。該表需要開啟Stream,可以在建表時開啟。
"startTimeString": "${startTime}",//增量數據的時間范圍(左閉右開)的左邊界,格式為yyyymmddhh24miss,單位毫秒。
"endTimeString": "${endTime}",//運行時間。
"statusTable": "TableStoreStreamReaderStatusTable",//用于記錄狀態的表的名稱。
"maxRetries": 30,//請求的最大重試次數。
"isExportSequenceInfo": false,
}
},
"writer": {
"plugin": "odps",
"parameter": {
"datasource": "odps_source",//數據源名。
"table": "person",//目標表名。
"truncate": true,
"partition": "pt=${bdp.system.bizdate}",//分區信息。
"column": [//目標列名。
"id",
"colname",
"version",
"colvalue",
"optype",
"sequenceinfo"
]
}
},
"setting": {
"speed": {
"mbps": 7,//作業速率上限,此處1mbps = 1MB/s。
"concurrent": 7//并發數。
}
}
}
}
關于運行時間參數和結束時間參數,有以下兩種表現形式(配置任務時選擇其中一種):
"startTimeString": "${startTime}"
:增量數據的時間范圍(左閉右開)的左邊界,格式為yyyymmddhh24miss,單位為毫秒。"endTimeString": "${endTime}"
:增量數據的時間范圍(左閉右開)的右邊界,格式為yyyymmddhh24miss,單位為毫秒。"startTimestampMillis":""
:增量數據的時間范圍(左閉右開)的左邊界,單位為毫秒。Reader插件會從statusTable中找對應startTimestampMillis的位點,從該點開始讀取開始導出數據。
如果statusTable中找不到對應的位點,則從系統保留的增量數據的第1條開始讀取,并跳過寫入時間小于startTimestampMillis的數據。
"endTimestampMillis":" "
:增量數據的時間范圍(左閉右開)的右邊界,單位為毫秒。Reader插件startTimestampMillis位置開始導出數據后,當遇到第1條時間戳大于等于endTimestampMillis的數據時,結束導出數據,導出完成。
當讀取完當前全部的增量數據時,結束讀取,即使未達endTimestampMillis。
如果配置isExportSequenceInfo項為true,如“isExportSequenceInfo”: true
,則會導出時序信息,目標會多出1行,目標字段列則多1列。時序信息包含了數據的寫入時間等,默認該值為false,即不導出。