MaxCompute不提供Graph開發插件,您可以基于Eclipse開發MaxCompute Graph程序。

開發流程如下:
  1. 編寫Graph代碼,使用本地調試進行基本的測試。
  2. 進行集群調試,驗證結果。

開發示例

本節以SSSP算法為例,為您介紹如何使用Eclipse開發和調試Graph程序。

操作步驟:
  1. 創建Java工程,工程名為graph_examples
  2. 將MaxCompute客戶端lib目錄下的Jar包加到Eclipse工程的Java Build Path中。
  3. 開發MaxCompute Graph程序。

    實際開發過程中,通常會先復制一個例子(例如單源最短距離),然后進行修改。在本示例中,僅修改了Package的路徑為package com.aliyun.odps.graph.example

  4. 編譯打包。

    在Eclipse環境中,右鍵單擊源代碼目錄(圖中的src目錄),Export > Java > JAR file生成Jar包,選擇目標Jar包的保存路徑,例如D:\\odps\\clt\\odps-graph-example-sssp.jar

  5. 使用MaxCompute客戶端運行SSSP,詳情請參見運行Graph

本地調試

MaxCompute Graph支持本地調試模式,您可以使用Eclipse進行斷點調試。

操作步驟:
  1. 下載一個odps-graph-local的Maven包。
  2. 選擇Eclipse工程,右鍵單擊Graph作業主程序(包含main函數)文件,配置其運行參數(Run As > Run Configurations)。
  3. Arguments頁簽中,設置Program arguments參數為1 sssp_in sssp_out,作為主程序的輸入參數。
  4. Arguments頁簽中,設置VM arguments參數如下。
    -Dodps.runner.mode=local
    -Dodps.project.name=<project.name>
    -Dodps.end.point=<end.point>
    -Dodps.access.id=<access.id> 
    -Dodps.access.key=<access.key>
    參數配置
  5. 對于本地模式(即不指定odps.end.point參數),需要在warehouse創建sssp_insssp_out表,為輸入表sssp_in添加數據,輸入數據如下所示。
    1,"2:2,3:1,4:4"
    2,"1:2,3:2,4:1"
    3,"1:1,2:2,5:1"
    4,"1:4,2:1,5:1"
    5,"3:1,4:1"

    關于warehouse的介紹請參見本地運行

  6. 單擊Run,即可本地跑SSSP。
    說明 參數設置可參見MaxCompute客戶端中conf/odps_config.ini的設置,上述為幾個常用參數,其他參數說明如下:
    • odps.runner.mode:取值為local,本地調試功能必須指定。
    • odps.project.name:指定當前Project,必須指定。
    • odps.end.point:指定當前MaxCompute服務的地址,可以不指定。如果不指定,只從warehouse讀取表或資源的meta和數據,不存在則拋異常。如果指定,會先從warehouse讀取,不存在時會遠程連接MaxCompute讀取。
    • odps.access.id:連接MaxCompute服務的AccessKey ID,只在指定odps.end.point時有效。
    • odps.access.key:連接MaxCompute服務的AccessKey Secret,只在指定odps.end.point時有效。
    • odps.cache.resources:指定使用的資源列表,效果與Jar命令的-resources相同。
    • odps.local.warehouse:本地warehouse路徑,不指定時默認為./warehouse
    在Eclipse中本地運行SSSP的調試輸出信息,如下所示。
    Counters: 3
             com.aliyun.odps.graph.local.COUNTER
                     TASK_INPUT_BYTE=211
                     TASK_INPUT_RECORD=5
                     TASK_OUTPUT_BYTE=161
                     TASK_OUTPUT_RECORD=5
     graph task finish
    說明 上述示例中,需要本地warehouse下有sssp_insssp_out表。sssp_insssp_out的詳細信息請參見編寫Graph(可選)

本地作業臨時目錄

每運行一次本地調試,都會在Eclipse工程目錄下新建一個臨時目錄。
本地運行的Graph作業,臨時目錄包括以下幾個目錄和文件:
  • counters:存放作業運行的一些計數信息。
  • inputs:存放作業的輸入數據,優先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務端讀取(如果設置了odps.end.point),默認一個input只讀10條數據,可以通過-Dodps.mapred.local.record.limit參數進行修改,但是也不能超過1萬條記錄。
  • outputs:存放作業的輸出數據,如果本地warehouse中存在輸出表,outputs中的結果數據在作業執行完后會覆蓋本地warehouse中對應的表。
  • resources:存放作業使用的資源,與輸入類似,優先取自本地的warehouse,如果本地沒有,會通過MaxCompute SDK從服務端讀取(如果設置了odps.end.point)。
  • job.xml:作業配置。
  • superstep:存放每一輪迭代的持久化信息。
