日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

原生SDK概述

本文為您介紹較為常用的MapReduce核心接口。

如果您使用Maven,可以從Maven庫中搜索odps-sdk-mapred獲取不同版本的Java SDK,相關配置信息如下。

<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-mapred</artifactId>
    <version>0.40.10-public</version>
</dependency>

數據類型

MapReduce支持的數據類型為BIGINT、STRING、DOUBLE、BOOLEAN、DATETIME和DECIMAL類型。MaxCompute數據類型與Java數據類型的對應關系如下。

MaxCompute SQL Type

Java Type

BIGINT

LONG

STRING

STRING

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATETIME

DATE

DECIMAL

BIGDECIMAL

MapReduce主要接口

主要接口

描述

MapperBase

用戶自定義的Map函數需要繼承自此類。處理輸入表的記錄對象,加工處理成鍵值對集合輸出到Reduce階段,或者不經過Reduce階段直接輸出結果記錄到結果表。不經過Reduce階段而直接輸出計算結果的作業,也可稱之為MapOnly作業。

ReducerBase

用戶自定義的Reduce函數需要繼承自此類。對與一個鍵(Key)關聯的一組數值集(Values)進行歸約計算。

TaskContext

是MapperBase及ReducerBase多個成員函數的輸入參數之一,含有任務運行的上下文信息。

JobClient

用于提交和管理作業,提交方式包括阻塞(同步)方式及非阻塞(異步) 方式。

RunningJob

作業運行時對象,用于跟蹤運行中的MapReduce作業實例。

JobConf

描述一個MapReduce任務的配置,通常在主程序(main函數)中定義JobConf對象,然后通過JobClient提交作業給MaxCompute服務。

MapperBase

主要函數接口。

主要接口

描述

void cleanup(TaskContext context)

在Map階段結束時,map方法之后調用。

void map(long key, Record record, TaskContext context)

map方法,處理輸入表的記錄。

void setup(TaskContext context)

在Map階段開始時,map方法之前調用。

ReducerBase

主要函數接口。

主要接口

描述

void cleanup( TaskContext context)

在Reduce階段結束時,reduce方法之后調用。

void reduce(Record key, Iterator<Record > values, TaskContext context)

reduce方法,處理輸入表的記錄。

void setup( TaskContext context)

在Reduce階段開始時,reduce方法之前調用。

TaskContext

主要函數接口。

主要接口

描述

TableInfo[] getOutputTableInfo()

獲取輸出的表信息。

Record createOutputRecord()

創建默認輸出表的記錄對象。

Record createOutputRecord(String label)

創建給定label輸出表的記錄對象。

Record createMapOutputKeyRecord()

創建Map輸出Key的記錄對象。

Record createMapOutputValueRecord()

創建Map輸出Value的記錄對象。

void write(Record record)

寫記錄到默認輸出,用于Reduce端寫出數據,可以在Reduce端多次調用。

void write(Record record, String label)

寫記錄到給定label輸出,用于Reduce端寫出數據。可以在 Reduce端多次調用。

void write(Record key, Record value)

Map寫記錄到中間結果,可以在Map函數中多次調用。 可以在Map端多次調用。

BufferedInputStream readResourceFileAsStream(String resourceName)

讀取文件類型資源。

Iterator<Record > readResourceTable(String resourceName)

讀取表類型資源。

Counter getCounter(Enum<? > name)

獲取給定名稱的Counter對象。

Counter getCounter(String group, String name)

獲取給定組名和名稱的Counter對象。

void progress()

向MapReduce框架報告心跳信息。 如果用戶方法處理時間很長,且中間沒有調用框架,可以調用這個方法避免task超時,框架默認600秒超時。

  • MaxCompute的TaskContext接口中提供了progress功能,但此功能是為防止Worker長時間運行未結束,被框架誤認為超時而被殺的情況出現。此接口更類似于向框架發送心跳信息,并不是用來匯報Worker進度。

  • MaxCompute MapReduce默認Worker超時時間為10分鐘(系統默認配置,不受用戶控制),如果超過10分鐘,Worker仍然沒有向框架發送心跳(調用progress接口),框架會強制停止該Worker,MapReduce任務失敗退出。因此,建議您在Mapper/Reducer函數中,定期調用progress接口,防止框架認為Worker超時,誤殺任務。

JobConf

主要函數接口。

主要接口

描述

void setResources(String resourceNames)

聲明本作業使用的資源。只有聲明的資源才能在運行Mapper/Reducer時通過TaskContext對象讀取。

void setMapOutputKeySchema(Column[] schema)

設置Mapper輸出到Reducer的Key屬性。

void setMapOutputValueSchema(Column[] schema)

設置Mapper輸出到Reducer的Value屬性。

void setOutputKeySortColumns(String[] cols)

設置Mapper輸出到Reducer的Key排序列。

void setOutputGroupingColumns(String[] cols)

設置Key分組列。

void setMapperClass(Class<? extends Mapper > theClass)

