表格存儲Tablestore是構建在阿里云飛天分布式系統之上的NoSQL數據存儲服務,Tablestore數據源為您提供讀取和寫入Tablestore雙向通道的功能,本文為您介紹DataWorks的Tablestore數據同步的能力支持情況。
使用限制
Tablestore Reader和Writer插件實現了從Tablestore讀取和寫入數據,包含行模式、列模式兩種數據讀取與寫入方式,可針對寬表與時序表進行數據讀取與寫入。
列模式:在Tablestore多版本模型下,表中的數據組織為 三級的模式, 一行可以有任意列,列名并不是固定的,每一列可以含有多個版本,每個版本都有一個特定的時間戳(版本號)。列模式會將數據導出為(主鍵值,列名,時間戳,列值)的四元組格式,列模式下導入的數據也是(主鍵值,列名,時間戳,列值)的四元組格式。
行模式:該模式將用戶每次更新的記錄,抽取成行的形式導出,即(主鍵值,列值)的格式。
行模式下每一行數據對應TableStore表中的一條數據。寫入行模式的數據包含主鍵列列值、普通列列值兩部分。
Tablestore列由主鍵列primaryKey+普通列column組成,源端列順序需要和Tablestore目的端主鍵列+普通列保持一致,否則會產生列映射錯誤。
Tablestore Reader會根據一張表中待讀取的數據的范圍,按照數據同步并發的數目N,將范圍等分為N份Task。每個Task都會有一個Tablestore Reader線程來執行。
支持的字段類型
目前Tablestore Reader和Tablestore Writer支持所有Tablestore類型,其針對Tablestore類型的轉換列表,如下所示。
類型分類 | Tablestore數據類型 |
整數類 | INTEGER |
浮點類 | DOUBLE |
字符串類 | STRING |
布爾類 | BOOLEAN |
二進制類 | BINARY |
Tablestore本身不支持日期型類型。應用層通常使用Long保存時間的Unix TimeStamp。
您需要將INTEGER類型的數據,在腳本模式中配置為INT類型,DataWorks會將其轉換為INTEGER類型。如果您直接配置為INTEGER類型,日志將會報錯,導致任務無法順利完成。
創建數據源
在進行數據同步任務開發時,您需要在DataWorks上創建一個對應的數據源,操作流程請參見創建并管理數據源,詳細的配置參數解釋可在配置界面查看對應參數的文案提示。
數據同步任務開發
數據同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過腳本模式配置離線同步任務。
腳本模式配置的全量參數和腳本Demo請參見下文的附錄二:Writer腳本Demo與參數說明。
附錄一:Reader腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
Reader腳本Demo
行模式讀取寬表配置
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//數據源。
"newVersion":"true",//使用新版otsreader
"mode": "normal",// 行模式讀取數據
"isTimeseriesTable":"false",// 配置該表為寬表(非時序表)
"column":[//字段。
{
"name":"column1"http://字段名。
},
{
"name":"column2"
},
{
"name":"column3"
},
{
"name":"column4"
},
{
"name":"column5"
}
],
"range":{
"split":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"STRING",
"value":"splitPoint1"
},
{
"type":"STRING",
"value":"splitPoint2"
},
{
"type":"STRING",
"value":"splitPoint3"
},
{
"type":"STRING",
"value":"endValue"
}
],
"end":[
{
"type":"STRING",
"value":"endValue"
},
{
"type":"INT",
"value":"100"
},
{
"type":"INF_MAX"
},
{
"type":"INF_MAX"
}
],
"begin":[
{
"type":"STRING",
"value":"beginValue"
},
{
"type":"INT",
"value":"0"
},
{
"type":"INF_MIN"
},
{
"type":"INF_MIN"
}
]
},
"table":""http://表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作業并發數。
"mbps":"12"http://限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式讀時序表配置
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//數據源。
"table": "",//表名
// 讀時序數據mode必須為normal
"mode": "normal",
// 讀時序數據newVersion必須為true
"newVersion": "true",
// 配置該表為時序表
"isTimeseriesTable":"true",
// measurementName:配置需要讀取時序數據的度量名稱,非必需,為空則讀取全表數據
"measurementName":"measurement_1",
"column": [
{
"name": "_m_name"
},
{
"name": "tagA",
"is_timeseries_tag":"true"
},
{
"name": "double_0",
"type":"DOUBLE"
},
{
"name": "string_0",
"type":"STRING"
},
{
"name": "long_0",
"type":"INT"
},
{
"name": "binary_0",
"type":"BINARY"
},
{
"name": "bool_0",
"type":"BOOL"
},
{
"type":"STRING",
"value":"testString"
}
]
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作業并發數。
"mbps":"12"http://限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式讀取寬表配置
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//數據源。
"table":"",//表名
"newVersion":"true",//新版otsreader
"mode": "multiversion",//多版本模式
"column":[//配置需要導出的列名稱(必須是非主鍵列)
{"name":"mobile"},
{"name":"name"},
{"name":"age"},
{"name":"salary"},
{"name":"marry"}
],
"range":{//導出的范圍
"begin":[
{"type":"INF_MIN"},
{"type":"INF_MAX"}
],
"end":[
{"type":"INF_MAX"},
{"type":"INF_MIN"}
],
"split":[
]
},
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//false代表不限流,下面的限流的速度不生效,true代表限流。
"concurrent":1 //作業并發數。
"mbps":"12"http://限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader腳本參數通用配置
參數 | 描述 | 是否必選 | 默認值 |
endpoint | Tablestore Server的EndPoint(服務地址),詳情請參見服務地址。 | 是 | 無 |
accessId | Tablestore的AccessKey ID。 | 是 | 無 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 無 |
instanceName | Tablestore的實例名稱,實例是您使用和管理Tablestore服務的實體。 您在開通Tablestore服務后,需要通過管理控制臺來創建實例,然后在實例內進行表的創建和管理。 實例是Tablestore資源管理的基礎單元,Tablestore對應用程序的訪問控制和資源計量都在實例級別完成。 | 是 | 無 |
table | 所選取的需要抽取的表名稱,這里有且只能填寫一張表。在Tablestore不存在多表同步的需求。 | 是 | 無 |
newVersion | 定義了使用的Tablestore Reader插件的版本。
新版Tablestore Reader除了支持新的功能,對系統資源的開銷也相對較低,因此推薦使用新版Tablestore Reader。 新版本插件配置兼容了舊版本插件的配置,即舊任務增加newVersion=true配置之后,可以正常運行。 | 否 | false |
mode | 定義了讀取數據的模式,當前支持兩種模式:
本配置僅在新版Tablestore Reader(newVersion:true)配置下生效。 舊版Tablestore Reader會忽略mode配置,并僅支持行模式讀取。 | 否 | normal |
isTimeseriesTable | 定義了操作的數據表是否為時序數據表:
本配置僅在newVersion:true & mode:normal配置下生效。 舊版Tablestore Reader不支持時序表,且時序表無法按列模式讀取。 | 否 | false |
Reader腳本參數附加配置
Tablestore Reader支持行模式讀取寬表、行模式讀取時序表、列模式讀取寬表。下面為您介紹各模式的附加配置。
行模式讀寬表參數
參數 | 描述 | 是否必選 | 默認值 |
column | 所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息。由于Tablestore本身是NoSQL系統,在Tablestore Reader抽取數據過程中,必須指定相應的字段名稱。
| 是 | 無 |
begin和end | begin和end配置項用于表示抽取Tablestore表數據的范圍。 begin和end描述的是Tablestore PrimaryKey的區間分布狀態,對于無限大小的區間,您可以使用 說明
例如,對一張主鍵為
| 否 | (INF_MIN,INF_MAX) |
split | 該配置項屬于高級配置項,是您自己定義切分配置信息,普通情況下不建議使用。 通過配置split參數可以自己定義分片的數據范圍,通常在Tablestore 數據存儲發生熱點,可以使用自定義的切分規則,以下任務配置為例:
任務運行時,數據會切分成6個段,并發讀取。分段數量建議比任務并發數大。
| 否 | 當split參數未配置時,使用自動切分邏輯。 自動切分邏輯會找出 Partition Key最大、最小值,并進行均勻分段。 Partition Key支持整型和字符串,整型使用整除來分段,字符串使用第一個字符的unicode碼來分段。 |
行模式讀時序表參數
參數 | 描述 | 是否必選 | 默認值 |
column | column是一個數組,每個元素表示一列,可配置常量列與普通列兩種類型。 對于常量列,需要配置以下字段:
對于普通列,需要配置以下字段:
讀出四列數據的腳本示例:
| 是 | 無 |
measurementName | 配置需要讀取時間線的度量名稱。若不配置則讀取全表數據。 | 否 | 無 |
timeRange | 請求數據的時間范圍,讀取的范圍是[begin,end),左閉右開的區間,且begin必須小于end,時間戳單位為毫秒。格式如下:
| 否 | 全部版本 |
列模式讀寬表參數
參數 | 描述 | 是否必選 | 默認值 |
column | 指定要導出的列,在列模式下只支持普通列。 格式:
說明
| 是 | 所有列 |
range | 讀取的數據范圍,讀取的范圍為[begin,end),左閉右開的區間,并且:
type支持的類型有如下幾類:
格式:
| 否 | 全部數據 |
timeRange | 請求數據的時間范圍,讀取的范圍是[begin,end),左閉右開的區間,且begin必須小于end,時間戳單位為毫秒。 格式:
| 否 | 全部版本 |
maxVersion | 請求的最大數據版本數,取值范圍是1~INT32_MAX。 | 否 | 全部版本 |
附錄二:Writer腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
Writer腳本Demo
行模式寫入寬表配置
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//數據源。
"table":"",//表名。
"newVersion":"true",//使用新版otswriter
"mode": "normal",// 行模式寫入數據
"isTimeseriesTable":"false",// 配置該表為寬表(非時序表)
"primaryKey" : [//Tablestore的主鍵信息。
{"name":"gid", "type":"INT"},
{"name":"uid", "type":"STRING"}
],
"column" : [//字段。
{"name":"col1", "type":"INT"},
{"name":"col2", "type":"DOUBLE"},
{"name":"col3", "type":"STRING"},
{"name":"col4", "type":"STRING"},
{"name":"col5", "type":"BOOL"}
],
"writeMode" : "PutRow" //寫入模式。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
行模式寫時序表配置
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//數據源。
"table": "testTimeseriesTableName01",
"mode": "normal",
"newVersion": "true",
"isTimeseriesTable":"true",
"timeunit":"microseconds",
"column": [
{
"name": "_m_name"
},
{
"name": "_data_source",
},
{
"name": "_tags",
},
{
"name": "_time",
},
{
"name": "string_1",
"type":"string"
},
{
"name":"tag3",
"is_timeseries_tag":"true",
}
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
列模式寫入寬表配置
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"ots",//插件名。
"parameter":{
"datasource":"",//數據源。
"table":"",
"newVersion":"true",
"mode":"multiVersion",
"primaryKey" : [
"gid",
"uid"
]
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},x`
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer腳本參數通用配置
參數 | 描述 | 是否必選 | 默認值 |
datasource | 數據源名稱,腳本模式支持添加數據源,該配置項填寫的內容必須與添加的數據源名稱保持一致。 | 是 | 無 |
endPoint | Tablestore Server的EndPoint(服務地址),詳情請參見服務地址。 | 是 | 無 |
accessId | Tablestore的AccessKey ID。 | 是 | 無 |
accessKey | Tablestore的AccessKey Secret。 | 是 | 無 |
instanceName | Tablestore的實例名稱,實例是您使用和管理Tablestore服務的實體。 您在開通Tablestore服務后,需要通過管理控制臺來創建實例,然后在實例內進行表的創建和管理。實例是Tablestore資源管理的基礎單元,Tablestore對應用程序的訪問控制和資源計量都在實例級別完成。 | 是 | 無 |
table | 所選取的需要抽取的表名稱,此處能且只能填寫一張表。在Tablestore中不存在多表同步的需求。 | 是 | 無 |
newVersion | 定義了使用的Tablestore Writer插件版本。
新版Tablestore Writer除了支持新的功能,對系統資源的開銷也相對較低,因此推薦使用新版Tablestore Writer。 新版插件兼容舊版本插件配置,即舊任務增加newVersion=true配置之后,可以正常運行。 | 是 | false |
mode | 定義了寫入數據的模式,當前支持兩種模式
本配置僅在newVersion:true配置下生效 舊版Tablestore Writer會忽略mode配置,并僅支持行模式讀取。 | 否 | normal |
isTimeseriesTable | 定義了操作的數據表是否為時序數據表。
本配置僅在newVersion:true & mode:normal配置下生效(列模式不兼容時序表)。 | 否 | false |
Writer腳本參數附加配置
Tablestore Writer支持行模式寫入寬表、行模式寫入時序表、列模式寫入寬表。下面介紹各模式的附加配置。
行模式寫寬表參數
參數 | 描述 | 是否必選 | 默認值 |
primaryKey | Tablestore的主鍵信息,使用JSON的數組描述字段信息。Tablestore本身是NoSQL系統,在Tablestore Writer導入數據過程中,必須指定相應的字段名稱。 數據同步系統本身支持類型轉換的,因此對于源頭數據非STRING/INT,Tablestore Writer會進行數據類型轉換。配置示例如下。
說明 Tablestore的PrimaryKey僅支持STRING和INT類型,因此Tablestore Writer本身也限定填寫STRING和INT兩種類型。 | 是 | 無 |
column | 所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息。 配置示例:
其中的name為寫入的Tablestore列名稱,type為寫入的類型。Tablestore支持STRING、INT、DOUBLE、BOOL和BINARY類型。 說明 寫入過程不支持常量、函數或者自定義表達式。 | 是 | 無 |
writeMode | 數據寫入表格存儲的模式,目前支持以下兩種模式:
| 是 | 無 |
enableAutoIncrement | 是否允許向包含主鍵自增列的Tablestore表中寫入數據。
| 否 | false |
requestTotalSizeLimitation | 該配置限制寫入Tablestore時單行數據的大小,配置類型為數字。 | 否 | 1MB |
attributeColumnSizeLimitation | 該配置限制寫入Tablestore時單個屬性列的大小,配置類型為數字。 | 否 | 2MB |
primaryKeyColumnSizeLimitation | 該配置限制寫入Tablestore時單個主鍵列的大小,配置類型為數字。 | 否 | 1KB |
attributeColumnMaxCount | 該配置限制寫入Tablestore時屬性列的個數,配置類型為數字。 | 否 | 1,024 |
行模式寫時序表參數
參數 | 描述 | 是否必選 | 默認值 |
column | column中的每個元素對應時序數據中的一個字段,每個元素可配置以下參數。
由于時序數據的度量名稱與時間戳不能為空,因此必須配置 示例:一條待寫入數據如下,包含六個字段:
使用如下配置:
寫入Tablestore后,控制臺查看結果如下 | 是 | 無 |
timeunit | 所配置的時間戳_time字段的單位,支持NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS、MINUTES。 | 否 | MICROSECONDS |
列模式寫寬表參數
參數 | 描述 | 是否必選 | 默認值 |
primaryKey | 表的主鍵列。 考慮到配置成本,并不需要配置primaryKey在Record(Line)中的位置,但Record的格式需要固定,要求primaryKey一定在行首,primaryKey之后是columnName。record的格式為: 例如,有以下9條數據:
配置示例如下:
寫入寬行結果:
| 是 | 無 |
columnNamePrefixFilter | 列名前綴過濾。 Hbase導入的數據,cf和qulifier共同組成了columnName,但Tablestore不支持cf,所以需要將cf過濾掉。 配置示例: 說明
| 否 | 無 |
常見問題
問題一
Q:向包含主鍵自增列的目標表寫入數據,需要如何配置Tablestore Writer?
Tablestore Writer的配置中必須包含以下兩條:
"newVersion": "true", "enableAutoIncrement": "true",
Tablestore Writer中不需要配置主鍵自增列的列名。
Tablestore Writer中配置的primaryKey條數+column條數需要等于上游Tablestore Reader數據的列數。
問題二
Q:在時序模型的配置中,如何理解_tag
和is_timeseries_tag
兩個字段?
示例:某條數據共有三個標簽,標簽為:【手機=小米,內存=8G,鏡頭=萊卡】。
數據導出示例(Tablestore Reader)
如果想將上述標簽合并到一起作為一列導出,則配置為:
"column": [ { "name": "_tags", } ],
DataWorks會將標簽導出為一列數據,形式如下:
["phone=xiaomi","camera=LEICA","RAM=8G"]
如果希望導出
phone
標簽和camera
標簽,并且每個標簽單獨作為一列導出,則配置為:"column": [ { "name": "phone", "is_timeseries_tag":"true", }, { "name": "camera", "is_timeseries_tag":"true", } ],
DataWorks會導出兩列數據,形式如下:
xiaomi, LEICA
數據導入示例(Tablestore Writer)
現在上游數據源(Reader)有兩列數據:
一列數據為:
["phone=xiaomi","camera=LEICA","RAM=8G"]
。另一列數據為:6499。
現希望將這兩列數據都添加到標簽里面,預期的寫入后標簽字段格式如下所示:則配置為:
"column": [ { "name": "_tags", }, { "name": "price", "is_timeseries_tag":"true", }, ],
第一列配置將
["phone=xiaomi","camera=LEICA","RAM=8G"]
整體導入標簽字段。第二列配置將
price=6499
單獨導入標簽字段。