本文為您介紹較為常用的MapReduce核心接口。
如果您使用Maven,可以從Maven庫中搜索odps-sdk-mapred獲取不同版本的Java SDK,相關配置信息如下。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-mapred</artifactId>
<version>0.40.10-public</version>
</dependency>
數據類型
MapReduce支持的數據類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME和DECIMAL類型。MaxCompute數據類型與Java數據類型的對應關系如下。
MaxCompute SQL Type | Java Type |
BIGINT | LONG |
STRING | STRING |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATETIME | DATE |
DECIMAL | BIGDECIMAL |
MapReduce主要接口
主要接口 | 描述 |
MapperBase | 用戶自定義的Map函數需要繼承自此類。處理輸入表的記錄對象,加工處理成鍵值對集合輸出到Reduce階段,或者不經過Reduce階段直接輸出結果記錄到結果表。不經過Reduce階段而直接輸出計算結果的作業,也可稱之為MapOnly作業。 |
ReducerBase | 用戶自定義的Reduce函數需要繼承自此類。對與一個鍵(Key)關聯的一組數值集(Values)進行歸約計算。 |
TaskContext | 是MapperBase及ReducerBase多個成員函數的輸入參數之一,含有任務運行的上下文信息。 |
JobClient | 用于提交和管理作業,提交方式包括阻塞(同步)方式及非阻塞(異步) 方式。 |
RunningJob | 作業運行時對象,用于跟蹤運行中的MapReduce作業實例。 |
JobConf | 描述一個MapReduce任務的配置,通常在主程序(main函數)中定義JobConf對象,然后通過JobClient提交作業給MaxCompute服務。 |
MapperBase
主要函數接口。
主要接口 | 描述 |
void cleanup(TaskContext context) | 在Map階段結束時,map方法之后調用。 |
void map(long key, Record record, TaskContext context) | map方法,處理輸入表的記錄。 |
void setup(TaskContext context) | 在Map階段開始時,map方法之前調用。 |
ReducerBase
主要函數接口。
主要接口 | 描述 |
void cleanup( TaskContext context) | 在Reduce階段結束時,reduce方法之后調用。 |
void reduce(Record key, Iterator<Record > values, TaskContext context) | reduce方法,處理輸入表的記錄。 |
void setup( TaskContext context) | 在Reduce階段開始時,reduce方法之前調用。 |
TaskContext
主要函數接口。
主要接口 | 描述 |
TableInfo[] getOutputTableInfo() | 獲取輸出的表信息。 |
Record createOutputRecord() | 創建默認輸出表的記錄對象。 |
Record createOutputRecord(String label) | 創建給定label輸出表的記錄對象。 |
Record createMapOutputKeyRecord() | 創建Map輸出Key的記錄對象。 |
Record createMapOutputValueRecord() | 創建Map輸出Value的記錄對象。 |
void write(Record record) | 寫記錄到默認輸出,用于Reduce端寫出數據,可以在Reduce端多次調用。 |
void write(Record record, String label) | 寫記錄到給定label輸出,用于Reduce端寫出數據。可以在 Reduce端多次調用。 |
void write(Record key, Record value) | Map寫記錄到中間結果,可以在Map函數中多次調用。 可以在Map端多次調用。 |
BufferedInputStream readResourceFileAsStream(String resourceName) | 讀取文件類型資源。 |
Iterator<Record > readResourceTable(String resourceName) | 讀取表類型資源。 |
Counter getCounter(Enum<? > name) | 獲取給定名稱的Counter對象。 |
Counter getCounter(String group, String name) | 獲取給定組名和名稱的Counter對象。 |
void progress() | 向MapReduce框架報告心跳信息。 如果用戶方法處理時間很長,且中間沒有調用框架,可以調用這個方法避免task超時,框架默認600秒超時。 |
MaxCompute的TaskContext接口中提供了progress功能,但此功能是為防止Worker長時間運行未結束,被框架誤認為超時而被殺的情況出現。此接口更類似于向框架發送心跳信息,并不是用來匯報Worker進度。
MaxCompute MapReduce默認Worker超時時間為10分鐘(系統默認配置,不受用戶控制),如果超過10分鐘,Worker仍然沒有向框架發送心跳(調用progress接口),框架會強制停止該Worker,MapReduce任務失敗退出。因此,建議您在Mapper/Reducer函數中,定期調用progress接口,防止框架認為Worker超時,誤殺任務。
JobConf
主要函數接口。
主要接口 | 描述 |
void setResources(String resourceNames) | 聲明本作業使用的資源。只有聲明的資源才能在運行Mapper/Reducer時通過TaskContext對象讀取。 |
void setMapOutputKeySchema(Column[] schema) | 設置Mapper輸出到Reducer的Key屬性。 |
void setMapOutputValueSchema(Column[] schema) | 設置Mapper輸出到Reducer的Value屬性。 |
void setOutputKeySortColumns(String[] cols) | 設置Mapper輸出到Reducer的Key排序列。 |
void setOutputGroupingColumns(String[] cols) | 設置Key分組列。 |
void setMapperClass(Class<? extends Mapper > theClass) | 設置作業的Mapper函數。 |
void setPartitionColumns(String[] cols) | 設置作業指定的分區列。默認是Mapper輸出Key的所有列。 |
void setReducerClass(Class<? extends Reducer > theClass) | 設置作業的Reducer。 |
void setCombinerClass(Class<? extends Reducer > theClass) | 設置作業的combiner。在Map端運行,作用類似于單個Map對本地的相同Key值做Reduce。 |
void setSplitSize(long size) | 設置分片大小,單位MB,默認值256。 |
void setNumReduceTasks(int n) | 設置Reducer任務數,默認為Mapper任務數的1/4。 |
void setMemoryForMapTask(int mem) | 設置Mapper任務中單個Worker的內存大小,單位MB, 默認值2048。 |
void setMemoryForReduceTask(int mem) | 設置Reducer任務中單個Worker的內存大小,單位MB, 默認值 2048。 |
通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。
在Map端,Mapper輸出的Record會根據設置的PartitionColumns計算哈希值,決定分配到哪個Reducer,會根據KeySortColumns對Record進行排序。
在Reduce端,輸入Records,再按照KeySortColumns排序后,會根據GroupingColumns指定的列對輸入的Records進行分組,即會順序遍歷輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數調用的輸入。
JobClient
主要函數接口。
主要接口 | 描述 |
static RunningJob runJob(JobConf job) | 阻塞(同步)方式提交MapReduce作業后立即返回。 |
static RunningJob submitJob(JobConf job) | 非阻塞(異步)方式提交MapReduce作業后立即返回。 |
RunningJob
主要函數接口。
主要接口 | 描述 |
String getInstanceID() | 獲取作業運行實例ID,用于查看運行日志和作業管理。 |
boolean isComplete() | 查詢作業是否結束。 |
boolean isSuccessful() | 查詢作業實例是否運行成功。 |
void waitForCompletion() | 等待直至作業實例結束。一般用于異步方式提交的作業。 |
JobStatus getJobStatus() | 查詢作業實例運行狀態。 |
void killJob() | 結束此作業。 |
Counters getCounters() | 獲取Conter信息。 |
InputUtils
主要函數接口。
主要接口 | 描述 |
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務輸入,可以被調用多次 ,新加入的表以append方式添加到輸入隊列中。 |
static void setTables(TableInfo [] tables, JobConf conf) | 添加多張表到任務輸入中。 |
OutputUtils
主要函數接口。
主要接口 | 描述 |
static void addTable(TableInfo table, JobConf conf) | 添加表table到任務輸出,可以被調用多次 ,新加入的表以append方式添加到輸出隊列中。 |
static void setTables(TableInfo[] tables, JobConf conf) | 添加多張表到任務輸出中。 |
Pipeline
Pipeline是MR2的主體類。可以通過Pipeline.builder構建一個Pipeline。Pipeline的主要接口如下。
public Builder addMapper(Class<? extends Mapper> mapper)
public Builder addMapper(Class<? extends Mapper> mapper,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder addReducer(Class<? extends Reducer> reducer)
public Builder addReducer(Class<? extends Reducer> reducer,
Column[] keySchema, Column[] valueSchema, String[] sortCols,
SortOrder[] order, String[] partCols,
Class<? extends Partitioner> theClass, String[] groupCols)
public Builder setOutputKeySchema(Column[] keySchema)
public Builder setOutputValueSchema(Column[] valueSchema)
public Builder setOutputKeySortColumns(String[] sortCols)
public Builder setOutputKeySortOrder(SortOrder[] order)
public Builder setPartitionColumns(String[] partCols)
public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
public Builder setOutputGroupingColumns(String[] cols)
示例如下。
Job job = new Job();
Pipeline pipeline = Pipeline.builder()
.addMapper(TokenizerMapper.class)
.setOutputKeySchema(
new Column[] { new Column("word", OdpsType.STRING) })
.setOutputValueSchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.addReducer(SumReducer.class)
.setOutputKeySchema(
new Column[] { new Column("count", OdpsType.BIGINT) })
.setOutputValueSchema(
new Column[] { new Column("word", OdpsType.STRING),
new Column("count", OdpsType.BIGINT) })
.addReducer(IdentityReducer.class).createPipeline();
job.setPipeline(pipeline);
job.addInput(...)
job.addOutput(...)
job.submit();
如上所示,您可以在main函數中構建一個Map之后,連續接兩個Reduce的MapReduce任務。如果您比較熟悉MapReduce的基礎功能,即可輕松使用MR2。
建議您在使用MR2功能前,先了解MapReduce的基礎用法。
JobConf僅能夠配置Map后接單Reduce的MapReduce任務。