設置作業的Mapper函數。

void setPartitionColumns(String[] cols)

設置作業指定的分區列。默認是Mapper輸出Key的所有列。

void setReducerClass(Class<? extends Reducer > theClass)

設置作業的Reducer。

void setCombinerClass(Class<? extends Reducer > theClass)

設置作業的combiner。在Map端運行,作用類似于單個Map對本地的相同Key值做Reduce。

void setSplitSize(long size)

設置分片大小,單位MB,默認值256。

void setNumReduceTasks(int n)

設置Reducer任務數,默認為Mapper任務數的1/4。

void setMemoryForMapTask(int mem)

設置Mapper任務中單個Worker的內存大小,單位MB, 默認值2048。

void setMemoryForReduceTask(int mem)

設置Reducer任務中單個Worker的內存大小,單位MB, 默認值 2048。

  • 通常情況下,GroupingColumns包含在KeySortColumns中,KeySortColumns和PartitionColumns要包含在Key中。

  • 在Map端,Mapper輸出的Record會根據設置的PartitionColumns計算哈希值,決定分配到哪個Reducer,會根據KeySortColumns對Record進行排序。

  • 在Reduce端,輸入Records,再按照KeySortColumns排序后,會根據GroupingColumns指定的列對輸入的Records進行分組,即會順序遍歷輸入的Records,把GroupingColumns所指定列相同的Records作為一次reduce函數調用的輸入。

JobClient

主要函數接口。

主要接口

描述

static RunningJob runJob(JobConf job)

阻塞(同步)方式提交MapReduce作業后立即返回。

static RunningJob submitJob(JobConf job)

非阻塞(異步)方式提交MapReduce作業后立即返回。

RunningJob

主要函數接口。

主要接口

描述

String getInstanceID()

獲取作業運行實例ID,用于查看運行日志和作業管理。

boolean isComplete()

查詢作業是否結束。

boolean isSuccessful()

查詢作業實例是否運行成功。

void waitForCompletion()

等待直至作業實例結束。一般用于異步方式提交的作業。

JobStatus getJobStatus()

查詢作業實例運行狀態。

void killJob()

結束此作業。

Counters getCounters()

獲取Conter信息。

InputUtils

主要函數接口。

主要接口

描述

static void addTable(TableInfo table, JobConf conf)

添加表table到任務輸入,可以被調用多次 ,新加入的表以append方式添加到輸入隊列中。

static void setTables(TableInfo [] tables, JobConf conf)

添加多張表到任務輸入中。

OutputUtils

主要函數接口。

主要接口

描述

static void addTable(TableInfo table, JobConf conf)

添加表table到任務輸出,可以被調用多次 ,新加入的表以append方式添加到輸出隊列中。

static void setTables(TableInfo[] tables, JobConf conf)

添加多張表到任務輸出中。

Pipeline

Pipeline是MR2的主體類。可以通過Pipeline.builder構建一個Pipeline。Pipeline的主要接口如下。

    public Builder addMapper(Class<? extends Mapper> mapper)
    public Builder addMapper(Class<? extends Mapper> mapper,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder addReducer(Class<? extends Reducer> reducer)
    public Builder addReducer(Class<? extends Reducer> reducer,
           Column[] keySchema, Column[] valueSchema, String[] sortCols,
           SortOrder[] order, String[] partCols,
           Class<? extends Partitioner> theClass, String[] groupCols)
    public Builder setOutputKeySchema(Column[] keySchema)
    public Builder setOutputValueSchema(Column[] valueSchema)
    public Builder setOutputKeySortColumns(String[] sortCols)
    public Builder setOutputKeySortOrder(SortOrder[] order)
    public Builder setPartitionColumns(String[] partCols)
    public Builder setPartitionerClass(Class<? extends Partitioner> theClass)
    public Builder setOutputGroupingColumns(String[] cols)

示例如下。

    Job job = new Job();
    Pipeline pipeline = Pipeline.builder()
     .addMapper(TokenizerMapper.class)
     .setOutputKeySchema(
         new Column[] { new Column("word", OdpsType.STRING) })
     .setOutputValueSchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .addReducer(SumReducer.class)
     .setOutputKeySchema(
         new Column[] { new Column("count", OdpsType.BIGINT) })
     .setOutputValueSchema(
         new Column[] { new Column("word", OdpsType.STRING),
         new Column("count", OdpsType.BIGINT) })
     .addReducer(IdentityReducer.class).createPipeline();
    job.setPipeline(pipeline);  
    job.addInput(...)
    job.addOutput(...)
    job.submit();

如上所示,您可以在main函數中構建一個Map之后,連續接兩個Reduce的MapReduce任務。如果您比較熟悉MapReduce的基礎功能,即可輕松使用MR2。

說明
  • 建議您在使用MR2功能前,先了解MapReduce的基礎用法。

  • JobConf僅能夠配置Map后接單Reduce的MapReduce任務。