MaxCompute數據源作為數據中樞,為您提供讀取和寫入數據至MaxCompute的雙向通道。
使用限制
DataWorks的MaxCompute數據源可使用Tunnel Endpoint地址訪問相應MaxCompute項目的Tunnel服務,從而通過上傳、下載等方式同步該項目的數據,使用Tunnel服務時的上傳與下載會涉及DownloadTable操作。
2023年12月11日之后創建的MaxCompute數據源,若數據源所在的DataWorks服務與需要訪問的MaxCompute項目不在同一地域,則無法直接通過Tunnel Endpoint地址同步MaxCompute項目的數據。該場景下,您需先購買云企業網,連通DataWorks服務與MaxCompute項目的網絡,網絡連通后才可跨地域執行數據同步操作。云企業網的介紹及相關操作,請參見云企業網。
離線讀
MaxCompute Reader支持讀取分區表、非分區表,不支持讀取虛擬視圖、不支持同步外部表。
離線讀MaxCompute分區表時,不支持直接對分區字段進行字段映射配置,需要在配置數據來源時指定待同步數據的分區信息。
例如,分區表t0的字段包含id、name兩個字段,一級分區為pt,二級分區為ds。讀取t0的pt=1,ds=hangzhou分區數據時,您需要在配置數據來源時指定分區值為pt=1,ds=hangzhou,后續字段映射配置時進行id、name字段的映射配置。
MaxCompute Reader支持使用WHERE進行數據過濾。
離線寫
當數據有null值時,MaxCompute Writer不支持VARCHAR類型。
如果寫入的表為
DeltaTable
,請將同步完成才可見設為是,否則在并發大于1的場景下,任務將會報錯。
實時寫
實時數據同步任務支持使用Serverless資源組(推薦)和獨享數據集成資源組。
實時同步節點目前僅支持同步PolarDB、Oracle、MySQL數據源至MaxCompute。
實時數據同步任務暫不支持同步沒有主鍵的表。
當實時同步至MaxCompute默認數據源(一般為
odps_first
)時,默認使用臨時AK進行同步,臨時AK超過7天會自動過期,同時,將導致任務運行失敗。平臺檢測到臨時AK導致任務失敗時會自動重啟任務,如果任務配置了該類型的監控報警,您將會收到報警信息。一鍵實時同步至MaxCompute任務配置當天僅能查詢歷史全量數據,增量數據需要等待第二天merge完成后才可在MaxCompute查詢。
一鍵實時同步至MaxCompute任務每天會生成一個全量分區,為避免數據過多占用存儲資源,本方案任務自動建立的MaxCompute表,默認生命周期為30天。如果時長不滿足您的業務需求,可以在配置同步任務時單擊對應的MaxCompute表名修改生命周期。
數據集成使用MaxCompute引擎同步數據通道進行數據上傳和下載(同步數據通道SLA詳情請參見數據傳輸服務(上傳)場景與工具),請根據MaxCompute引擎同步數據通道SLA評估數據同步業務技術選型。
一鍵實時同步至MaxCompute,按實例模式同步時,獨享數據集成資源組規格最低需要為8C16G。
僅支持與當前工作空間同地域的自建MaxCompute數據源,跨地域的MaxCompute項目在測試數據源服務連通性時可以正常連通,但同步任務執行時,在MaxCompute建表階段會報引擎不存在的錯誤。
說明使用自建MaxCompute數據源時,DataWorks項目仍然需要綁定MaxCompute引擎,否則將無法創建MaxCompute SQL節點,導致全量同步標done節點創建失敗。
注意事項
如果目標表列沒有配置和源端的列映射,則同步任務執行完成后,此列值為空,即使此目標列建表時指定了默認值,列值仍為空。
支持的字段類型
支持MaxCompute的1.0數據類型、2.0數據類型、Hive兼容數據類型。不同數據類型版本支持的字段類型詳情如下。
1.0數據類型支持的字段
字段類型 | 離線讀 | 離線寫 | 實時寫 |
BIGINT | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
DECIMAL | 支持 | 支持 | 支持 |
STRING | 支持 | 支持 | 支持 |
DATETIME | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
ARRAY | 支持 | 支持 | 支持 |
MAP | 支持 | 支持 | 支持 |
STRUCT | 支持 | 支持 | 支持 |
2.0數據類型、Hive兼容數據類型支持的字段
字段類型 | 離線讀(MaxCompute Reader) | 離線寫(MaxCompute Writer) | 實時寫 |
TINYINT | 支持 | 支持 | 支持 |
SMALLINT | 支持 | 支持 | 支持 |
INT | 支持 | 支持 | 支持 |
BIGINT | 支持 | 支持 | 支持 |
BINARY | 支持 | 支持 | 支持 |
FLOAT | 支持 | 支持 | 支持 |
DOUBLE | 支持 | 支持 | 支持 |
DECIMAL(pecision,scale) | 支持 | 支持 | 支持 |
VARCHAR(n) | 支持 | 支持 | 支持 |
CHAR(n) | 不支持 | 支持 | 支持 |
STRING | 支持 | 支持 | 支持 |
DATE | 支持 | 支持 | 支持 |
DATETIME | 支持 | 支持 | 支持 |
TIMESTAMP | 支持 | 支持 | 支持 |
BOOLEAN | 支持 | 支持 | 支持 |
ARRAY | 支持 | 支持 | 支持 |
MAP | 支持 | 支持 | 支持 |
STRUCT | 支持 | 支持 | 支持 |
數據類型轉換說明
MaxCompute Reader針對MaxCompute的類型轉換列表,如下所示。
類型分類 | 數據集成配置類型 | 數據庫數據類型 |
整數類 | LONG | BIGINT、INT、TINYINT和SMALLINT |
布爾類 | BOOLEAN | BOOLEAN |
日期時間類 | DATE | DATETIME、TIMESTAMP和DATE |
浮點類 | DOUBLE | FLOAT、DOUBLE和DECIMAL |
二進制類 | BYTES | BINARY |
復雜類 | STRING | ARRAY、MAP和STRUCT |
如果數據轉換失敗,或數據寫出至目的端數據源失敗,則將數據作為臟數據,您可以配合臟數據限制閾值使用。
數據同步前準備
讀取或寫入MaxCompute表數據時,您可以根據需要選擇是否開啟相關屬性。
連接MaxCompute并開啟項目級配置
登錄MaxCompute客戶端,詳情請參見使用本地客戶端(odpscmd)連接。
開啟MaxCompute項目級相關配置:請確認是否已擁有對應的操作權限,您可使用Project Owner賬號執行相關操作,關于MaxCompute權限說明,詳情請參見角色規劃。
開啟acid屬性
您可以使用Project Owner賬號在客戶端執行以下命令開啟acid屬性,關于MaxCompute ACID語義說明,詳情請參見ACID語義。
setproject odps.sql.acid.table.enable=true;
(可選)開啟2.0數據類型
如果需要使用MaxCompute數據2.0類型中的timestamp類型,您需要使用Project Owner賬號在客戶端執行以下命令開啟數據2.0。
setproject odps.sql.type.system.odps2=true;
(可選)創建賬號
工作空間綁定MaxCompute引擎時,默認將在DataWorks生成一個MaxCompute數據源,在當前工作空間可使用該默認引擎數據源進行數據同步,若您需要在其他空間同步當前工作空間的MaxCompute數據源,您需要創建Accesskey ID和Accesskey Secret,以便在其他工作空間創建數據源并使用該數據源時,可基于您的身份訪問該引擎數據。
創建個人Accesskey ID和Accesskey Secret,操作詳情請參見準備阿里云賬號。
創建MaxCompute數據源,詳情請參見配置MaxCompute數據源。
創建MaxCompute數據源
進行數據同步任務開發前,您需要在DataWorks上將MaxCompute項目創建為MaxCompute數據源。創建MaxCompute數據源的詳情操作,請參見創建MaxCompute數據源。
標準模式的工作空間支持數據源隔離功能,您可以分別添加并隔離開發環境和生產環境的數據源,以保護您的數據安全。詳情請參見數據源開發和生產環境隔離。
若工作空間中名為odps_first的MaxCompute數據源非人為在數據源界面創建,則該數據源為數據源改版前,您當前工作空間綁定的第一個MaxCompute引擎默認創建的數據源。進行數據同步時,如選擇此數據源,則表示您要將數據讀取或寫入至該MaxCompute引擎項目中。
您可在數據源配置頁面,查看數據源使用的MaxCompute項目名稱,確認數據最終讀取或寫入至哪一個MaxCompute項目。詳情請參見管理數據源。
數據同步任務開發
數據同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線同步任務配置指導
操作流程請參見通過向導模式配置離線同步任務、通過腳本模式配置離線同步任務。
腳本模式配置的全量參數和腳本Demo請參見下文的附錄:腳本Demo與參數說明。
單表實時同步任務配置指導
操作流程請參見配置單表增量數據實時同步、DataStudio側實時同步任務配置。
整庫離線、整庫(實時)全增量、整庫(實時)分庫分表等整庫級別同步配置指導
操作流程請參見數據集成側同步任務配置。
常見問題
更多其他數據集成常見問題請參見數據集成常見問題。
附錄:腳本Demo與參數說明
離線任務腳本配置方式
如果您配置離線任務時使用腳本模式的方式進行配置,您需要按照統一的腳本格式要求,在任務腳本中編寫相應的參數,詳情請參見通過腳本模式配置離線同步任務,以下為您介紹腳本模式下數據源的參數配置詳情。
Reader腳本Demo
實際運行時,請刪除下述代碼中的注釋。
{
"type":"job",
"version":"2.0",
"steps":[
{
"stepType":"odps",//插件名。
"parameter":{
"partition":[],//讀取數據所在的分區。
"isCompress":false,//是否壓縮。
"datasource":"",//數據源。
"column":[//源頭表的列信息。
"id"
],
"where": "",//使用WHERE數據過濾時,填寫具體WHERE子句內容。
"enableWhere":false,//是否使用WHERE進行數據過濾。
"table":""http://表名。
},
"name":"Reader",
"category":"reader"
},
{
"stepType":"stream",
"parameter":{
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
如果您需要指定MaxCompute的Tunnel Endpoint,可以通過腳本模式手動配置數據源。將上述示例中的"datasource":"",
替換為數據源的具體參數,示例如下。
"accessId":"*******************",
"accessKey":"*******************",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"*****",
Reader腳本參數
參數 | 描述 | 是否必選 | 默認值 |
datasource | 數據源名稱。腳本模式支持添加數據源,該配置項填寫的內容必須與添加的數據源名稱保持一致。 | 是 | 無 |
table | 讀取數據表的表名稱(大小寫不敏感)。 | 是 | 無 |
partition | 讀取的數據所在的分區信息。
例如,分區表test包含pt=1,ds=hangzhou、pt=1,ds=shanghai、pt=2,ds=hangzhou、pt=2,ds=beijing四個分區,則讀取不同分區數據的配置如下:
此外,您還可以根據實際需求設置分區數據的獲取條件:
說明
| 如果表為分區表,則必填。如果表為非分區表,則不能填寫。 | 無 |
column | 讀取MaxCompute源頭表的列信息。例如表test的字段為id、name和age:
| 是 | 無 |
Writer腳本Demo
腳本配置樣例如下。
{
"type":"job",
"version":"2.0",//版本號。
"steps":[
{
"stepType":"stream",
"parameter":{},
"name":"Reader",
"category":"reader"
},
{
"stepType":"odps",//插件名。
"parameter":{
"partition":"",//分區信息。
"truncate":true,//清理規則。
"compress":false,//是否壓縮。
"datasource":"odps_first",//數據源名。
"column": [//源端列名。
"id",
"name",
"age",
"sex",
"salary",
"interest"
],
"table":""http://表名。
},
"name":"Writer",
"category":"writer"
}
],
"setting":{
"errorLimit":{
"record":"0"http://錯誤記錄數,表示臟數據的最大容忍條數。
},
"speed":{
"throttle":true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent":1, //作業并發數。
"mbps":"12"http://限流,此處1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
如果您需要指定MaxCompute的Tunnel Endpoint,可以通過腳本模式手動配置數據源:將上述示例中的"datasource":"",
替換為數據源的具體參數,示例如下。
"accessId":"<yourAccessKeyId>",
"accessKey":"<yourAccessKeySecret>",
"endpoint":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"odpsServer":"http://service.eu-central-1.maxcompute.aliyun-inc.com/api",
"tunnelServer":"http://dt.eu-central-1.maxcompute.aliyun.com",
"project":"**********",
Writer腳本參數
參數 | 描述 | 是否必選 | 默認值 |
datasource | 數據源名稱,腳本模式支持添加數據源,該配置項填寫的內容必須與添加的數據源名稱保持一致。 | 是 | 無 |
table | 寫入的數據表的表名稱(大小寫不敏感),不支持填寫多張表。 | 是 | 無 |
partition | 需要寫入數據表的分區信息,必須指定到最后一級分區。例如把數據寫入一個三級分區表,必須配置到最后一級分區,例如
| 如果表為分區表,則必填。如果表為非分區表,則不能填寫。 | 無 |
column | 需要導入的字段列表。當導入全部字段時,可以配置為
| 是 | 無 |
truncate | 通過配置 因為利用MaxCompute SQL進行數據清理工作,SQL無法保證原子性,所以truncate選項不是原子操作。當多個任務同時向一個Table或Partition清理分區時,可能出現并發時序問題,請務必注意。 針對該類問題,建議您盡量不要多個作業DDL同時操作同一個分區,或者在多個并發作業啟動前,提前創建分區。 | 是 | 無 |