PageRank
PageRank算法是計算網(wǎng)頁排名的經(jīng)典算法。輸入是一個有向圖G,其中頂點(diǎn)表示網(wǎng)頁。如果存在網(wǎng)頁A到網(wǎng)頁B的鏈接,則存在連接A到B的邊。
基本原理
算法的基本原理如下:
初始化:點(diǎn)值表示PageRank的rank值(DOUBLE類型)。初始時,所有點(diǎn)取值為
1/TotalNumVertices
。迭代公式:
PageRank(i)=0.15/TotalNumVertices+0.85*sum
。其中sum為所有指向i點(diǎn)的點(diǎn)(設(shè)為j)PageRank(j)/out_degree(j)
的累加值。
由算法基本原理可以看出,此算法非常適合使用MaxCompute Graph程序進(jìn)行求解。每個點(diǎn)j維護(hù)其PageRank值,每一輪迭代都將PageRank(j)/out_degree(j)
發(fā)給其鄰接點(diǎn)(向其投票)。下一輪迭代時,每個點(diǎn)根據(jù)迭代公式重新計算PageRank取值。
前提條件
您已通過編寫Graph完成測試所需的環(huán)境配置。
測試準(zhǔn)備
本測試是在MaxCompute客戶端,提交作業(yè)到集群進(jìn)行測試。您也可以先進(jìn)行本地測試,詳情請參見本地調(diào)試。
準(zhǔn)備好測試程序的JAR包,假設(shè)名字為graph-examples.jar,本地存放路徑為MaxCompute客戶端bin目錄下data\resources。
準(zhǔn)備好PageRank的測試表和資源。
創(chuàng)建測試表。
CREATE TABLE pagerank_in(vertex STRING, des_1 STRING, des_2 STRING); CREATE TABLE pagerank_out(vertex_id STRING, vertex_value DOUBLE);
添加測試資源。
-- 首次添加忽略-f覆蓋指令。 add jar data\resources\graph-examples.jar -f;
使用Tunnel將MaxCompute客戶端bin目錄下data.txt導(dǎo)入
pagerank_in
表中。tunnel upload data.txt pagerank_in;
data.txt數(shù)據(jù)如下:
1,2,4 2,1,3 4,2,3 3,1,2
測試步驟
在MaxCompute客戶端中執(zhí)行PageRank測試。
jar -resources graph-examples.jar -classpath data\resources\graph-examples.jar
com.aliyun.odps.graph.PageRank pagerank_in pagerank_out
預(yù)期結(jié)果
作業(yè)成功結(jié)束后,輸出表pagerank_out
中的內(nèi)容如下。
+------------+--------------+
| vertex_id | vertex_value |
+------------+--------------+
| 1 | 0.2781238395149928 |
| 2 | 0.3245614688676814 |
| 3 | 0.24161225195637787 |
| 4 | 0.155702636559485 |
+------------+--------------+
代碼示例
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
public class PageRank {
private final static Logger LOG = Logger.getLogger(PageRank.class);
public static class PageRankVertex extends
Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void compute(
ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
Iterable<DoubleWritable> messages) throws IOException {
if (context.getSuperstep() == 0) {
setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
} else if (context.getSuperstep() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
DoubleWritable vertexValue = new DoubleWritable(
(0.15f / context.getTotalNumVertices()) + 0.85f * sum);
setValue(vertexValue);
}
if (hasEdges()) {
context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
.get() / getEdges().size()));
}
}
@Override
public void cleanup(
WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class PageRankVertexReader extends
GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
PageRankVertex vertex = new PageRankVertex();
vertex.setValue(new DoubleWritable(0));
vertex.setId((Text) record.get(0));
System.out.println(record.get(0));
for (int i = 1; i < record.size(); i++) {
Writable edge = record.get(i);
System.out.println(edge.toString());
if (!(edge.equals(NullWritable.get()))) {
vertex.addEdge(new Text(edge.toString()), NullWritable.get());
}
}
LOG.info("vertex edgs size: "
+ (vertex.hasEdges() ? vertex.getEdges().size() : 0));
context.addVertexRequest(vertex);
}
}
private static void printUsage() {
System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
上述代碼,說明如下:
第 23 行:定義PageRankVertex類,其中:
點(diǎn)值表示該點(diǎn)(網(wǎng)頁)的當(dāng)前PageRank取值。
compute()方法使用迭代公式:
PageRank(i)=0.15/TotalNumVertices+0.85*sum
更新點(diǎn)值。cleanup()方法把點(diǎn)及其PageRank取值寫到結(jié)果表中。
第 55 行:定義PageRankVertexReader類,加載圖,將表中每一條記錄解析為一個點(diǎn),記錄的第一列是起點(diǎn),其他列為終點(diǎn)。
第 88 行:主程序(main函數(shù)),定義GraphJob,指定Vertex/GraphLoader等的實(shí)現(xiàn),以及最大迭代次數(shù)(默認(rèn)30),并指定輸入輸出表。