DataHub數(shù)據(jù)源
DataHub數(shù)據(jù)源作為數(shù)據(jù)中樞,為您提供讀取和寫入DataHub數(shù)據(jù)庫的雙向通道,能夠快速解決海量數(shù)據(jù)的計(jì)算問題。本文為您介紹DataWorks的DataHub數(shù)據(jù)同步的能力支持情況。
支持的版本
DataHub Reader通過DataHub的Java SDK讀取DataHub中的數(shù)據(jù),具體使用的Java SDK版本,如下所示。
<dependency> <groupId>com.aliyun.DataHub</groupId> <artifactId>aliyun-sdk-DataHub</artifactId> <version>2.9.1</version> </dependency>
DataHub Writer通過DataHub服務(wù)的Java SDK向DataHub寫入數(shù)據(jù),使用的日志服務(wù)Java SDK版本如下。
<dependency> <groupId>com.aliyun.datahub</groupId> <artifactId>aliyun-sdk-datahub</artifactId> <version>2.5.1</version> </dependency>
使用限制
離線讀寫
STRING字符串僅支持UTF-8編碼,單個(gè)STRING列最長允許1MB。
實(shí)時(shí)讀寫
實(shí)時(shí)數(shù)據(jù)同步任務(wù)支持使用Serverless資源組(推薦)和獨(dú)享數(shù)據(jù)集成資源組。
實(shí)時(shí)同步至DataHub時(shí),按源端校驗(yàn)哈希值,同一個(gè)哈希值的數(shù)據(jù)會(huì)同步到同一個(gè)shard中。
全增量實(shí)時(shí)寫
運(yùn)行同步任務(wù)后,生成的離線同步任務(wù)將全量數(shù)據(jù)寫入DataHub,待全量數(shù)據(jù)執(zhí)行完成后,啟動(dòng)實(shí)時(shí)同步任務(wù),將源端增量數(shù)據(jù)實(shí)時(shí)同步至目標(biāo)端。數(shù)據(jù)寫入格式如下:
僅支持將數(shù)據(jù)寫入DataHub Tuple類型的Topic中。關(guān)于DataHub TUPLE數(shù)據(jù)類型說明,詳情請(qǐng)參見:數(shù)據(jù)類型介紹。
實(shí)時(shí)同步至DataHub會(huì)在源表字段基礎(chǔ)上,新增5個(gè)附加字段,并支持您在配任務(wù)配置時(shí),自行添加額外的字段。最終發(fā)送給DataHub的消息格式,詳情請(qǐng)參見:附錄:DataHub消息格式。
支持的字段類型
DataHub同步數(shù)據(jù)時(shí),會(huì)根據(jù)DataHub Field的數(shù)據(jù)類型同步到對(duì)應(yīng)的數(shù)據(jù)類型中,DataHub僅支持BIGINT、STRING、BOOLEAN、DOUBLE、TIMESTAMP、DECIMAL數(shù)據(jù)類型。
創(chuàng)建數(shù)據(jù)源
在進(jìn)行數(shù)據(jù)同步任務(wù)開發(fā)時(shí),您需要在DataWorks上創(chuàng)建一個(gè)對(duì)應(yīng)的數(shù)據(jù)源,操作流程請(qǐng)參見創(chuàng)建并管理數(shù)據(jù)源,詳細(xì)的配置參數(shù)解釋可在配置界面查看對(duì)應(yīng)參數(shù)的文案提示。
數(shù)據(jù)同步任務(wù)開發(fā)
數(shù)據(jù)同步任務(wù)的配置入口和通用配置流程可參見下文的配置指導(dǎo)。
單表離線同步任務(wù)配置指導(dǎo)
操作流程請(qǐng)參見通過向?qū)J脚渲秒x線同步任務(wù)、通過腳本模式配置離線同步任務(wù)。
腳本模式配置的全量參數(shù)和腳本Demo請(qǐng)參見下文的附錄:腳本Demo與參數(shù)說明。
單表、整庫實(shí)時(shí)同步任務(wù)配置指導(dǎo)
操作流程請(qǐng)參見DataStudio側(cè)實(shí)時(shí)同步任務(wù)配置。
DataHub不同數(shù)據(jù)類型對(duì)應(yīng)操作的支持情況,不同數(shù)據(jù)類型的分片策略、數(shù)據(jù)格式及相關(guān)消息示例。詳情請(qǐng)參見:附錄:DataHub消息格式。
單表、整庫全增量(實(shí)時(shí))同步配置指導(dǎo)
操作流程請(qǐng)參見數(shù)據(jù)集成側(cè)同步任務(wù)配置。
常見問題
附錄:腳本Demo與參數(shù)說明
離線任務(wù)腳本配置方式
如果您配置離線任務(wù)時(shí)使用腳本模式的方式進(jìn)行配置,您需要按照統(tǒng)一的腳本格式要求,在任務(wù)腳本中編寫相應(yīng)的參數(shù),詳情請(qǐng)參見通過腳本模式配置離線同步任務(wù),以下為您介紹腳本模式下數(shù)據(jù)源的參數(shù)配置詳情。
Reader腳本Demo
{
"type":"job",
"version":"2.0",//版本號(hào)
"steps":[
{
"job": {
"content": [
{
"reader": {
"name": "DataHubreader",
"parameter": {
"endpoint": "xxx" //DataHub的endpoint。
"accessId": "xxx", //訪問DataHub的用戶accessId。
"accessKey": "xxx", //訪問DataHub的用戶accessKey。
"project": "xxx", //目標(biāo)DataHub的項(xiàng)目名稱。
"topic": "xxx" //目標(biāo)DataHub的topic名稱。
"batchSize": 1000, //一次讀取的數(shù)據(jù)量。
"beginDateTime": "20180910111214", //數(shù)據(jù)消費(fèi)的開始時(shí)間位點(diǎn)。
"endDateTime": "20180910111614", //數(shù)據(jù)消費(fèi)的結(jié)束時(shí)間位點(diǎn)。
"column": [
"col0",
"col1",
"col2",
"col3",
"col4"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯(cuò)誤記錄數(shù)
},
"speed":{
"throttle":true,//當(dāng)throttle值為false時(shí),mbps參數(shù)不生效,表示不限流;當(dāng)throttle值為true時(shí),表示限流。
"concurrent":1,//并發(fā)數(shù)
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader腳本參數(shù)
參數(shù) | 描述 | 是否必選 |
endpoint | DataHub的endpoint。 | 是 |
accessId | 訪問DataHub的用戶accessId。 | 是 |
accessKey | 訪問DataHub的用戶accessKey。 | 是 |
project | 目標(biāo)DataHub的項(xiàng)目名稱。project是DataHub中的資源管理單元,用于資源隔離和控制。 | 是 |
topic | 目標(biāo)DataHub的topic名稱。 | 是 |
batchSize | 一次讀取的數(shù)據(jù)量,默認(rèn)為1,024條。 | 否 |
beginDateTime | 數(shù)據(jù)消費(fèi)的開始時(shí)間位點(diǎn)。該參數(shù)是時(shí)間范圍(左閉右開)的左邊界,yyyyMMddHHmmss格式的時(shí)間字符串,可以和DataWorks的調(diào)度時(shí)間參數(shù)配合使用。 說明 beginDateTime和endDateTime需要互相組合配套使用。 | 是 |
endDateTime | 數(shù)據(jù)消費(fèi)的結(jié)束時(shí)間位點(diǎn)。該參數(shù)是時(shí)間范圍(左閉右開)的右邊界,yyyyMMddHHmmss格式的時(shí)間字符串,可以和DataWorks的調(diào)度時(shí)間參數(shù)配合使用。 說明 beginDateTime和endDateTime需要互相組合配套使用。 | 是 |
Writer腳本Demo
{
"type": "job",
"version": "2.0",//版本號(hào)。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",//插件名。
"parameter": {
"datasource": "",//數(shù)據(jù)源。
"topic": "",//Topic是DataHub訂閱和發(fā)布的最小單位,您可以用Topic來表示一類或者一種流數(shù)據(jù)。
"maxRetryCount": 500,//任務(wù)失敗的重試的最多次數(shù)。
"maxCommitSize": 1048576//待積累的數(shù)據(jù)Buffer大小達(dá)到maxCommitSize大小(單位Byte)時(shí),批量提交至目的端。
//datahub側(cè)對(duì)于一次request請(qǐng)求寫入的數(shù)據(jù)條數(shù)限制是10000條,超出10000條數(shù)據(jù)會(huì)超出限制導(dǎo)致任務(wù)出錯(cuò),請(qǐng)根據(jù)您單條數(shù)據(jù)平均數(shù)據(jù)量*10000條數(shù)據(jù)的數(shù)據(jù)總量來從側(cè)方面進(jìn)行單次寫入datahub的數(shù)據(jù)條數(shù)控制。比如每條數(shù)據(jù)10 k,那么此參數(shù)的設(shè)置值要低于10*10000 k。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""http://錯(cuò)誤記錄數(shù)。
},
"speed": {
"throttle":true,//當(dāng)throttle值為false時(shí),mbps參數(shù)不生效,表示不限流;當(dāng)throttle值為true時(shí),表示限流。
"concurrent":20, //作業(yè)并發(fā)數(shù)。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer腳本參數(shù)
參數(shù) | 描述 | 是否必選 | 默認(rèn)值 |
accessId | DataHub的accessId。 | 是 | 無 |
accessKey | DataHub的accessKey。 | 是 | 無 |
endPoint | 對(duì)DataHub資源的訪問請(qǐng)求,需要根據(jù)資源所屬服務(wù),選擇正確的域名。 詳情請(qǐng)參見域名列表。 | 是 | 無 |
maxRetryCount | 任務(wù)失敗的最多重試次數(shù)。 | 否 | 無 |
mode | Value是STRING類型時(shí),寫入的模式。 | 是 | 無 |
parseContent | 解析內(nèi)容。 | 是 | 無 |
project | 項(xiàng)目(Project)是DataHub數(shù)據(jù)的基本組織單元,一個(gè)Project下包含多個(gè)Topic。 說明 DataHub的項(xiàng)目空間與MaxCompute的項(xiàng)目相互獨(dú)立,您在MaxCompute中創(chuàng)建的項(xiàng)目不能復(fù)用于DataHub,需要單獨(dú)創(chuàng)建。 | 是 | 無 |
topic | Topic是DataHub訂閱和發(fā)布的最小單位,您可以用Topic來表示一類或者一種流數(shù)據(jù)。 詳情請(qǐng)參見Project及Topic的數(shù)量限制。 | 是 | 無 |
maxCommitSize | 為提高寫出效率,DataX會(huì)積累Buffer數(shù)據(jù),待積累的數(shù)據(jù)大小達(dá)到maxCommitSize 大小(單位Byte)時(shí),批量提交到目的端。默認(rèn)是1,048,576,即1 MB數(shù)據(jù)。另外datahub側(cè)對(duì)于一次request請(qǐng)求寫入的數(shù)據(jù)條數(shù)限制是10000條,超出10000條數(shù)據(jù)會(huì)超出限制導(dǎo)致任務(wù)出錯(cuò),請(qǐng)根據(jù)您單條數(shù)據(jù)平均數(shù)據(jù)量*10000條的數(shù)據(jù)總量來從側(cè)方面進(jìn)行寫入datahub的數(shù)據(jù)條數(shù)控制。 | 否 | 1MB |