MaxCompute不提供Graph開發插件,您可以基于Eclipse開發MaxCompute Graph程序。
- 編寫Graph代碼,使用本地調試進行基本的測試。
- 進行集群調試,驗證結果。
開發示例
本節以SSSP算法為例,為您介紹如何使用Eclipse開發和調試Graph程序。
- 創建Java工程,工程名為graph_examples。
- 將MaxCompute客戶端lib目錄下的Jar包加到Eclipse工程的Java Build Path中。
- 開發MaxCompute Graph程序。
實際開發過程中,通常會先復制一個例子(例如單源最短距離),然后進行修改。在本示例中,僅修改了Package的路徑為package com.aliyun.odps.graph.example。
- 編譯打包。
在Eclipse環境中,右鍵單擊源代碼目錄(圖中的src目錄), 生成Jar包,選擇目標Jar包的保存路徑,例如D:\\odps\\clt\\odps-graph-example-sssp.jar。
- 使用MaxCompute客戶端運行SSSP,詳情請參見運行Graph。
本地調試
MaxCompute Graph支持本地調試模式,您可以使用Eclipse進行斷點調試。
- 下載一個odps-graph-local的Maven包。
- 選擇Eclipse工程,右鍵單擊Graph作業主程序(包含
main
函數)文件,配置其運行參數( )。 - 在Arguments頁簽中,設置Program arguments參數為1 sssp_in sssp_out,作為主程序的輸入參數。
- 在Arguments頁簽中,設置VM arguments參數如下。
-Dodps.runner.mode=local -Dodps.project.name=<project.name> -Dodps.end.point=<end.point> -Dodps.access.id=<access.id> -Dodps.access.key=<access.key>
- 對于本地模式(即不指定odps.end.point參數),需要在warehouse創建sssp_in、sssp_out表,為輸入表sssp_in添加數據,輸入數據如下所示。
1,"2:2,3:1,4:4" 2,"1:2,3:2,4:1" 3,"1:1,2:2,5:1" 4,"1:4,2:1,5:1" 5,"3:1,4:1"
關于warehouse的介紹請參見本地運行。
- 單擊Run,即可本地跑SSSP。
說明 參數設置可參見MaxCompute客戶端中conf/odps_config.ini的設置,上述為幾個常用參數,其他參數說明如下:
- odps.runner.mode:取值為local,本地調試功能必須指定。
- odps.project.name:指定當前Project,必須指定。
- odps.end.point:指定當前MaxCompute服務的地址,可以不指定。如果不指定,只從warehouse讀取表或資源的meta和數據,不存在則拋異常。如果指定,會先從warehouse讀取,不存在時會遠程連接MaxCompute讀取。
- odps.access.id:連接MaxCompute服務的AccessKey ID,只在指定odps.end.point時有效。
- odps.access.key:連接MaxCompute服務的AccessKey Secret,只在指定odps.end.point時有效。
- odps.cache.resources:指定使用的資源列表,效果與Jar命令的
-resources
相同。 - odps.local.warehouse:本地warehouse路徑,不指定時默認為./warehouse。
在Eclipse中本地運行SSSP的調試輸出信息,如下所示。Counters: 3 com.aliyun.odps.graph.local.COUNTER TASK_INPUT_BYTE=211 TASK_INPUT_RECORD=5 TASK_OUTPUT_BYTE=161 TASK_OUTPUT_RECORD=5 graph task finish
本地作業臨時目錄
- counters:存放作業運行的一些計數信息。
- inputs:存放作業的輸入數據,優先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務端讀取(如果設置了odps.end.point),默認一個input只讀10條數據,可以通過
-Dodps.mapred.local.record.limit
參數進行修改,但是也不能超過1萬條記錄。 - outputs:存放作業的輸出數據,如果本地warehouse中存在輸出表,outputs中的結果數據在作業執行完后會覆蓋本地warehouse中對應的表。
- resources:存放作業使用的資源,與輸入類似,優先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務端讀取(如果設置了odps.end.point)。
- job.xml:作業配置。
- superstep:存放每一輪迭代的持久化信息。
集群調試
通過本地的調試后,您即可提交作業到集群進行測試。
- 配置MaxCompute客戶端。
- 使用
add jar /path/work.jar -f;
命令更新Jar包。 - 使用Jar命令運行作業,查看運行日志和結果數據。
性能調優
setSplitSize(long)
:輸入表切分大小,單位MB,大于0,默認64。setNumWorkers(int)
:設置作業Worker數量,范圍為[1,1000],默認值為1。Worker數由作業輸入字節數和splitSize
決定。setWorkerCPU(int)
:Map CPU資源,100為1CPU核,[50,800]之間,默認值為200。setWorkerMemory(int)
:Map內存資源,單位MB,范圍為[256,12288],默認值為4096。setMaxIteration(int)
:設置最大迭代次數,默認為-1,小于或等于0時表示最大迭代次數不作為作業終止條件。setJobPriority(int)
:設置作業優先級,范圍為[0,9],默認值為9,數值越大優先級越小。
- 可以考慮使用
setNumWorkers
方法增加Worker數目。 - 可以考慮使用
setSplitSize
方法減少切分大小,提高作業載入數據速度。 - 加大Worker的CPU或內存。
- 設置最大迭代次數,如果有些應用結果精度要求不高,可以考慮減少迭代次數,盡快結束。
setNumWorkers
與setSplitSize
配合使用,可以提高數據的載入速度。假設setNumWorkers
為workerNum
,setSplitSize
為splitSize
,總輸入字節數為inputSize
,則輸入被切分后的塊數splitNum=inputSize/splitSize
。workerNum
和splitNum
之間的關系如下:
- 情況一:
splitNum
=workerNum
,每個Worker負責載入1個Split。 - 情況二:
splitNum
>workerNum
,每個Worker負責載入1個或多個Split。 - 情況三:
splitNum
<workerNum
,每個Worker負責載入0個或1一個Split。
因此,應調節workerNum
和splitSize
,在滿足前兩種情況時,數據載入比較快。迭代階段只需調節workerNum
。如果設置runtime partitioning
為False,則建議直接使用setSplitSize
控制Worker數量,或者保證滿足前兩種情況。當出現第三種情況時,部分Worker上的切片會為0,可以在Jar命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>;
與setNumWorkers
,和setSplitSize
等效。
- 嘗試Combiner,將這些Key對應點的消息進行本地聚合,減少消息發生。
開發人員可定義Combiner以減少存儲消息的內存和網絡數據流量,縮短作業的執行時間。
- 改進業務邏輯。
數據量大時,讀取磁盤中的數據可能耗費一部分處理時間。減少需要讀取的數據字節數可以提高總體的吞吐量,提高作業性能。您可通過以下方法進行改進:
- 減少輸入數據量:對某些決策性質的應用,處理數據采樣后子集所得到的結果只可能影響結果的精度,而并不會影響整體的準確性,因此可以考慮先對數據進行特定采樣后再導入輸入表中進行處理。
- 避免讀取用不到的字段:MaxCompute Graph框架的TableInfo類支持讀取指定的列(以列名數組方式傳入),而非整個表或表分區,這樣也可以減少輸入的數據量,提高作業性能。
內置Jar包
-libjars
帶上這些Jar包。
- commons-codec-1.3.jar
- commons-io-2.0.1.jar
- commons-lang-2.5.jar
- commons-logging-1.0.4.jar
- commons-logging-api-1.0.4.jar
- guava-14.0.jar
- json.jar
- log4j-1.2.15.jar
- slf4j-api-1.4.3.jar
- slf4j-log4j12-1.4.3.jar
- xmlenc-0.52.jar