DataWorks數據集成支持使用Tablestore Stream Reader讀取Tablestore的增量數據,本文為您介紹DataWorks的Tablestore Stream數據讀取能力。
數據同步前準備:Tablestore Stream環境準備
使用Tablestore Stream插件前,您必須確保Tablestore表上已經開啟Stream功能。時序表已默認開啟stream功能。您可以在建表時指定開啟,也可以使用SDK的UpdateTable接口開啟。開啟Stream的方法,如下所示。
SyncClient client = new SyncClient("", "", "", "");
#方法1:建表的時候開啟:
CreateTableRequest createTableRequest = new CreateTableRequest(tableMeta);
createTableRequest.setStreamSpecification(new StreamSpecification(true, 24)); // 24代表增量數據保留24小時。
client.createTable(createTableRequest);
#方法2:如果建表時未開啟,您可以通過UpdateTable開啟:
UpdateTableRequest updateTableRequest = new UpdateTableRequest("tableName");
updateTableRequest.setStreamSpecification(new StreamSpecification(true, 24));
client.updateTable(updateTableRequest);
使用SDK的UpdateTable接口開啟時:
指定開啟Stream并設置過期時間,即開啟了Tablestore增量數據導出功能。開啟stream功能后,Tablestore服務端就會將您的操作日志額外保存起來,每個分區有一個有序的操作日志隊列,每條操作日志會在一定時間后被垃圾回收,該時間即為您指定的過期時間。
Tablestore的SDK提供了幾個Stream相關的API用于讀取這部分的操作日志,增量插件也是通過Tablestore SDK的接口獲取到增量數據。列模式下會將增量數據轉化為多個6元組的形式(pk、colName、version、colValue、opType和sequenceInfo),行模式則會以普通行的形式導出增量數據。
支持的同步模式與字段類型
Tablestore Stream Reader插件支持使用列模式或行模式同步Tablestore的增量數據。兩種模式下的同步過程和字段類型要求如下。
列模式
在Tablestore多版本模式下,表中的數據組織為行>列>版本
三級的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳(版本號)。
您可以通過Tablestore的API進行一系列讀寫操作,Tablestore通過記錄您最近對表的一系列寫操作(或數據更改操作)來實現記錄增量數據的目的,所以您也可以把增量數據看作一批操作記錄。
Tablestore支持PutRow、UpdateRow和DeleteRow操作:
PutRow:寫入一行,如果該行已存在即覆蓋該行。
UpdateRow:更新一行,不更改原行的其它數據。更新包括新增或覆蓋(如果對應列的對應版本已存在)一些列值、刪除某一列的全部版本、刪除某一列的某個版本。
DeleteRow:刪除一行。
Tablestore會根據每種操作生成對應的增量數據記錄,Reader插件會讀出這些記錄,并導出為數據集成的數據格式。
同時,由于Tablestore具有動態列、多版本的特性,所以Reader插件導出的一行不對應Tablestore中的一行,而是對應Tablestore中的一列的一個版本。即Tablestore中的一行可能會導出很多行,每行包含主鍵值、該列的列名、該列下該版本的時間戳(版本號)、該版本的值、操作類型。如果設置isExportSequenceInfo為true,還會包括時序信息。
轉換為數據集成的數據格式后,定義了以下四種操作類型:
U(UPDATE):寫入一列的一個版本。
DO(DELETE_ONE_VERSION):刪除某一列的某個版本。
DA(DELETE_ALL_VERSION):刪除某一列的全部版本,此時需要根據主鍵和列名,刪除對應列的全部版本。
DR(DELETE_ROW):刪除某一行,此時需要根據主鍵,刪除該行數據。
假設該表有兩個主鍵列,主鍵列名分別為pkName1, pkName2,示例如下。
pkName1 | pkName2 | columnName | timestamp | columnValue | opType |
pk1_V1 | pk2_V1 | col_a | 1441803688001 | col_val1 | U |
pk1_V1 | pk2_V1 | col_a | 1441803688002 | col_val2 | U |
pk1_V1 | pk2_V1 | col_b | 1441803688003 | col_val3 | U |
pk1_V2 | pk2_V2 | col_a | 1441803688000 | — | DO |
pk1_V2 | pk2_V2 | col_b | — | — | DA |
pk1_V3 | pk2_V3 | — | — | — | DR |
pk1_V3 | pk2_V3 | col_a | 1441803688005 | col_val1 | U |
假設導出的數據如上,共7行,對應Tablestore表內的3行,主鍵分別是(pk1_V1,pk2_V1),(pk1_V2, pk2_V2),(pk1_V3, pk2_V3):
對于主鍵為(pk1_V1,pk2_V1)的一行,包括寫入col_a列的兩個版本和col_b列的一個版本等操作。
對于主鍵為(pk1_V2,pk2_V2)的一行,包括刪除col_a列的一個版本和刪除col_b列的全部版本等操作。
對于主鍵為(pk1_V3,pk2_V3)的一行,包括刪除整行和寫入col_a列的一個版本等操作。
行模式
寬行表
您可以通過行模式導出數據,該模式將用戶每次更新的記錄,抽取成行的形式導出,需要設置mode屬性并配置列名。
"parameter": { #parameter中配置下面三項配置(例如datasource、table等其它配置項照常配置)。 "mode": "single_version_and_update_only", # 配置導出模式。 "column":[ #按照需求添加需要導出TableStore中的列,您可以自定義設置配置個數。 { "name": "uid" #列名示例,可以是主鍵或屬性列。 }, { "name": "name" #列名示例,可以是主鍵或屬性列。 }, ], "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。 }
時序表
時序表在創建時會自動開啟Stream,因此不需要手動開啟Stream功能。
Tablestore Stream Reader支持導出時序表中的增量數據,當表為時序表時,需要配置的信息如下:
"parameter": { #parameter中配置下面四項配置(例如datasource、table等其它配置項照常配置)。 "mode": "single_version_and_update_only", # 配置導出模式。 "isTimeseriesTable":"true", # 配置導出為時序表。 "column":[ #按照需求添加需要導出TableStore中的列,您可以自定義設置配置個數。 { "name": "_m_name" #度量名稱字段。 }, { "name": "_data_source" #數據源字段。 }, { "name": "_tags" #標簽字段,將tags轉換為string類型。 }, { "name": "tag1", #標簽內部字段鍵名稱。 "is_timeseries_tag":"true" #表明該字段為tags內部字段。 }, { "name": "time" #時間戳字段。 }, { "name": "name" #屬性列名稱。 }, ], "isExportSequenceInfo": false, #single_version_and_update_only模式下只能是false。 }
行模式導出的數據更接近于原始的行,易于后續處理,但需要注意以下問題:
每次導出的行是從用戶每次更新的記錄中抽取,每一行數據與用戶的寫入或更新操作一一對應。如果用戶存在單獨更新某些列的行為,則會出現有一些記錄只有被更新的部分列,其它列為空的情況。
行模式不會導出數據的版本號(即每列的時間戳),也無法進行刪除操作。
數據類型轉換列表
目前Tablestore Stream Reader支持所有的Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。
類型分類 | Tablestore Stream數據類型 |
整數類 | INTEGER |
浮點類 | DOUBLE |
字符串類 | STRING |
布爾類 | BOOLEAN |
二進制類 | BINARY |
數據同步任務開發
數據同步任務的配置入口和通用配置流程可參見下文的配置指導。
操作流程請參見通過向導模式配置離線同步任務、通過腳本模式配置離線同步任務。
腳本模式配置的全量參數和腳本Demo請參見下文的附錄:腳本Demo與參數說明。
附錄:腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
Reader腳本Demo
列模式
{ "type":"job", "version":"2.0",//版本號。 "steps":[ { "stepType":"otsstream",//插件名。 "parameter":{ "datasource":"$srcDatasource",//數據源。 "dataTable":"",//表名。 "statusTable":"TableStoreStreamReaderStatusTable",//用于記錄狀態的表的名稱。 "maxRetries":30,//從 TableStore 中讀增量數據時,每次請求的最大重試次數,默認為30。 "isExportSequenceInfo":false,//是否導出時序信息。 "startTimeString":"${startTime}${hh}",//增量數據的時間范圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間 "endTimeString":"${endTime}${hh}"http://增量數據的時間范圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間 }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"http://錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業并發數。 "mbps":"12"http://限流,此處1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
行模式讀取寬表
{ "type":"job", "version":"2.0",//版本號。 "steps":[ { "stepType":"otsstream",//插件名。 "parameter":{ "datasource":"$srcDatasource",//數據源。 "dataTable":"",//表名。 "statusTable":"TableStoreStreamReaderStatusTable",//用于記錄狀態的表的名稱。 "maxRetries":30,//從 TableStore 中讀增量數據時,每次請求的最大重試次數,默認為30。 "isExportSequenceInfo":false,//是否導出時序信息。 "startTimeString":"${startTime}${hh}",//增量數據的時間范圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間 "endTimeString":"${endTime}${hh}",//增量數據的時間范圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間 "mode": "single_version_and_update_only", "column":[ { "name":"pId" }, { "name": "uId" }, { "name":"col0" }, { "name": "col1" } ], }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"http://錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業并發數。 "mbps":"12"http://限流 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
行模式讀取時序表
{ "type":"job", "version":"2.0",//版本號。 "steps":[ { "stepType":"otsstream",//插件名。 "parameter":{ "datasource":"$srcDatasource",//數據源。 "dataTable":"",//表名。 "statusTable":"TableStoreStreamReaderStatusTable",//用于記錄狀態的表的名稱。 "maxRetries":30,//從 TableStore 中讀增量數據時,每次請求的最大重試次數,默認為30。 "isExportSequenceInfo":false,//是否導出時序信息。 "startTimeString":"${startTime}${hh}",//增量數據的時間范圍(左閉右開)的左邊界。參數配置中配置startTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取開始時間為業務日期的定時時間 "endTimeString":"${endTime}${hh}",//增量數據的時間范圍(左閉右開)的右邊界。參數配置中配置endTime=${yyyymmdd} hh=$[hh24miss],表示ots讀取結束時間為業務日期的定時時間 "mode": "single_version_and_update_only", "isTimeseriesTable":"true", "column": [ { "name": "_m_name" }, { "name": "_data_source", }, { "name": "_tags", }, { "name": "string_column", } ] }, "name":"Reader", "category":"reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0"http://錯誤記錄數。 }, "speed":{ "throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。 "concurrent":1,//作業并發數。 "mbps":"12"http://限流,此處1mbps = 1MB/s。 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader腳本參數
參數 | 描述 | 是否必選 | 默認值 |
datasource | 數據源名稱,腳本模式支持添加數據源,該配置項填寫的內容必須與添加的數據源名稱保持一致。 | 是 | 無 |
dataTable | 導出增量數據的表的名稱。該表需要開啟Stream,可以在建表時開啟,或者使用UpdateTable接口開啟。 | 是 | 無 |
statusTable | Reader插件用于記錄狀態的表的名稱,這些狀態可用于減少對非目標范圍內的數據的掃描,從而加快導出速度。statusTable是Reader用于保存狀態的表,如果該表不存在,Reader會自動創建該表。一次離線導出任務完成后,您無需刪除該表,該表中記錄的狀態可用于下次導出任務中:
您配置一個類似TableStoreStreamReaderStatusTable的名稱即可,請注意不要與業務相關的表重名。 | 是 | 無 |
startTimestampMillis | 增量數據的時間范圍(左閉右開)的左邊界,單位為毫秒:
| 否 | 無 |
endTimestampMillis | 增量數據的時間范圍(左閉右開)的右邊界,單位為毫秒:
| 否 | 無 |
date | 日期格式為yyyyMMdd,例如20151111,表示導出該日的數據。如果沒有指定date,則需要指定startTimestampMillis和endTimestampMillis或startTimeString和endTimeString,反之也成立。例如,采云間調度僅支持天級別,所以提供該配置,作用與startTimestampMillis和endTimestampMillis或startTimeString和endTimeString類似。 | 否 | 無 |
isExportSequenceInfo | 是否導出時序信息,時序信息包含了數據的寫入時間等。默認該值為false,即不導出。 | 否 | false |
maxRetries | 從TableStore中讀增量數據時,每次請求的最大重試次數,默認為30次。重試之間有間隔,重試30次的總時間約為5分鐘,通常無需更改。 | 否 | 30 |
startTimeString | 任務的開始時間,即增量數據的時間范圍(左閉右開)的左邊界,格式為 | 否 | 無 |
endTimeString | 任務的結束時間,即增量數據的時間范圍(左閉右開)的右邊界,格式為 | 否 | 無 |
enableSeekIterator | Reader插件需要先確定增量位點,然后再拉取數據,如果是經常運行的任務,插件會根據之前掃描的位點來確定位置。如果之前沒運行過這個插件,將會從增量開始位置(默認增量保留7天,即7天前)開始掃描,因此當還沒有掃描到設置的開始時間之后的數據時,會存在開始一段時間沒有數據導出的情況,您可以在reader的配置參數里增加 | 否 | false |
mode | 導出模式,設置為single_version_and_update_only時為行模式,默認不設置為列模式。 | 否 | 無 |
isTimeseriesTable | 是否為時序表,只有在行模式,即mode為single_version_and_update_only時配置生效。 | 否 | false |
column | column配置
說明 在行模式下必須配置,否則不會導出數據。 |
| 無 |