OSS數據源為您提供讀取和寫入OSS的雙向通道,本文為您介紹DataWorks的OSS數據同步的能力支持情況。
支持的字段類型與使用限制
離線讀
OSS Reader實現了從OSS讀取數據并轉為數據集成協議的功能,OSS本身是無結構化數據存儲。對于數據集成而言,OSS Reader支持的功能如下。
支持 | 不支持 |
|
|
準備OSS數據時,如果數據為CSV文件,則必須為標準格式的CSV文件。例如,如果列內容在半角引號(")內,需要替換成兩個半角引號(""),否則會造成文件被錯誤分割。
OSS屬于非結構化數據源,里面存放的都是文件類型數據,因此在使用同步時,需要先自行確認同步的字段結構是否符合預期。同理,非結構化數據源中數據結構發生變化時必須要在任務配置中重新確認字段結構,否則可能會造成同步數據錯亂。
離線寫
OSS Writer實現了從數據同步協議轉為OSS中的文本文件功能,OSS本身是無結構化數據存儲,目前OSS Writer支持的功能如下。
支持 | 不支持 |
|
|
類型分類 | 數據集成column配置類型 |
整數類 | LONG |
字符串類 | STRING |
浮點類 | DOUBLE |
布爾類 | BOOLEAN |
日期時間類 | DATE |
實時寫
支持實時寫入的能力。
支持實時寫入Hudi格式版本:0.12.x。
創建數據源
在進行數據同步任務開發時,您需要在DataWorks上創建一個對應的數據源,操作流程請參見創建并管理數據源,詳細的配置參數解釋可在配置界面查看對應參數的文案提示。
跨賬號創建OSS數據源時需對相應的賬號進行授權,詳情可參見:基于Bucket Policy實現跨賬號訪問OSS。
如您在配置OSS數據源時使用RAM角色授權模式配置數據源,詳情可參見通過RAM角色授權模式配置數據源。
數據同步任務開發
數據同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過向導模式配置離線同步任務、通過腳本模式配置離線同步任務。
腳本模式配置的全量參數和腳本Demo請參見下文的附錄:腳本Demo與參數說明。
單表實時同步任務配置指導
操作流程請參見配置單表增量數據實時同步、DataStudio側實時同步任務配置。
整庫(實時)全增量同步配置指導
操作流程請參見數據集成側同步任務配置。
常見問題
附錄:腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
Reader腳本Demo:通用示例
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"oss",//插件名。
"parameter":{
"nullFormat":"",//定義可以表示為null的字符串。
"compress":"",//文本壓縮類型。
"datasource":"",//數據源。
"column":[//字段。
{
"index":0,//列序號。
"type":"string"http://數據類型。
},
{
"index":1,
"type":"long"
},
{
"index":2,
"type":"double"
},
{
"index":3,
"type":"boolean"
},
{
"format":"yyyy-MM-dd HH:mm:ss", //時間格式。
"index":4,
"type":"date"
}
],
"skipHeader":"",//類CSV格式文件可能存在表頭為標題情況,需要跳過。
"encoding":"",//編碼格式。
"fieldDelimiter":",",//字段分隔符。
"fileFormat": "",//文本類型。
"object":[]//object前綴。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":""http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1 //作業并發數。
"mbps":"12",//限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader腳本Demo:ORC或Parquet文件讀取OSS
目前通過復用HDFS Reader的方式完成OSS讀取ORC或Parquet格式的文件,在OSS Reader已有參數的基礎上,增加了Path、FileFormat等擴展配置參數,參數含義請參見HDFS Reader。
以ORC文件格式讀取OSS,示例如下。
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61/orc__691b6815_9260_4037_9899_****", "column": [ { "index": 0, "type": "long" }, { "index": "1", "type": "string" }, { "index": "2", "type": "string" } ] } }
以Parquet文件格式讀取OSS,示例如下。
{ "type":"job", "version":"2.0", "steps":[ { "stepType":"oss", "parameter":{ "nullFormat":"", "compress":"", "fileFormat":"parquet", "path":"/*", "parquetSchema":"message m { optional BINARY registration_dttm (UTF8); optional Int64 id; optional BINARY first_name (UTF8); optional BINARY last_name (UTF8); optional BINARY email (UTF8); optional BINARY gender (UTF8); optional BINARY ip_address (UTF8); optional BINARY cc (UTF8); optional BINARY country (UTF8); optional BINARY birthdate (UTF8); optional DOUBLE salary; optional BINARY title (UTF8); optional BINARY comments (UTF8); }", "column":[ { "index":"0", "type":"string" }, { "index":"1", "type":"long" }, { "index":"2", "type":"string" }, { "index":"3", "type":"string" }, { "index":"4", "type":"string" }, { "index":"5", "type":"string" }, { "index":"6", "type":"string" }, { "index":"7", "type":"string" }, { "index":"8", "type":"string" }, { "index":"9", "type":"string" }, { "index":"10", "type":"double" }, { "index":"11", "type":"string" }, { "index":"12", "type":"string" } ], "skipHeader":"false", "encoding":"UTF-8", "fieldDelimiter":",", "fieldDelimiterOrigin":",", "datasource":"wpw_demotest_oss", "envType":0, "object":[ "wpw_demo/userdata1.parquet" ] }, "name":"Reader", "category":"reader" }, { "stepType":"odps", "parameter":{ "partition":"dt=${bizdate}", "truncate":true, "datasource":"0_odps_wpw_demotest", "envType":0, "column":[ "id" ], "emptyAsNull":false, "table":"wpw_0827" }, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"" }, "locale":"zh_CN", "speed":{ "throttle":false, "concurrent":2 } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }
Reader腳本參數
參數 | 描述 | 是否必選 | 默認值 |
datasource | 數據源名稱,腳本模式支持添加數據源,此配置項填寫的內容必須要與添加的數據源名稱保持一致。 | 是 | 無 |
Object | OSS的Object信息,此處可以支持填寫多個Object。例如xxx的bucket中有yunshi文件夾,文件夾中有ll.txt文件,則Object直接填yunshi/ll.txt。 支持使用調度參數并配合調度,靈活生成Object文件名稱與路徑。
說明
| 是 | 無 |
parquetSchema | 以Parquet文件格式讀取OSS時配置,當且僅當fileFormat為parquet時生效,具體表示parquet存儲的類型說明。您需要確保填寫parquetSchema后,整體配置符合JSON語法。
parquetSchema的配置格式說明如下:
配置示例如下所示。
| 否 | 無 |
column | 讀取字段列表,type指定源數據的類型,index指定當前列來自于文本第幾列(以0開始),value指定當前類型為常量,不是從源頭文件讀取數據,而是根據value值自動生成對應的列。 默認情況下,您可以全部按照String類型讀取數據,配置如下。
您可以指定column字段信息,配置如下。
說明 對于您指定的column信息,type必須填寫,index/value必須選擇其一。 | 是 | 全部按照STRING類型讀取。 |
fileFormat | 文本類型。源頭OSS的文件類型。例如csv、text,兩種格式均支持自定義分隔符。 | 是 | csv |
fieldDelimiter | 讀取的字段分隔符。 說明 OSS Reader在讀取數據時,需要指定字段分割符,如果不指定默認為(,),界面配置中也會默認填寫為(,)。 如果分隔符不可見,請填寫Unicode編碼。例如,\u001b、\u007c。 | 是 | , |
lineDelimiter | 讀取的行分隔符。 說明 當fileFormat取值為text時,本參數有效。 | 否 | 無 |
compress | 文本壓縮類型,默認不填寫(即不壓縮)。支持壓縮類型為gzip、bzip2和zip。 | 否 | 不壓縮 |
encoding | 讀取文件的編碼配置。 | 否 | utf-8 |
nullFormat | 文本文件中無法使用標準字符串定義null(空指針),數據同步提供nullFormat定義哪些字符串可以表示為null。 例如:
| 否 | 無 |
skipHeader | 類CSV格式文件可能存在表頭為標題情況,需要跳過。默認不跳過,壓縮文件模式下不支持skipHeader。 | 否 | false |
csvReaderConfig | 讀取CSV類型文件參數配置,Map類型。讀取CSV類型文件使用的CsvReader進行讀取,會有很多配置,不配置則使用默認值。 | 否 | 無 |
Writer腳本Demo:通用示例
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"oss",//插件名。
"parameter":{
"nullFormat":"",//數據同步系統提供nullFormat,定義哪些字符串可以表示為null。
"dateFormat":"",//日期格式。
"datasource":"",//數據源。
"writeMode":"",//寫入模式。
"writeSingleObject":"false", //表示是否將同步數據寫入單個oss文件。
"encoding":"",//編碼格式。
"fieldDelimiter":","http://字段分隔符。
"fileFormat":"",//文本類型。
"object":""http://Object前綴。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Writer腳本Demo:ORC或Parquet文件寫入OSS腳本配置demo
目前通過復用HDFS Writer的方式完成OSS寫ORC或Parquet格式的文件。在OSS Writer已有參數的基礎上,增加了Path、FileFormat等擴展配置參數,參數含義請參見HDFS Writer。
ORC或Parquet文件寫入OSS的示例如下:
以下僅為示例,請根據您自己具體的列名稱和類型修改對應的參數,請勿直接復制使用。
以ORC文件格式寫入OSS
寫ORC文件,當前僅支持腳本模式,您需要轉腳本模式配置,其中fileFormat需要配置為
orc
,path需要配置為寫入文件的路徑,column配置格式為{"name":"your column name","type": "your column type"}
。當前支持寫入的ORC類型如下:
字段類型
離線寫OSS(ORC格式)
TINYINT
支持
SMALLINT
支持
INT
支持
BIGINT
支持
FLOAT
支持
DOUBLE
支持
TIMESTAMP
支持
DATE
支持
VARCHAR
支持
STRING
支持
CHAR
支持
BOOLEAN
支持
DECIMAL
支持
BINARY
支持
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "orc", "path": "/tests/case61", "fileName": "orc", "writeMode": "append", "column": [ { "name": "col1", "type": "BIGINT" }, { "name": "col2", "type": "DOUBLE" }, { "name": "col3", "type": "STRING" } ], "writeMode": "append", "fieldDelimiter": "\t", "compress": "NONE", "encoding": "UTF-8" } }
以Parquet文件格式寫入OSS
{ "stepType": "oss", "parameter": { "datasource": "", "fileFormat": "parquet", "path": "/tests/case61", "fileName": "test", "writeMode": "append", "fieldDelimiter": "\t", "compress": "SNAPPY", "encoding": "UTF-8", "parquetSchema": "message test { required int64 int64_col;\n required binary str_col (UTF8);\nrequired group params (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired binary value (UTF8);\n}\n}\nrequired group params_arr (LIST) {\nrepeated group list {\nrequired binary element (UTF8);\n}\n}\nrequired group params_struct {\nrequired int64 id;\n required binary name (UTF8);\n }\nrequired group params_arr_complex (LIST) {\nrepeated group list {\nrequired group element {\n required int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_complex (MAP) {\nrepeated group key_value {\nrequired binary key (UTF8);\nrequired group value {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}\nrequired group params_struct_complex {\nrequired int64 id;\n required group detail {\nrequired int64 id;\n required binary name (UTF8);\n}\n}\n}", "dataxParquetMode": "fields" } }
Writer腳本參數
參數 | 描述 | 是否必選 | 默認值 |
datasource | 數據源名稱,腳本模式支持添加數據源,該配置項填寫的內容必須與添加的數據源名稱保持一致。 | 是 | 無 |
object | OSS Writer寫入的文件名,OSS使用文件名模擬目錄的實現。OSS對于Object的名稱有以下限制:
如果您不需要后綴隨機UUID,建議您配置 | 是 | 無 |
writeMode | OSS Writer寫入前,數據的處理:
| 是 | 無 |
writeSingleObject | OSS寫數據時,是否寫單個文件:
說明 當寫入ORC、parquet類型數據時,writeSingleObject參數不生效,即使用該參數無法在多并發場景下,寫入單個ORC或parquet文件。若要寫入單個文件,您可以將并發設置為1,但文件名會添加隨機后綴,并且設置并發為1時,將影響同步任務的速度。 | 否 | false |
fileFormat | 文件寫出的格式,支持以下幾種格式:
| 否 | text |
compress | 寫入OSS的數據文件的壓縮格式(需使用腳本模式任務配置)。 說明 csv、text文本類型不支持壓縮,parquet/orc文件支持gzip、snappy等壓縮。 | 否 | 無 |
fieldDelimiter | 寫入的字段分隔符。 | 否 | , |
encoding | 寫出文件的編碼配置。 | 否 | utf-8 |
parquetSchema | 以Parquet文件格式寫入OSS的必填項,用來描述目標文件的結構,所以此項當且僅當fileFormat為parquet時生效,格式如下。
配置項說明如下:
說明 每行列設置必須以分號結尾,最后一行也要寫上分號。 示例如下。
| 否 | 無 |
nullFormat | 文本文件中無法使用標準字符串定義null(空指針),數據同步系統提供nullFormat定義可以表示為null的字符串。例如,您配置 | 否 | 無 |
header | OSS寫出時的表頭,例如, | 否 | 無 |
maxFileSize(高級配置,向導模式不支持) | OSS寫出時單個Object文件的最大值,默認為10,000*10MB,類似于在打印log4j日志時,控制日志文件的大小。OSS分塊上傳時,每個分塊大小為10MB(也是日志輪轉文件最小粒度,即小于10MB的maxFileSize會被作為10MB),每個OSS InitiateMultipartUploadRequest支持的分塊最大數量為10,000。 輪轉發生時,Object名字規則是在原有Object前綴加UUID隨機數的基礎上,拼接_1,_2,_3等后綴。 說明 默認單位為MB。 配置示例:"maxFileSize":300, 表示設置單個文件大小為300M。 | 否 | 100,000 |
suffix(高級配置,向導模式不支持) | 數據同步寫出時,生成的文件名后綴。例如,配置suffix為.csv,則最終寫出的文件名為fileName****.csv。 | 否 | 無 |
附錄:parquet類型數據的轉化策略
如果您沒有配置parquetSchema,那么DataWorks側會根據源端字段類型,按照一定的策略進行相應數據類型轉換,轉換策略如下。
轉換后的數據類型 | Parquet type | Parquet logical type |
CHAR / VARCHAR / STRING | BINARY | UTF8 |
BOOLEAN | BOOLEAN | 不涉及 |
BINARY / VARBINARY | BINARY | 不涉及 |
DECIMAL | FIXED_LEN_BYTE_ARRAY | DECIMAL |
TINYINT | INT32 | INT_8 |
SMALLINT | INT32 | INT_16 |
INT/INTEGER | INT32 | 不涉及 |
BIGINT | INT64 | 不涉及 |
FLOAT | FLOAT | 不涉及 |
DOUBLE | DOUBLE | 不涉及 |
DATE | INT32 | DATE |
TIME | INT32 | TIME_MILLIS |
TIMESTAMP/DATETIME | INT96 | 不涉及 |