Lindorm數(shù)據(jù)源
DataWorks數(shù)據(jù)集成支持使用Lindorm Reader和Lindorm Writer插件讀取和寫(xiě)入Lindorm雙向通道的功能,本文為您介紹DataWorks的Lindorm數(shù)據(jù)讀取與寫(xiě)入能力。
使用限制
Lindorm需要在DataWorks上配置數(shù)據(jù)源,通過(guò)Lindorm Reader和Lindorm Writer插件讀取與寫(xiě)入Lindorm數(shù)據(jù)。
Lindorm Reader和LindormWriter支持使用Serverless資源組(推薦)和獨(dú)享數(shù)據(jù)集成資源組。
Lindorm 時(shí)序引擎目前不支持作為DataWorks數(shù)據(jù)集成的數(shù)據(jù)源。
LindormReader和LindormWriter的必填配置項(xiàng)configuration,可以通過(guò)Lindorm集群控制臺(tái)查看連接Lindorm的相關(guān)配置項(xiàng)進(jìn)行獲取,并以JSON格式填寫(xiě)相關(guān)信息。
說(shuō)明Lindorm為多模數(shù)據(jù)庫(kù),LindormReader和LindormWriter支持讀取table和widecolumn類型的數(shù)據(jù),關(guān)于table和widecolumn類型的詳細(xì)介紹請(qǐng)參見(jiàn)Lindorm使用文檔,您也可以通過(guò)釘釘咨詢Lindorm值班人員。
支持的字段類型
Lindorm Reader和Lindorm Writer支持大部分Lindorm類型,但也存在個(gè)別沒(méi)有支持的情況,請(qǐng)注意檢查您的數(shù)據(jù)類型。
Lindorm Reader和Lindorm Writer針對(duì)Lindorm類型的轉(zhuǎn)換列表,如下所示。
類型分類 | 數(shù)據(jù)類型 |
整數(shù)類 | INT、LONG、SHORT |
浮點(diǎn)類 | DOUBLE、FLOAT、DOUBLE |
字符串類 | STRING |
日期時(shí)間類 | DATE |
布爾類 | BOOLEAN |
二進(jìn)制類 | BINARYSTRING |
數(shù)據(jù)同步任務(wù)開(kāi)發(fā)
數(shù)據(jù)同步任務(wù)的配置入口和通用配置流程可參見(jiàn)下文的配置指導(dǎo)。
操作流程請(qǐng)參見(jiàn)通過(guò)腳本模式配置離線同步任務(wù)。
腳本模式配置的全量參數(shù)和腳本Demo請(qǐng)參見(jiàn)下文的附錄:腳本Demo與參數(shù)說(shuō)明。
附錄:腳本Demo與參數(shù)說(shuō)明
離線任務(wù)腳本配置方式
如果您配置離線任務(wù)時(shí)使用腳本模式的方式進(jìn)行配置,您需要按照統(tǒng)一的腳本格式要求,在任務(wù)腳本中編寫(xiě)相應(yīng)的參數(shù),詳情請(qǐng)參見(jiàn)通過(guò)腳本模式配置離線同步任務(wù),以下為您介紹腳本模式下數(shù)據(jù)源的參數(shù)配置詳情。
Reader腳本Demo
配置一個(gè)Lindorm Table(對(duì)應(yīng)SDK中的TableService模型)抽取數(shù)據(jù)到本地的作業(yè)。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "caching": 128, "configuration": { //lindorm控制臺(tái)中與連接相關(guān)的配置項(xiàng),以JSON格式填寫(xiě) "lindorm.client.username": "", "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020", "lindorm.client.namespace": "namespace", "lindorm.client.password": "" }, "columns": [ "id", "name", "age", "birthday", "gender" ], "envType": 1, "datasource": "_LINDORM", "namespace": "namespace", "table": "lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "columns": "columns": [ "id", "name", "age", "birthday", "gender" ], "selects": [ "where(compare(\"id\", LESS, 5))", "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))", "where(compare(\"id\", GREATER_OR_EQUAL, 10))" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //設(shè)置傳輸速度,單位為byte/s,DataX運(yùn)行會(huì)盡可能達(dá)到該速度但是不超過(guò)它. "byte": 1048576 } //出錯(cuò)限制 "errorLimit": { //出錯(cuò)的record條數(shù)上限,當(dāng)大于該值即報(bào)錯(cuò)。 "record": 0, //出錯(cuò)的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一個(gè)Lindorm wideColumn(對(duì)應(yīng)SDK中的WideColumnService模型)抽取數(shù)據(jù)到本地的作業(yè)。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "configuration": { //lindorm控制臺(tái)中與連接相關(guān)的配置項(xiàng),以JSON格式填寫(xiě) "lindorm.client.username": "", "lindorm.client.seedserver": "seddserver.et2sqa.tbsite.net:30020", "lindorm.client.namespace": "namespace", "lindorm.client.password": "" }, "columns": [ "STRING|rowkey", "INT|f:a", "DOUBLE|f:b" ], "envType": 1, "datasource": "_LINDORM", "namespace": "namespace", "tableMode": "wideColumn", "table":"yourTableName" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //設(shè)置傳輸速度,單位為byte/s,DataX運(yùn)行會(huì)盡可能達(dá)到該速度但是不超過(guò)它。 "byte": 1048576 } //出錯(cuò)限制 "errorLimit": { //出錯(cuò)的record條數(shù)上限,當(dāng)大于該值即報(bào)錯(cuò)。 "record": 0, //出錯(cuò)的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader腳本參數(shù)
參數(shù) | 描述 | 是否必選 | 默認(rèn)值 |
configuration | 表示每個(gè)lindorm集群提供給DataX客戶端連接的配置信息,可以通過(guò)lindorm集群控制臺(tái)查詢,獲取到配置信息后可以聯(lián)系lindorm數(shù)據(jù)庫(kù)管理員將其轉(zhuǎn)換為如下JSON格式:{"key1":"value1","key2":"value2"}。 例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"} 說(shuō)明 如果是手工編寫(xiě)的JSON代碼,則需要將JSON格式中value值的雙引號(hào)轉(zhuǎn)義為\"。 | 是 | 無(wú) |
mode | 表示數(shù)據(jù)讀取模式,包括固定列模式FixedColumn和動(dòng)態(tài)列模式DynamicColumn。默認(rèn)選擇FixedColumn。 | 是 | FixedColumn |
tableMode | 包括普通表模式table和寬表模式wideColumn。默認(rèn)為table,如果選擇table模式,可不填寫(xiě)。 | 否 | 默認(rèn)不填寫(xiě) |
table | 表示所要讀取的lindorm表名。lindorm表名對(duì)大小寫(xiě)敏感。 | 是 | 無(wú) |
namespace | 表示所要讀取的lindorm表的命名空間。lindorm表的命名空間對(duì)大小寫(xiě)敏感。 | 是 | 無(wú) |
encoding | 編碼方式,取值為UTF-8或GBK。一般用于將二進(jìn)制存儲(chǔ)的lindorm byte[]類型轉(zhuǎn)換為String類型。 | 否 | UTF-8 |
caching | 一次性批量獲取的記錄數(shù)大小,該值可以極大減少數(shù)據(jù)同步系統(tǒng)與Lindorm的網(wǎng)絡(luò)交互次數(shù),并提升整體吞吐量。如果該值設(shè)置過(guò)大,會(huì)導(dǎo)致Lindorm服務(wù)端壓力過(guò)大或者數(shù)據(jù)同步運(yùn)行進(jìn)程O(píng)OM異常。 | 否 | 100 |
selects | 當(dāng)前讀取的Table類型數(shù)據(jù)不支持自動(dòng)切割分片,默認(rèn)單并發(fā)運(yùn)行,因此需要手動(dòng)配置selects參數(shù)進(jìn)行數(shù)據(jù)切片,例如:
| 否 | 無(wú) |
columns | 讀取字段列表。讀取字段列表支持列裁剪和列換序,列裁剪指可以選擇部分列進(jìn)行導(dǎo)出,列換序指可以不按照表schema信息順序進(jìn)行導(dǎo)出。
| 是 | 無(wú) |
Writer腳本Demo
配置一個(gè)數(shù)據(jù)源為MySQL,需要寫(xiě)入數(shù)據(jù)到Lindorm Table(對(duì)應(yīng)SDK中的TableService模型)的作業(yè)。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "checkSlave": true, "datasource": " ", "envType": 1, "column": [ "id", "value", "table" ], "socketTimeout": 3600000, "masterSlave": "slave", "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8", "print": true }, "name": "mysqlReader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "configuration": { "lindorm.client.seedserver": "xxxxxxx:30020", "lindorm.client.username": "xxxxxx", "lindorm.client.namespace": "default", "lindorm.client.password": "xxxxxx" }, "nullMode": "skip", "datasource": "", "writeMode": "api", "envType": 1, "columns": [ "id", "name", "age", "birthday", "gender" ], "dynamicColumn": "false", "table": "lindorm_table", "encoding": "utf8" }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "speed": { //設(shè)置傳輸速度,單位為byte/s,DataX運(yùn)行會(huì)盡可能達(dá)到該速度但是不超過(guò)它。 "byte": 1048576 }, //出錯(cuò)限制 "errorLimit": { //出錯(cuò)的record條數(shù)上限,當(dāng)大于該值即報(bào)錯(cuò)。 "record": 0, //出錯(cuò)的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
配置一個(gè)數(shù)據(jù)源為MySQL,需要寫(xiě)入數(shù)據(jù)到Lindorm wideColumn(對(duì)應(yīng)SDK中的WideColumnService模型)作業(yè)。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "name", "age", "birthday", "gender" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "configuration": { "lindorm.client.seedserver": "xxxxxxx:30020", "lindorm.client.username": "xxxxxx", "lindorm.client.namespace": "default", "lindorm.client.password": "xxxxxx" }, "writeMode": "api", "namespace": "default", "table": "xxxxxx", "encoding": "utf8", "nullMode": "skip", "dynamicColumn": "false", "caching": 128, "columns": [ "ROW|STRING", "cf:id|STRING", "cf:age|INT", "cf:birthday|STRING" ] }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Writer腳本參數(shù)
參數(shù) | 描述 | 是否必選 | 默認(rèn)值 |
configuration | 每個(gè)lindorm集群提供給DataX客戶端連接的配置信息,可以通過(guò)lindorm集群控制臺(tái)查詢,獲取到配置信息后可以聯(lián)系lindorm數(shù)據(jù)庫(kù)管理員將其轉(zhuǎn)換為如下JSON格式:{"key1":"value1","key2":"value2"}。 例如:{"lindorm.zookeeper.quorum":"????","lindorm.zookeeper.property.clientPort":"????"} 說(shuō)明 如果是手寫(xiě)的JSON代碼,則需要將雙引號(hào)轉(zhuǎn)義為\"。 | 是 | 無(wú) |
table | 表示所要寫(xiě)入的lindorm表名。lindorm表名對(duì)大小寫(xiě)敏感。 | 是 | 無(wú) |
namespace | 表示所要寫(xiě)入的lindorm表的命名空間。lindorm表的命名空間對(duì)大小寫(xiě)敏感。 | 是 | 無(wú) |
encoding | 編碼方式,取值為UTF-8或GBK。一般用于將二進(jìn)制存儲(chǔ)的lindorm byte[]類型轉(zhuǎn)換為String類型。 | 否 | UTF-8 |
columns | 寫(xiě)入字段列表。寫(xiě)入字段列表支持列裁剪和列換序,列裁剪指可以選擇部分列進(jìn)行導(dǎo)出,列換序指可以不按照表schema信息順序進(jìn)行導(dǎo)出。
| 是 | 無(wú) |
nullMode | 表示在讀取源頭數(shù)據(jù)的值為null時(shí),Lindorm Writer 中的nullMode參數(shù)可通過(guò)配置不同內(nèi)容,實(shí)現(xiàn)不同的處理方式。
| 否 | EMPTY_BYTES |