說明 如果需要本地調試時輸出詳細日志,需要在src目錄下放一個log4j的配置文件log4j.properties_odps_graph_cluster_debug

集群調試

通過本地的調試后,您即可提交作業到集群進行測試。

操作步驟如下:
  1. 配置MaxCompute客戶端。
  2. 使用add jar /path/work.jar -f;命令更新Jar包。
  3. 使用Jar命令運行作業,查看運行日志和結果數據。
說明 集群運行Graph的詳細介紹請參見編寫Graph(可選)

性能調優

對性能有所影響的Graph Job配置項如下:
  • setSplitSize(long):輸入表切分大小,單位MB,大于0,默認64。
  • setNumWorkers(int):設置作業Worker數量,范圍為[1,1000],默認值為1。Worker數由作業輸入字節數和splitSize決定。
  • setWorkerCPU(int):Map CPU資源,100為1CPU核,[50,800]之間,默認值為200。
  • setWorkerMemory(int):Map內存資源,單位MB,范圍為[256,12288],默認值為4096。
  • setMaxIteration(int):設置最大迭代次數,默認為-1,小于或等于0時表示最大迭代次數不作為作業終止條件。
  • setJobPriority(int):設置作業優先級,范圍為[0,9],默認值為9,數值越大優先級越小。
通常情況下的調優建議:
  • 可以考慮使用setNumWorkers方法增加Worker數目。
  • 可以考慮使用setSplitSize方法減少切分大小,提高作業載入數據速度。
  • 加大Worker的CPU或內存。
  • 設置最大迭代次數,如果有些應用結果精度要求不高,可以考慮減少迭代次數,盡快結束。
接口setNumWorkerssetSplitSize配合使用,可以提高數據的載入速度。假設setNumWorkersworkerNumsetSplitSizesplitSize,總輸入字節數為inputSize,則輸入被切分后的塊數splitNum=inputSize/splitSizeworkerNumsplitNum之間的關系如下:
  • 情況一:splitNum=workerNum,每個Worker負責載入1個Split。
  • 情況二:splitNum>workerNum,每個Worker負責載入1個或多個Split。
  • 情況三:splitNum<workerNum,每個Worker負責載入0個或1一個Split。

因此,應調節workerNumsplitSize,在滿足前兩種情況時,數據載入比較快。迭代階段只需調節workerNum。如果設置runtime partitioning為False,則建議直接使用setSplitSize控制Worker數量,或者保證滿足前兩種情況。當出現第三種情況時,部分Worker上的切片會為0,可以在Jar命令前使用set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; setNumWorkers,和setSplitSize等效。

另外一種常見的性能問題為數據傾斜,反應到Counters就是某些Worker處理的點或邊數量遠超過其他Worker。數據傾斜的原因通常是某些Key對應的點、邊,或者消息的數量遠超過其他Key,這些Key被分到少量的Worker處理,導致這些Worker運行時間較長。解決方法:
  • 嘗試Combiner,將這些Key對應點的消息進行本地聚合,減少消息發生。

    開發人員可定義Combiner以減少存儲消息的內存和網絡數據流量,縮短作業的執行時間。

  • 改進業務邏輯。
    數據量大時,讀取磁盤中的數據可能耗費一部分處理時間。減少需要讀取的數據字節數可以提高總體的吞吐量,提高作業性能。您可通過以下方法進行改進:
    • 減少輸入數據量:對某些決策性質的應用,處理數據采樣后子集所得到的結果只可能影響結果的精度,而并不會影響整體的準確性,因此可以考慮先對數據進行特定采樣后再導入輸入表中進行處理。
    • 避免讀取用不到的字段:MaxCompute Graph框架的TableInfo類支持讀取指定的列(以列名數組方式傳入),而非整個表或表分區,這樣也可以減少輸入的數據量,提高作業性能。

內置Jar包

以下Jar包會默認加載到運行Graph程序的JVM中,您不必上傳這些資源,也不必在命令行的-libjars帶上這些Jar包。
  • commons-codec-1.3.jar
  • commons-io-2.0.1.jar
  • commons-lang-2.5.jar
  • commons-logging-1.0.4.jar
  • commons-logging-api-1.0.4.jar
  • guava-14.0.jar
  • json.jar
  • log4j-1.2.15.jar
  • slf4j-api-1.4.3.jar
  • slf4j-log4j12-1.4.3.jar
  • xmlenc-0.52.jar
說明 在JVM的Classpath中,上述內置Jar包會位于您的Jar包之前,可能產生版本沖突。例如您的程序中使用了commons-codec-1.5.jar某個類的函數,但是這個函數不在commons-codec-1.3.jar中,這時如果1.3版本無法實現您的需求,則只能等待MaxCompute升級新版本。