本文為您介紹如何運行MaxCompute Graph作業。
運行作業
MaxCompute客戶端提供一個JAR命令用于運行MaxCompute Graph作業,其使用方式與MapReduce中的JAR命令相同。
使用語法如下。
Usage: jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS]
-conf <configuration_file> Specify an application configuration file
-classpath <local_file_list> classpaths used to run mainClass
-D <name>=<value> Property value pair, which will be used to run mainClass
-local Run job in local mode
-resources <resource_name_list> file/table resources used in graph, seperate by comma
其中<GENERIC_OPTIONS>包括以下參數(均為可選):
-conf <configuration file>:指定JobConf配置文件。
-classpath <local_file_list>:本地執行時的classpath,主要用于指定main函數所在的JAR包。
通常,用戶更習慣于將main函數與Graph作業編寫在一個包中,例如單源最短距離。因此,在執行示例程序時,-resources及-classpath的參數中都出現了JAR包,但二者意義不同:
-resources引用的是Graph作業,運行于分布式環境中。
-classpath引用的是main函數,運行于本地,指定的JAR包路徑也是本地文件路徑。包名之間使用系統默認的文件分割符進行分割(通常,Windows系統是分號,Linux系統是冒號)。
-D <prop_name>=<prop_value>:本地執行時,<mainClass>的Java屬性可以定義多個。
-local:以本地模式執行Graph作業,主要用于程序調試。
-resources <resource_name_list>:Graph作業運行時使用的資源聲明。通常,resource_name_list中需要指定運行Graph作業所使用的資源名稱。如果您在Graph作業中讀取了其他MaxCompute資源,則這些資源名稱也需要被添加到<resource_name_list>中。資源之間使用逗號分隔。如果使用跨項目空間資源時,需要在前面加上PROJECT_NAME/resources/。示例:
-resources otherproject/resources/resfile
。
同時,您也可以直接運行Graph作業的main函數,直接將作業提交到MaxCompute,而不是通過MaxCompute客戶端提交作業。以PageRank算法為例,如下所示。
public static void main(String[] args) throws Exception {
if (args.length < 2)
printUsage();
Account account = new AliyunAccount(accessId, accessKey);
Odps odps = new Odps(account);
odps.setEndpoint(endPoint);
odps.setDefaultProject(project);
SessionState ss = SessionState.get();
ss.setOdps(odps);
ss.setLocalRun(false);
String resource = "mapreduce-examples.jar";
GraphJob job = new GraphJob();
// 將使用的JAR及其他文件添加到class cache resource,對應于JAR命令中-libjars中指定的資源。
job.addCacheResourcesToClassPath(resource);
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0
+ " seconds");
}
輸入及輸出
MaxCompute Graph作業的輸入輸出限制為表,不允許您自定義輸入輸出格式。
作業輸入定義如下。
GraphJob job = new GraphJob(); job.addInput(TableInfo.builder().tableName(“tblname”).build()); //表作為輸入。 job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build()); //分區作為輸入。 //只讀取輸入表的col2和col0列,在GraphLoader的load方法中,record.get(0)得到的是col2列,順序一致。 job.addInput(TableInfo.builder().tableName(“tblname”).partSpec("pt1=a/pt2=b").build(), new String[]{"col2", "col0"});
說明支持多路輸入。
addInput框架讀取輸入表的記錄傳給用戶自定義的GraphLoader,載入圖數據。
暫時不支持分區過濾條件,更多應用限制請參見 應用限制。
作業輸出定義如下所示。
GraphJob job = new GraphJob(); //輸出表為分區表時需要給到最末一級分區 job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build()); //下面的參數中,true表示覆蓋tableinfo指定的分區,即INSERT OVERWRITE語義,false表示INSERT INTO語義。 job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").lable("output1").build(), true);
說明支持多路輸出,通過Label標識每路輸出。
Graph作業在運行時可以通過WorkerContext的write方法寫出記錄到輸出表,多路輸出需要指定標識。
讀取資源
通過GraphJob讀取資源。
除了通過JAR命令指定Graph讀取的資源外,還可以通過GraphJob的以下兩種方法指定。
void addCacheResources(String resourceNames) void addCacheResourcesToClassPath(String resourceNames)
通過WorkerContext讀取資源。
您可以通過WorkerContext的相應上下文對象讀取資源。
public byte[] readCacheFile(String resourceName) throws IOException; public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException; public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath)throws IOException; public Iterable<WritableRecord> readResourceTable(String resourceName); public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException; public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException; public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;
說明Graph也支持在WorkerComputer的setup方法中讀取資源,保存在Worker Value中,之后通過getWorkerValue方法獲取。
如果您需要在讀取資源的同時并行進行資源處理,可以使用上述流接口。該讀取方式可以減少內存的消耗。
更多應用限制請參見應用限制。