本文以Stream同步數據至DataHub的腳本模式為例,為您介紹如何通過數據集成導入離線DataHub數據。
前提條件
準備阿里云賬號,并創建賬號的訪問密鑰。詳情請參見開通DataWorks服務。
開通MaxCompute,自動產生一個默認的MaxCompute數據源,并使用主賬號登錄DataWorks。
創建工作空間,您可以在工作空間中協作完成業務流程,共同維護數據和任務等。詳情請參見創建工作空間。
背景信息
數據集成是阿里云提供的數據同步平臺。該平臺具備可跨異構數據存儲系統、可靠、安全、低成本、可彈性擴展等特點,可以為20多種數據源提供不同網絡環境下的離線數據進出通道。本文以配置DataHub數據源為例,如果您需要使用其它類型的數據源配置同步任務,請參見支持的數據源及同步方案。
操作步驟
進入數據開發頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,在下拉框中選擇對應工作空間后單擊進入數據開發。
展開目標業務流程,右鍵單擊數據集成,選擇 。
在新建節點對話框中,輸入名稱并選擇路徑,單擊確認。
說明節點名稱的長度不能超過128個字符。
此處的路徑為創建的業務流程,具體操作請參見創建周期業務流程。
成功創建離線同步節點后,根據需要進行網絡資源配置,單擊下一步,選擇工具欄中的圖標。
單擊提示對話框中的確認,即可進入腳本模式進行開發。
單擊工具欄中的圖標。
在導入模板對話框中,選擇從來源端的Stream數據源同步至目標端的DataHub數據源的導入模板,單擊確認。
導入模板后,根據自身需求編輯代碼。
{ "type": "job", "version": "1.0", "configuration": { "setting": { "errorLimit": { "record": "0" }, "speed": { "mbps": "1",//單位:MB/s。 "concurrent": 1,//作業并發數。 "throttle": false } }, "reader": { "plugin": "stream", "parameter": { "column": [//源端列名。 { "value": "field",//列屬性。 "type": "string" }, { "value": true, "type": "bool" }, { "value": "byte string", "type": "bytes" } ], "sliceRecordCount": "100000" } }, "writer": { "plugin": "datahub", "parameter": { "datasource": "datahub",//數據源名。 "topic": "xxxx",//Topic是DataHub訂閱和發布的最小單位,您可以用Topic來表示一類或者一種流數據。 "mode": "random",//隨機寫入。 "shardId": "0",//Shard 表示對一個Topic進行數據傳輸的并發通道,每個Shard會有對應的ID。 "maxCommitSize": 524288,//為了提高寫出效率,待攢數據大小達到maxCommitSize大小(單位MB)時,批量提交到目的端。默認是1,048,576,即1MB數據。 "maxRetryCount": 500 } } } }
配置完成后,分別單擊和圖標。
說明DataHub僅支持以腳本模式導入數據。
如果需要選擇新模板,請單擊工具欄中的圖標。一旦導入新模板,原有內容會被覆蓋。
保存同步任務后,直接單擊圖標,任務會立刻運行。
您也可以單擊圖標,提交同步任務至調度系統中。調度系統會按照配置屬性,從第2天開始定時執行。