HBase數據源為您提供讀取和寫入HBase的雙向通道,本文為您介紹DataWorks的HBase數據同步的能力支持情況。
支持的版本
支持HBase0.94.x、HBase1.1.x、HBase2.x和Phoenix5.x版本。
如果您的HBase版本為HBase0.94.x,Reader和Writer端的插件請選擇094x。
"reader": { "plugin": "094x" }
"writer": { "hbaseVersion": "094x" }
如果您的HBase版本為HBase1.1.x或HBase2.x,Reader和Writer端的插件請選擇11x。
"reader": { "plugin": "11x" }
"writer": { "hbaseVersion": "11x" }
說明HBase1.1.x插件當前可以兼容HBase 2.0。
HBase11xsql Writer插件實現了向Hbase中的SQL表(phoenix)批量導入數據。因為Phoenix對rowkey進行了數據編碼,如果您直接使用HBaseAPI寫入,需要手動轉換數據,麻煩且易錯。HBase11xsql Writer插件為您提供了單間的SQL表的數據導入方式。
說明通過Phoenix的JDBC驅動,執行UPSERT語句向表中批量寫入數據。因為使用上層接口,所以可以同步更新索引表。
使用限制
HBase Reader | HBase20xsql Reader | HBase11xsql Writer |
|
|
|
支持的功能
HBase Reader
支持normal和multiVersionFixedColumn模式
normal模式:把HBase中的表當成普通二維表(橫表)進行讀取,獲取最新版本數據。
hbase(main):017:0> scan 'users' ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0580 seconds }
讀取后的數據如下所示。
rowKey
address:city
address:contry
address:province
info:age
info:birthday
info:company
lisi
beijing
china
beijing
27
1987-06-17
baidu
xiaoming
hangzhou
china
zhejiang
29
1987-06-17
alibaba
multiVersionFixedColumn模式:把HBase中的表當成豎表進行讀取。讀出的每條記錄是四列形式,依次為rowKey、family:qualifier、timestamp和value。讀取時需要明確指定要讀取的列,把每一個cell中的值,作為一條記錄(record),若有多個版本則存在多條記錄。
hbase(main):018:0> scan 'users',{VERSIONS=>5} ROW COLUMN+CELL lisi column=address:city, timestamp=1457101972764, value=beijing lisi column=address:contry, timestamp=1457102773908, value=china lisi column=address:province, timestamp=1457101972736, value=beijing lisi column=info:age, timestamp=1457101972548, value=27 lisi column=info:birthday, timestamp=1457101972604, value=1987-06-17 lisi column=info:company, timestamp=1457101972653, value=baidu xiaoming column=address:city, timestamp=1457082196082, value=hangzhou xiaoming column=address:contry, timestamp=1457082195729, value=china xiaoming column=address:province, timestamp=1457082195773, value=zhejiang xiaoming column=info:age, timestamp=1457082218735, value=29 xiaoming column=info:age, timestamp=1457082178630, value=24 xiaoming column=info:birthday, timestamp=1457082186830, value=1987-06-17 xiaoming column=info:company, timestamp=1457082189826, value=alibaba 2 row(s) in 0.0260 seconds }
讀取后的數據(4列) 如下所示。
rowKey
column:qualifier
timestamp
value
lisi
address:city
1457101972764
beijing
lisi
address:contry
1457102773908
china
lisi
address:province
1457101972736
beijing
lisi
info:age
1457101972548
27
lisi
info:birthday
1457101972604
1987-06-17
lisi
info:company
1457101972653
beijing
xiaoming
address:city
1457082196082
hangzhou
xiaoming
address:contry
1457082195729
china
xiaoming
address:province
1457082195773
zhejiang
xiaoming
info:age
1457082218735
29
xiaoming
info:age
1457082178630
24
xiaoming
info:birthday
1457082186830
1987-06-17
xiaoming
info:company
1457082189826
alibaba
HBase Writer
支持源端多個字段拼接作為rowkey
目前HBase Writer支持源端多個字段拼接作為HBase表的rowkey。
寫入HBase的版本支持
寫入HBase的時間戳(版本)支持:
當前時間作為版本。
指定源端列作為版本。
指定一個時間作為版本。
支持的字段類型
離線讀
支持讀取HBase數據類型及HBase Reader針對HBase類型的轉換列表如下表所示。
類型分類 | 數據集成column配置類型 | 數據庫數據類型 |
整數類 | long | short、int和long |
浮點類 | double | float和double |
字符串類 | string | binary_string和string |
日期時間類 | date | date |
字節類 | bytes | bytes |
布爾類 | boolean | boolean |
HBase20xsql Reader支持大部分Phoenix類型,但也存在個別類型沒有支持的情況,請注意檢查你的類型。
HBase20xsql Reader針對Phoenix類型的轉換列表,如下所示。
DataX內部類型 | Phoenix數據類型 |
long | INTEGER、TINYINT、SMALLINT、BIGINT |
double | FLOAT、DECIMAL、DOUBLE |
string | CHAR、VARCHAR |
date | DATE、TIME、TIMESTAMP |
bytes | BINARY、VARBINARY |
boolean | BOOLEAN |
離線寫
支持讀取HBase數據類型,HBase Writer針對HBase類型的轉換列表,如下表所示。
column的配置需要和HBase表對應的列類型保持一致。
除下表中羅列的字段類型外,其它類型均不支持。
類型分類 | 數據庫數據類型 |
整數類 | INT、LONG和SHORT |
浮點類 | FLOAT和DOUBLE |
布爾類 | BOOLEAN |
字符串類 | STRING |
注意事項
如果您在測試連通性時遇到 "tried to access method com.google.common.base.Stopwatch" 的錯誤信息,請在新增數據源頁面的JSON配置項內添加 "hbaseVersion": "" 字段,指定您的hbase版本,例如, "hbaseVersion:"2.0.14"。
創建數據源
在進行數據同步任務開發時,您需要在DataWorks上創建一個對應的數據源,操作流程請參見創建并管理數據源,詳細的配置參數解釋可在配置界面查看對應參數的文案提示。
數據同步任務開發
數據同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過向導模式配置離線同步任務、通過腳本模式配置離線同步任務。
腳本模式配置的全量參數和腳本Demo請參見下文的附錄:腳本Demo與參數說明。
常見問題
Q:并發設置多少比較合適?速度慢時增加并發有用嗎?
A:數據導入進程默認JVM的堆大小是2GB,并發(channel數)是通過多線程實現的,開過多的線程有時并不能提高導入速度,反而可能因為過于頻繁的GC導致性能下降。一般建議并發數(channel)為5-10。
Q:batchSize設置多少比較合適?
A:默認是256,但應根據每行的大小來計算最合適的batchSize。通常一次操作的數據量在2MB~4MB左右,用該值除以行大小,即可得到batchSize。
附錄:腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
HBase Reader腳本Demo
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"hbase",//插件名。
"parameter":{
"mode":"normal",//讀取HBase的模式,支持normal模式、multiVersionFixedColumn模式。
"scanCacheSize":"256",//HBase client每次RPC從服務器端讀取的行數。
"scanBatchSize":"100",//HBase client每次RPC從服務器端讀取的列數。
"hbaseVersion":"094x/11x",//HBase版本。
"column":[//字段。
{
"name":"rowkey",//字段名。
"type":"string"http://數據類型。
},
{
"name":"columnFamilyName1:columnName1",
"type":"string"
},
{
"name":"columnFamilyName2:columnName2",
"format":"yyyy-MM-dd",
"type":"date"
},
{
"name":"columnFamilyName3:columnName3",
"type":"long"
}
],
"range":{//指定HBase Reader讀取的rowkey范圍。
"endRowkey":"",//指定結束rowkey。
"isBinaryRowkey":true,//指定配置的startRowkey和endRowkey轉換為byte[]時的方式,默認值為false。
"startRowkey":""http://指定開始rowkey。
},
"maxVersion":"",//指定在多版本模式下的HBase Reader讀取的版本數。
"encoding":"UTF-8",//編碼格式。
"table":"",//表名。
"hbaseConfig":{//連接HBase集群需要的配置信息,JSON格式。
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs://ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase Reader腳本參數
參數 | 描述 | 是否必選 | 默認值 |
haveKerberos | haveKerberos值為true時,表示HBase集群需要kerberos認證。 說明
| 否 | false |
hbaseConfig | 連接HBase集群需要的配置信息,JSON格式。必填的配置為hbase.zookeeper.quorum,表示HBase的ZK鏈接地址。同時可以補充更多HBase client的配置,例如設置scan的cache、batch來優化與服務器的交互。 說明 如果是云HBase的數據庫,需要使用內網地址連接訪問。 | 是 | 無 |
mode | 讀取HBase的模式,支持normal模式和multiVersionFixedColumn模式。 | 是 | 無 |
table | 讀取的HBase表名(大小寫敏感) 。 | 是 | 無 |
encoding | 編碼方式,UTF-8或GBK,用于對二進制存儲的HBase byte[]轉為String時的編碼。 | 否 | utf-8 |
column | 要讀取的HBase字段,normal模式與multiVersionFixedColumn模式下必填。
| 是 | 無 |
maxVersion | 指定在多版本模式下的HBase Reader讀取的版本數,取值只能為-1或大于1的數字,-1表示讀取所有版本。 | multiVersionFixedColumn模式下必填項 | 無 |
range | 指定HBase Reader讀取的rowkey范圍。
| 否 | 無 |
scanCacheSize | HBase Reader每次從HBase中讀取的行數。 | 否 | 256 |
scanBatchSize | HBase Reader每次從HBase中讀取的列數。 | 否 | 100 |
HBase Writer腳本Demo
{
"type":"job",
"version":"2.0",//版本號
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"hbase",//插件名。
"parameter":{
"mode":"normal",//寫入HBase的模式。
"walFlag":"false",//關閉(false)放棄寫WAL日志。
"hbaseVersion":"094x",//Hbase版本。
"rowkeyColumn":[//要寫入的HBase的rowkey列。
{
"index":"0",//序列號。
"type":"string"http://數據類型。
},
{
"index":"-1",
"type":"string",
"value":"_"
}
],
"nullMode":"skip",//讀取的為null值時,如何處理。
"column":[//要寫入的HBase字段。
{
"name":"columnFamilyName1:columnName1",//字段名。
"index":"0",//索引號。
"type":"string"http://數據類型。
},
{
"name":"columnFamilyName2:columnName2",
"index":"1",
"type":"string"
},
{
"name":"columnFamilyName3:columnName3",
"index":"2",
"type":"string"
}
],
"encoding":"utf-8",//編碼格式。
"table":"",//表名。
"hbaseConfig":{//連接HBase集群需要的配置信息,JSON格式。
"hbase.zookeeper.quorum":"hostname",
"hbase.rootdir":"hdfs: //ip:port/database",
"hbase.cluster.distributed":"true"
}
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"12"http://限流
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase Writer腳本參數
參數 | 描述 | 是否必選 | 默認值 |
haveKerberos | haveKerberos值為true時,表示HBase集群需要kerberos認證。 說明
| 否 | false |
hbaseConfig | 連接HBase集群需要的配置信息,JSON格式。必填的配置為hbase.zookeeper.quorum,表示HBase的ZK鏈接地址。同時可以補充更多HBase client的配置,例如設置scan的cache、batch來優化與服務器的交互。 說明 如果是云HBase的數據庫,需要使用內網地址連接訪問。 | 是 | 無 |
mode | 寫入HBase的模式,目前僅支持normal模式,后續考慮動態列模式。 | 是 | 無 |
table | 要寫入的HBase表名(大小寫敏感) 。 | 是 | 無 |
encoding | 編碼方式,UTF-8或GBK,用于STRING轉HBase byte[]時的編碼。 | 否 | utf-8 |
column | 要寫入的HBase字段:
| 是 | 無 |
rowkeyColumn | 要寫入的HBase的rowkey列:
配置格式如下所示。
| 是 | 無 |
versionColumn | 指定寫入HBase的時間戳。支持當前時間、指定時間列或指定時間(三者選一),如果不配置則表示用當前時間。
配置格式如下所示。
| 否 | 無 |
nullMode | 讀取的數據為null值時,您可以通過以下兩種方式解決:
| 否 | skip |
walFlag | HBase Client向集群中的RegionServer提交數據時(Put/Delete操作),首先會先寫WAL(Write Ahead Log)日志(即HLog,一個RegionServer上的所有Region共享一個HLog),只有當WAL日志寫成功后,才會接著寫MemStore,最后客戶端被通知提交數據成功。 如果寫WAL日志失敗,客戶端則被通知提交失敗。關閉(false)放棄寫WAL日志,從而提高數據寫入的性能。 | 否 | false |
writeBufferSize | 設置HBase Client的寫Buffer大小,單位字節,配合autoflush使用。 autoflush(默認處于關閉狀態):
| 否 | 8M |
HBase20xsql Reader腳本Demo
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"hbase20xsql",//插件名。
"parameter":{
"queryServerAddress": "http://127.0.0.1:8765", //填寫連接Phoenix QueryServer地址。
"serialization": "PROTOBUF", //QueryServer序列化格式。
"table": "TEST", //讀取表名。
"column": ["ID", "NAME"], //所要讀取列名。
"splitKey": "ID" //切分列,必須是表主鍵。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1,//作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
HBase20xsql Reader腳本參數
參數 | 描述 | 是否必選 | 默認值 |
queryServerAddress | HBase20xsql Reader插件需要通過Phoenix輕客戶端去連接Phoenix QueryServer,因此,您需要在此處填寫對應的QueryServer地址。如果HBase增強版(Lindorm)用戶需要透傳user、password參數,可以在queryServerAddress后增加對應的可選屬性。格式為: | 是 | 無 |
serialization | QueryServer使用的序列化協議。 | 否 | PROTOBUF |
table | 所要讀取的表名(大小寫敏感)。 | 是 | 無 |
schema | 表所在的schema。 | 否 | 無 |
column | 所配置的表中需要同步的列名集合,使用JSON的數組描述字段信息,空值表示讀取所有列。默認值為空值。 | 否 | 全部列 |
splitKey | 讀取表時對表進行切分,如果指定splitKey,表示您希望使用splitKey代表的字段進行數據分片,數據同步因此會啟動并發任務進行數據同步,提高了數據同步的效能。您可以選擇兩種不同的切分方式,如果splitPoint為空,默認根據方法一自動切分:
| 是 | 無 |
splitPoints | 根據切分列的最大值和最小值切分時不能保證避免數據熱點,因此,建議切分點根據Region的startkey和endkey進行設置,保證每個查詢對應單個Region。 | 否 | 無 |
where | 篩選條件,支持對表查詢增加過濾條件。HBase20xsql Reader根據指定的column、table、where條件拼接SQL,并根據該SQL進行數據抽取。 | 否 | 無 |
querySql | 在部分業務場景中,where配置項不足以描述所篩選的條件,您可以通過該配置型來自定義篩選SQL。配置該項后,除queryserverAddress參數必須設置外,HBase20xsql Reader會直接忽略column、table、where和splitKey條件的配置,使用該項配置的內容對數據進行篩選。 | 否 | 無 |
HBase11xsql Writer腳本Demo
{
"type": "job",
"version": "1.0",
"configuration": {
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"1"http://限流,此處1mbps = 1MB/s。
}
},
"reader": {
"plugin": "odps",
"parameter": {
"datasource": "",
"table": "",
"column": [],
"partition": ""
}
},
"plugin": "hbase11xsql",
"parameter": {
"table": "目標hbase表名,大小寫有關",
"hbaseConfig": {
"hbase.zookeeper.quorum": "目標hbase集群的ZK服務器地址,向PE咨詢",
"zookeeper.znode.parent": "目標hbase集群的znode,向PE咨詢"
},
"column": [
"columnName"
],
"batchSize": 256,
"nullMode": "skip"
}
}
}
HBase11xsql Writer腳本參數
參數 | 描述 | 是否必選 | 默認值 |
plugin | 插件名字,必須是hbase11xsql。 | 是 | 無 |
table | 要導入的表名,大小寫敏感,通常phoenix表都是大寫表名。 | 是 | 無 |
column | 列名,大小寫敏感。通常phoenix的列名都是大寫。 說明
| 是 | 無 |
hbaseConfig | hbase集群地址,zk為必填項,格式為ip1, ip2, ip3。 說明
| 是 | 無 |
batchSize | 批量寫入的最大行數。 | 否 | 256 |
nullMode | 讀取到的列值為null時,您可以通過以下兩種方式進行處理:
| 否 | skip |