MaxCompute Graph是一套面向迭代的圖計算處理框架。圖計算作業使用圖進行建模,圖由點(Vertex)和邊(Edge)組成,點和邊包含權值(Value)。
MaxCompute Graph支持以下圖編輯操作:
- 修改點或邊的權值。
- 增加、刪除點。
- 增加、刪除邊。
說明 編輯點和邊時,需要您同時在代碼中維護點與邊的關系。
通過迭代對圖進行編輯、演化,最終求解出結果。典型應用有PageRank、單源最短距離算法、K-均值聚類算法 等。您可以使用MaxCompute Graph提供的接口Java SDK編寫圖計算程序。
基本概念
- 圖(Graph):是用于表示對象之間關聯關系的一種抽象數據結構。使用頂點(Vertex)和邊(Edge)進行描述,頂點表示對象,邊表示對象之間的關系。可以抽象為用圖描述的數據即為圖數據。
- 點(Vertex):在圖模型中用于表示對象。
- 邊(Edge):在圖模型中用于表示對象之間的關系。由源ID、目標ID和與該邊緣關聯的數據組成的單個定向邊緣。
- 有向圖:即邊有方向性的圖模型,一條邊的兩個頂點一般為不同的角色,例如頁面A連接向頁面B。有向圖中的邊分為出邊和入邊。
- 無向圖:即邊無方向性的圖模型,例如用戶組中的普通用戶。
- 出邊:指從當前頂點指向其它頂點的邊。
- 入邊:其它頂點指向當前頂點的邊。
- 度:度表示一個頂點的所有邊的數量。
- 出度:是一個頂點出邊的數量。
- 入度:是一個頂點入邊的數量。
- 超步(SuperStep):圖進行迭代計算時,一次迭代稱為一個超步。
Graph數據結構
MaxCompute Graph能夠處理的圖必須是一個由點(Vertex)和邊(Edge)組成的有向圖。由于MaxCompute僅提供二維表的存儲結構,因此需要您自行將圖數據分解為二維表存儲在MaxCompute中。您可以根據自身的業務場景進行分解。
在進行圖計算分析時,使用自定義的GraphLoader將二維表數據轉換為MaxCompute Graph引擎中的點和邊。
- 點的結構為<ID, Value, Halted, Edges>,參數分別表示:
- 點標識符(ID)
- 權值(Value)
- 狀態(Halted, 表示是否要停止迭代。)
- 出邊集合(Edges,以該點為起始點的所有邊列表。)
- 邊的結構為<DestVertexID,Value>,參數分別為:
- 目標點(DestVertexID)
- 權值(Value)
例如,上圖可以表述為以下二維表格式。
Vertex | <ID, Value, Halted, Edges> |
---|---|
v0 | <0, 0, false, [<1, 5 >, <2, 10 >]> |
v1 | <1, 5, false, [<2, 3>, <3, 2>, <5, 9>]> |
v2 | <2, 8, false, [<1, 2>, <5, 1 >]> |
v3 | <3, Long.MAX_VALUE, false, [<0, 7>, <5, 6>]> |
v5 | <5, Long.MAX_VALUE, false, [<3, 4 >]> |
Graph程序邏輯
Graph程序主要包含圖加載、迭代計算、迭代終止等處理步驟。
- 圖加載
圖加載包含兩個步驟:
- 圖加載:框架調用您自定義的GraphLoader,將輸入表的記錄解析為點或邊。
- 分布式化:框架調用您自定義的Partitioner對點進行分片(默認的分片邏輯是,根據點ID的哈希值對Worker個數取模分片),分配到相應的Worker。
- 迭代計算
一次迭代為一個超步(SuperStep),遍歷所有非結束狀態(Halted值為False)的點或者收到消息的點(處于結束狀態的點收到信息會被自動喚醒),并調用其
compute(ComputeContext context, Iterable messages)
方法。在您實現的compute(ComputeContext context, Iterable messages)
方法中:- 處理上一個超步發給當前點的消息(Messages)。
- 根據需要對圖進行編輯:
- 修改點、邊的取值。
- 發送消息給某些點。
- 增加、刪除點或邊。
- 通過Aggregator匯總信息到全局信息,詳情請參見Aggregator機制。
- 設置當前點狀態,結束或非結束狀態。
- 迭代進行過程中,框架會將消息以異步的方式發送到對應Worker,并在下一個超步進行處理,無需人工干預。
- 迭代終止
滿足以下任意一條,迭代即終止:
- 所有點處于結束狀態(Halted值為True)且沒有新消息產生。
- 達到最大迭代次數。
- 某個Aggregator的
terminate
方法返回True。
Graph程序的偽代碼描述如下所示。// 1. load for each record in input_table { GraphLoader.load(); } // 2. setup WorkerComputer.setup(); for each aggr in aggregators { aggr.createStartupValue(); } for each v in vertices { v.setup(); } // 3. superstep for (step = 0; step < max; step ++) { for each aggr in aggregators { aggr.createInitialValue(); } for each v in vertices { v.compute(); } } // 4. cleanup for each v in vertices { v.cleanup(); } WorkerComputer.cleanup();