MongoDB數(shù)據(jù)源
MongoDB數(shù)據(jù)源為您提供讀取和寫入MongoDB雙向通道的功能,本文為您介紹DataWorks的MongoDB數(shù)據(jù)同步的能力支持情況。
支持的版本
僅支持4.x、5.x版本的MongoDB。
使用限制
數(shù)據(jù)集成支持使用MongoDB數(shù)據(jù)庫(kù)對(duì)應(yīng)賬號(hào)進(jìn)行連接,如果您使用的是云數(shù)據(jù)庫(kù)MongoDB版,默認(rèn)會(huì)有一個(gè)root賬號(hào)。出于安全策略的考慮,在添加使用MongoDB數(shù)據(jù)源時(shí),請(qǐng)避免使用root作為訪問(wèn)賬號(hào)。
如果MongoDB為分片集群,則在配置數(shù)據(jù)源時(shí),需要配置mongos地址,避免配置mongod/shard節(jié)點(diǎn)地址。否則同步任務(wù)在抽取MongoDB中數(shù)據(jù)時(shí),可能會(huì)導(dǎo)致只查詢到指定shard的數(shù)據(jù),而非預(yù)期的全集。關(guān)于mongos、mongod,詳情請(qǐng)參考mongos、mongod。
在并發(fā)大于1的情況下,同步任務(wù)配置的集合中所有
_id
字段類型必須一致(例如,_id
字段都為string類型或者ObjectId類型),否則會(huì)出現(xiàn)部分?jǐn)?shù)據(jù)無(wú)法同步的問(wèn)題。說(shuō)明并發(fā)大于1時(shí),任務(wù)拆分會(huì)使用
_id
字段進(jìn)行劃分,因而在此場(chǎng)景下_id
字段不支持混合類型。如果_id
有多種字段類型,您可以使用單并發(fā)的形式進(jìn)行數(shù)據(jù)同步,且不配置splitFactor或splitFactor配置為1。
數(shù)據(jù)集成本身不支持?jǐn)?shù)組類型,但MongoDB支持?jǐn)?shù)組類型,并且數(shù)組類型具有強(qiáng)大的索引功能。您可以通過(guò)參數(shù)的特殊配置,將字符串轉(zhuǎn)換為MongoDB中的數(shù)組。轉(zhuǎn)換類型后,即可并行寫入MongoDB。
自建MongoDB數(shù)據(jù)庫(kù)不支持公網(wǎng)訪問(wèn),僅支持阿里云內(nèi)網(wǎng)訪問(wèn)。
數(shù)據(jù)集成目前不支持在數(shù)據(jù)查詢(參數(shù)query)配置中讀取指定列的數(shù)據(jù)。
離線同步任務(wù)中,如果MongoDB無(wú)法獲取字段結(jié)構(gòu),將默認(rèn)按照6個(gè)字段生成字段映射,字段名分別為
col1
,col2
,col3
,col4
,col5
,col6
。在同步任務(wù)運(yùn)行時(shí),默認(rèn)優(yōu)先使用
splitVector
命令進(jìn)行任務(wù)分片,在部分MongoDB版本中,不支持splitVector
命令,進(jìn)而會(huì)導(dǎo)致報(bào)錯(cuò)no such cmd splitVector
,您可以在同步任務(wù)配置中,單擊按鈕,進(jìn)入腳本模式,在MongoDB的parameter配置中,增加以下參數(shù),避免使用splitVector
。"useSplitVector" : false
支持的字段類型
MongoDB Reader支持的MongoDB數(shù)據(jù)類型
數(shù)據(jù)集成支持大部分MongoDB類型,但也存在部分沒有支持的情況,請(qǐng)注意檢查您的數(shù)據(jù)類型。
對(duì)于支持讀取的數(shù)據(jù)類型,數(shù)據(jù)集成在讀取時(shí):
基本類型的數(shù)據(jù),會(huì)根據(jù)同步任務(wù)配置的讀取字段(column,詳見下文的附錄:MongoDB腳本Demo與參數(shù)說(shuō)明)中的name自動(dòng)讀取對(duì)應(yīng)path下的數(shù)據(jù),并根據(jù)數(shù)據(jù)類型做自動(dòng)轉(zhuǎn)換,您無(wú)需指定column的type屬性。
類型
離線讀(MongoDB Reader)
說(shuō)明
ObjectId
支持
對(duì)象ID類型。
Double
支持
64位浮點(diǎn)數(shù)類型。
32-bit integer
支持
32位整數(shù)。
64-bit integer
支持
64位整數(shù)。
Decimal128
支持
Decimal128類型。
說(shuō)明如果配置為嵌套類型、Combine類型,JSON序列化時(shí)會(huì)被當(dāng)做對(duì)象處理,需增加參數(shù)
decimal128OutputType
為bigDecimal
,才能輸出為decimal。String
支持
字符串類型。
Boolean
支持
布爾類型。
Timestamp
支持
時(shí)間戳類型。
說(shuō)明BsonTimestamp存儲(chǔ)的是時(shí)間戳,無(wú)需考慮時(shí)區(qū)影響,詳情請(qǐng)參見MongoDB中的時(shí)區(qū)問(wèn)題。
Date
支持
日期類型。
部分復(fù)雜類型的數(shù)據(jù),您可通過(guò)配置column的type屬性,進(jìn)行自定義處理。
類型
離線讀(MongoDB Reader)
說(shuō)明
Document
支持
嵌入文檔類型。
如果沒有配置type屬性,則直接將Document轉(zhuǎn)JSON序列化處理。
如果配置了type屬性為
document
,則屬于嵌套類型,MongoDB Reader會(huì)按path讀取Document屬性。詳細(xì)示例請(qǐng)參見下文的數(shù)據(jù)類型示例2:遞歸解析處理多層嵌套的Document。
Array
支持
數(shù)組類型。
如果type配置為
array.json
、arrays
,直接JSON序列化處理。如果type配置為
array
、document.array
,則拼接為字符串,分隔符(column中的splitter)默認(rèn)為英文逗號(hào)。
重要數(shù)據(jù)集成本身不支持?jǐn)?shù)組類型,但MongoDB支持?jǐn)?shù)組類型,并且數(shù)組類型具有強(qiáng)大的索引功能。您可以通過(guò)參數(shù)的特殊配置,將字符串轉(zhuǎn)換為MongoDB中的數(shù)組。轉(zhuǎn)換類型后,即可并行寫入MongoDB。
數(shù)據(jù)集成特殊數(shù)據(jù)類型:combine
類型 | 離線讀(MongoDB Reader) | 說(shuō)明 |
Combine | 支持 | 數(shù)據(jù)集成自定義類型。 如果type配置為 |
MongoDB Reader數(shù)據(jù)類型轉(zhuǎn)換
結(jié)合上文可見,MongoDB Reader針對(duì)MongoDB類型的轉(zhuǎn)換列表,如下表所示。
轉(zhuǎn)換后的類型分類 | MongoDB數(shù)據(jù)類型 |
LONG | INT、LONG、document.INT和document.LONG |
DOUBLE | DOUBLE和document.DOUBLE |
STRING | STRING、ARRAY、document.STRING、document.ARRAY和COMBINE |
DATE | DATE和document.DATE |
BOOLEAN | BOOL和document.BOOL |
BYTES | BYTES和document.BYTES |
MongoDB Writer數(shù)據(jù)類型轉(zhuǎn)換
類型分類 | MongoDB數(shù)據(jù)類型 |
整數(shù)類 | INT和LONG |
浮點(diǎn)類 | DOUBLE |
字符串類 | STRING和ARRAY |
日期時(shí)間類 | DATE |
布爾型 | BOOL |
二進(jìn)制類 | BYTES |
數(shù)據(jù)類型示例1:Combine類型使用示例
MongoDB Reader插件的Combine數(shù)據(jù)類型支持將MongoDB document中的多個(gè)字段合并成一個(gè)JSON串。例如,導(dǎo)入MongoDB中的字段至MaxCompute,有字段如下(下文均省略了value使用key來(lái)代替整個(gè)字段)的三個(gè)document,其中a、b是所有document均有的公共字段,x_n是不固定字段。
doc1: a b x_1 x_2
doc2: a b x_2 x_3 x_4
doc3: a b x_5
配置文件中要明確指出需要一一對(duì)應(yīng)的字段,需要合并的字段則需另取名稱(不可以與document中已存在字段同名),并指定類型為COMBINE,如下所示。
"column": [
{
"name": "a",
"type": "string",
},
{
"name": "b",
"type": "string",
},
{
"name": "doc",
"type": "combine",
}
]
最終導(dǎo)出的MaxCompute結(jié)果如下所示。
odps_column1 | odps_column2 | odps_column3 |
a | b | {x_1,x_2} |
a | b | {x_2,x_3,x_4} |
a | b | {x_5} |
使用COMBINE類型合并MongoDB Document中的多個(gè)字段后,輸出結(jié)果映射至MaxCompute時(shí)會(huì)自動(dòng)刪除公共字段,僅保留Document的特有字段。
例如,a、b為所有Document均有的公共字段,Document文件doc1: a b x_1 x_2
使用COMBINE類型合并字段后,輸出結(jié)果本應(yīng)該為{a,b,x_1,x_2},該結(jié)果映射至MaxCompute后,會(huì)刪除公共字段a和b,最終輸出的結(jié)果為{x_1,x_2}。
數(shù)據(jù)類型示例2:遞歸解析處理多層嵌套的Document
當(dāng)MongoDB中Document存在多層嵌套時(shí),可通過(guò)配置document類型進(jìn)行遞歸解析處理。示例如下:
MongoDB源端數(shù)據(jù)為:
{ "name": "name1", "a": { "b": { "c": "this is value" } } }
MongoDB列可配置為:
{"name":"_id","type":"string"} {"name":"name","type":"string"} {"name":"a.b.c","type":"document"}
如上配置,可將源端嵌套字段a.b.c的值寫入目標(biāo)端c字段中,同步任務(wù)運(yùn)行后,目標(biāo)端寫入數(shù)據(jù)為this is value
。
創(chuàng)建數(shù)據(jù)源
在進(jìn)行數(shù)據(jù)同步任務(wù)開發(fā)時(shí),您需要在DataWorks上創(chuàng)建一個(gè)對(duì)應(yīng)的數(shù)據(jù)源,操作流程請(qǐng)參見創(chuàng)建并管理數(shù)據(jù)源,詳細(xì)的配置參數(shù)解釋可在配置界面查看對(duì)應(yīng)參數(shù)的文案提示。
數(shù)據(jù)同步任務(wù)開發(fā)
數(shù)據(jù)同步任務(wù)的配置入口和通用配置流程可參見下文的配置指導(dǎo)。
單表離線同步任務(wù)配置指導(dǎo)
操作流程請(qǐng)參見通過(guò)向?qū)J脚渲秒x線同步任務(wù)、通過(guò)腳本模式配置離線同步任務(wù)。
腳本模式配置的全量參數(shù)和腳本Demo請(qǐng)參見下文的附錄:MongoDB腳本Demo與參數(shù)說(shuō)明。
單表實(shí)時(shí)同步任務(wù)配置指導(dǎo)
操作流程請(qǐng)參見配置單表增量數(shù)據(jù)實(shí)時(shí)同步、DataStudio側(cè)實(shí)時(shí)同步任務(wù)配置。
整庫(kù)級(jí)別同步任務(wù)配置指導(dǎo)
整庫(kù)離線、整庫(kù)(實(shí)時(shí))全增量、整庫(kù)(實(shí)時(shí))分庫(kù)分表等整庫(kù)級(jí)別同步任務(wù)的配置操作,請(qǐng)參見數(shù)據(jù)集成側(cè)同步任務(wù)配置。
最佳實(shí)踐
常見問(wèn)題
附錄:MongoDB腳本Demo與參數(shù)說(shuō)明
離線任務(wù)腳本配置方式
如果您配置離線任務(wù)時(shí)使用腳本模式的方式進(jìn)行配置,您需要按照統(tǒng)一的腳本格式要求,在任務(wù)腳本中編寫相應(yīng)的參數(shù),詳情請(qǐng)參見通過(guò)腳本模式配置離線同步任務(wù),以下為您介紹腳本模式下數(shù)據(jù)源的參數(shù)配置詳情。
Reader腳本Demo
配置一個(gè)從MongoDB抽取數(shù)據(jù)到本地的作業(yè),詳情請(qǐng)參見下文的參數(shù)說(shuō)明。
實(shí)際運(yùn)行時(shí),請(qǐng)刪除下述代碼中的注釋。
暫時(shí)不支持取出array中的指定元素。
{
"type":"job",
"version":"2.0",//版本號(hào)。
"steps":[
{
"category": "reader",
"name": "Reader",
"parameter": {
"datasource": "datasourceName", //數(shù)據(jù)源名稱。
"collectionName": "tag_data", //集合名稱。
"query": "", // 數(shù)據(jù)查詢過(guò)濾。
"column": [
{
"name": "unique_id", //字段名稱。
"type": "string" //字段類型。
},
{
"name": "sid",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "auction_id",
"type": "string"
},
{
"name": "content_type",
"type": "string"
},
{
"name": "pool_type",
"type": "string"
},
{
"name": "frontcat_id",
"type": "array",
"splitter": ""
},
{
"name": "categoryid",
"type": "array",
"splitter": ""
},
{
"name": "gmt_create",
"type": "string"
},
{
"name": "taglist",
"type": "array",
"splitter": " "
},
{
"name": "property",
"type": "string"
},
{
"name": "scorea",
"type": "int"
},
{
"name": "scoreb",
"type": "int"
},
{
"name": "scorec",
"type": "int"
},
{
"name": "a.b",
"type": "document.int"
},
{
"name": "a.b.c",
"type": "document.array",
"splitter": " "
}
]
},
"stepType": "mongodb"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"common": {
"column": {
"timeZone": "GMT+0" //時(shí)區(qū)
}
},
"errorLimit":{
"record":"0"http://錯(cuò)誤記錄數(shù)。
},
"speed":{
"throttle":true,//當(dāng)throttle值為false時(shí),mbps參數(shù)不生效,表示不限流;當(dāng)throttle值為true時(shí),表示限流。
"concurrent":1 //作業(yè)并發(fā)數(shù)。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader腳本參數(shù)
參數(shù) | 描述 |
datasource | 數(shù)據(jù)源名稱,腳本模式支持添加數(shù)據(jù)源,此配置項(xiàng)填寫的內(nèi)容必須要與添加的數(shù)據(jù)源名稱保持一致。 |
collectionName | MonogoDB的集合名。 |
hint | MongoDB支持hint參數(shù),使查詢優(yōu)化器使用特定索引來(lái)完成查詢,在某些情況下,可以提高查詢性能。詳情請(qǐng)參見hint參數(shù)。示例如下:
|
column | MongoDB的文檔列名,配置為數(shù)組形式表示MongoDB的多個(gè)列。
|
batchSize | 批量獲取的記錄數(shù),該參數(shù)為選填參數(shù)。默認(rèn)值為 |
cursorTimeoutInMs | 游標(biāo)超時(shí)時(shí)間,該參數(shù)為選填參數(shù)。默認(rèn)值為 說(shuō)明
|
query | 您可以通過(guò)該配置型來(lái)限制返回MongoDB數(shù)據(jù)范圍,僅支持以下時(shí)間格式,不支持直接使用時(shí)間戳類型的格式。 說(shuō)明
常用query示例如下:
說(shuō)明 更多MongoDB的查詢語(yǔ)法請(qǐng)參見MongoDB官方文檔。 |
splitFactor | 如果存在比較嚴(yán)重的數(shù)據(jù)傾斜,可以考慮增加splitFactor,實(shí)現(xiàn)更小粒度的切分,無(wú)需增加并發(fā)數(shù)。 |
Writer腳本Demo
配置寫入MongoDB的數(shù)據(jù)同步作業(yè),詳情請(qǐng)參見下文的參數(shù)說(shuō)明。
{
"type": "job",
"version": "2.0",//版本號(hào)。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "mongodb",//插件名。
"parameter": {
"datasource": "",//數(shù)據(jù)源名。
"column": [
{
"name": "_id",//列名。
"type": "ObjectId"http://數(shù)據(jù)類型。如果replacekey為_id,則此處的type必須配置為ObjectID。如果配置為string,會(huì)無(wú)法進(jìn)行替換。
},
{
"name": "age",
"type": "int"
},
{
"name": "id",
"type": "long"
},
{
"name": "wealth",
"type": "double"
},
{
"name": "hobby",
"type": "array",
"splitter": " "
},
{
"name": "valid",
"type": "boolean"
},
{
"name": "date_of_join",
"format": "yyyy-MM-dd HH:mm:ss",
"type": "date"
}
],
"writeMode": {//寫入模式。
"isReplace": "true",
"replaceKey": "_id"
},
"collectionName": "datax_test"http://連接名稱。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {//錯(cuò)誤記錄數(shù)。
"record": "0"
},
"speed": {
"throttle": true,//當(dāng)throttle值為false時(shí),mbps參數(shù)不生效,表示不限流;當(dāng)throttle值為true時(shí),表示限流。
"concurrent": 1,//作業(yè)并發(fā)數(shù)。
"mbps": "1"http://限流的速度,此處1mbps = 1MB/s。
},
"jvmOption": "-Xms1024m -Xmx1024m"
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer腳本參數(shù)
參數(shù) | 描述 | 是否必選 | 默認(rèn)值 |
datasource | 數(shù)據(jù)源名稱,腳本模式支持添加數(shù)據(jù)源,該配置項(xiàng)填寫的內(nèi)容必須與添加的數(shù)據(jù)源名稱保持一致。 | 是 | 無(wú) |
collectionName | MongoDB的集合名。 | 是 | 無(wú) |
column | MongoDB的文檔列名,配置為數(shù)組形式表示MongoDB的多個(gè)列。
| 是 | 無(wú) |
writeMode | 指定了傳輸數(shù)據(jù)時(shí)是否覆蓋的信息,包括isReplace和replaceKey:
說(shuō)明 當(dāng)isReplace設(shè)置為true,且將非
原因是寫入數(shù)據(jù)中,存在 | 否 | 無(wú) |
preSql | 表示數(shù)據(jù)同步寫出MongoDB前的前置操作,例如清理歷史數(shù)據(jù)等。如果preSql為空,表示沒有配置前置操作。配置preSql時(shí),需要確保preSql符合JSON語(yǔ)法要求。 | 否 | 無(wú) |
執(zhí)行數(shù)據(jù)集成作業(yè)時(shí),會(huì)首先執(zhí)行您已配置的preSql。完成preSql的執(zhí)行后,才可以進(jìn)入實(shí)際的數(shù)據(jù)寫出階段。preSql本身不會(huì)影響寫出的數(shù)據(jù)內(nèi)容。數(shù)據(jù)集成通過(guò)preSql參數(shù),可以具備冪等執(zhí)行特性。例如,您的preSql在每次任務(wù)執(zhí)行前都會(huì)清理歷史數(shù)據(jù)(根據(jù)您的業(yè)務(wù)規(guī)則進(jìn)行清理)。此時(shí),如果任務(wù)失敗,您只需要重新執(zhí)行數(shù)據(jù)集成作業(yè)即可。
preSql的格式要求如下:
需要配置type字段,表示前置操作類別,支持drop和remove,例如
"preSql":{"type":"remove"}
:drop:表示刪除集合和集合內(nèi)的數(shù)據(jù),collectionName參數(shù)配置的集合即是待刪除的集合。
remove:表示根據(jù)條件刪除數(shù)據(jù)。
json:您可以通過(guò)JSON控制待刪除的數(shù)據(jù)條件,例如
"preSql":{"type":"remove", "json":"{'operationTime':{'$gte':ISODate('${last_day}T00:00:00.424+0800')}}"}
。此處的${last_day}
為DataWorks調(diào)度參數(shù),格式為$[yyyy-mm-dd]
。您可以根據(jù)需要具體使用其它MongoDB支持的條件操作符號(hào)($gt、$lt、$gte和$lte等)、邏輯操作符(and和or等)或函數(shù)(max、min、sum、avg和ISODate等)。數(shù)據(jù)集成通過(guò)如下MongoDB標(biāo)準(zhǔn)API執(zhí)行您的數(shù)據(jù),刪除query。
query=(BasicDBObject) com.mongodb.util.JSON.parse(json); col.deleteMany(query);
說(shuō)明如果您需要條件刪除數(shù)據(jù),建議優(yōu)先使用JSON配置形式。
item:您可以在item中配置數(shù)據(jù)過(guò)濾的列名(name)、條件(condition)和列值(value)。例如
"preSql":{"type":"remove","item":[{"name":"pv","value":"100","condition":"$gt"},{"name":"pid","value":"10"}]}
。數(shù)據(jù)集成會(huì)基于您配置的item條件項(xiàng),構(gòu)造查詢query條件,進(jìn)而通過(guò)MongoDB標(biāo)準(zhǔn)API執(zhí)行刪除。例如
col.deleteMany(query);
。
不識(shí)別的preSql,無(wú)需進(jìn)行任何前置刪除操作。