日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

MapReduce模型

更新時間:

MapReduce模型是SchedulerX自主研發(fā)的輕量級分布式跑批模型。通過MapJobProcessor或MapReduceJobProcessor接口將接入的Worker組成分布式計算引擎進行大數(shù)據(jù)跑批。相對于傳統(tǒng)的大數(shù)據(jù)跑批(例如Hadoop、Spark等),MapReduce無需將數(shù)據(jù)導入大數(shù)據(jù)平臺,且無額外存儲及計算成本,即可實現(xiàn)秒級別海量數(shù)據(jù)處理,具有成本低、速度快、編程簡單等特性。

注意事項

  • 單個子任務(wù)的大小不能超過64 KB。

  • ProcessResult的result返回值不能超過1000 Byte。

  • 如果使用reduce,所有子任務(wù)結(jié)果會緩存在Master節(jié)點,該情況對Master節(jié)點內(nèi)存壓力較大,建議子任務(wù)個數(shù)和result返回值不要太大。如果沒有reduce需求,使用MapJobProcessor接口即可。

  • SchedulerX不保證子任務(wù)絕對執(zhí)行一次。在特殊條件下會failover,可能導致子任務(wù)重復(fù)執(zhí)行,需要業(yè)務(wù)方自行實現(xiàn)冪等。

接口

  • 繼承類MapJobProcessor

    接口

    解釋

    是否必選

    public ProcessResult process(JobContext context) throws Exception;

    每個子任務(wù)執(zhí)行業(yè)務(wù)的入口,需從context中獲取taskName,您需自行判斷子任務(wù)名稱。邏輯處理完成后,返回ProcessResult

    public ProcessResult map(List<? extends Object> taskList, String taskName);

    執(zhí)行map方法可以將一批子任務(wù)分布至多臺機器上執(zhí)行,可以多次執(zhí)行map方法。如果taskList為空,返回失敗。執(zhí)行完成后,返回ProcessResult

    public void kill(JobContext context);

    前端kill任務(wù)會觸發(fā)該方法,需自行實現(xiàn)如何中斷業(yè)務(wù)。

  • 繼承類MapReduceJobProcessor

    接口

    解釋

    是否必選

    public ProcessResult process(JobContext context) throws Exception;

    每個子任務(wù)執(zhí)行業(yè)務(wù)的入口,需從context里獲取taskName,您需自行判斷子任務(wù)名稱。邏輯處理完成后,返回ProcessResult

    public ProcessResult map(List<? extends Object> taskList, String taskName);

    執(zhí)行map方法可以將一批子任務(wù)分布至多臺機器上執(zhí)行,可以多次執(zhí)行map方法。如果taskList是空,返回失敗。執(zhí)行完成后,返回ProcessResult

    public ProcessResult reduce(JobContext context);

    所有Worker節(jié)點的子任務(wù)執(zhí)行完成后,會回調(diào)reduce方法。reduce由Master節(jié)點執(zhí)行,一般用于數(shù)據(jù)聚合或通知下游,也可以用于工作流的上下游數(shù)據(jù)傳遞。

    reduce方法能處理所有子任務(wù)的結(jié)果。

    1. 子任務(wù)通過return ProcessResult(true, result)返回結(jié)果(例如返回訂單號)。

    2. 執(zhí)行reduce方法時,通過context獲取所有子任務(wù)的狀態(tài)(context.getTaskStatuses())和結(jié)果(context.getTaskResults()),并進行相應(yīng)的邏輯處理。

    public void kill(JobContext context);

    前端kill任務(wù)會觸發(fā)該方法,需自行實現(xiàn)如何中斷業(yè)務(wù)。

    public boolean runReduceIfFail(JobContext context)

    當存在子任務(wù)失敗情況下,是否執(zhí)行reduce方法。默認配置為:子任務(wù)失敗時,仍然執(zhí)行reduce方法。

操作步驟

  1. 登錄分布式任務(wù)調(diào)度平臺,在左側(cè)導航欄,單擊任務(wù)管理

  2. 任務(wù)管理頁面,單擊創(chuàng)建任務(wù)

  3. 創(chuàng)建任務(wù)面板,執(zhí)行模式下拉列表選擇MapReduce,在高級配置區(qū)域配置相關(guān)信息。

    配置項

    說明

    分發(fā)策略

    說明

    需客戶端版本>=1.10.3。

    • 輪詢策略(默認):每個Worker平均分配等量子任務(wù),適用于每個子任務(wù)處理耗時基本一致的場景。

    • WorkerLoad最優(yōu)策略:由主節(jié)點自動感知Worker節(jié)點的負載情況,適用于子任務(wù)和Worker機器處理耗時有較大差異的場景。

    子任務(wù)單機并發(fā)數(shù)

    即單機執(zhí)行線程數(shù),默認為5。如需加快執(zhí)行速度,可以調(diào)大該值。如果下游或者數(shù)據(jù)庫無法承接,可適當調(diào)小。

    子任務(wù)失敗重試次數(shù)

    子任務(wù)失敗會自動重試,默認為0。

    子任務(wù)失敗重試間隔

    子任務(wù)失敗重試間隔,單位:秒,默認為0。

    子任務(wù)failover策略

    說明

    需客戶端版本>=1.8.12。

    當執(zhí)行節(jié)點宕機下線后,是否將子任務(wù)重新分發(fā)給其他機器執(zhí)行。開啟該配置后,發(fā)生failover時,子任務(wù)可能會重復(fù)執(zhí)行,需自行做好冪等。

    主節(jié)點參與執(zhí)行

    說明

    需客戶端版本>=1.8.12。

    主節(jié)點是否參與子任務(wù)執(zhí)行。在線可運行Worker數(shù)量必須不低于2臺,在子任務(wù)數(shù)量特別大時,推薦關(guān)閉該參數(shù)。

    子任務(wù)分發(fā)方式

    • 推模型:每臺機器平均分配子任務(wù)。

    • 拉模型:每臺機器主動拉取子任務(wù),沒有木桶效應(yīng),支持動態(tài)擴容拉取子任務(wù)。拉取過程中,所有子任務(wù)會緩存在Master節(jié)點,對內(nèi)存有壓力,建議子任務(wù)數(shù)不超過10,000。

    其他配置項,請參見任務(wù)管理高級配置參數(shù)說明

原理&最佳實踐

Schedulerx2.0分布式計算原理&最佳實踐

Demo

處理單表數(shù)據(jù)(單表ID連續(xù))

  1. 主任務(wù)讀取最小ID和最大ID。

    select min(id), max(id) from Tab1;
  2. 根據(jù)ID的range進行分頁,每個task包含兩個字段:startId和endId。

  3. 每個task通過ID的range獲取數(shù)據(jù)。

    select * from Tab1 where id >= startId and id < endId;

以下為示例代碼:

class PageTask {
    private long startId;
    private long endId;

    public PageTask(long startId, long endId) {
        this.startId = startId;
        this.endId = endId;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); //多個job后端代碼可以一致,通過控制臺配置job參數(shù)表示表名
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數(shù)量
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

處理單表數(shù)據(jù)(單表ID不連續(xù))

  1. 數(shù)據(jù)庫采用分桶策略,增加一個bucket字段作為索引。

  2. 例如1024個桶,數(shù)據(jù)庫每添加一行記錄時,將訂單號或者ID進行hash,例如訂單號%1024,落在bucket字段中。

  3. 基本上每個桶是平均的,針對每個桶,都可以通過以下SQL語句全量查詢結(jié)果。

    select * from Tab1 where bucket=xxx;

以下為示例代碼:

@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String tableName = context.getJobParameters(); //多個job后端代碼可以一致,通過控制臺配置job參數(shù)表示表名。
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            List<Integer> tasks = Lists.newArrayList();
            for (int i = 0; i< 1024; i++) {
                tasks.add(i);
            }    
            return map(tasks, "BucketTask");
        } else if (taskName.equals("BucketTask")) {
            int bucketId = (int)task;
            List<Record> records = queryRecord(tableName, bucketId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<Record> queryRecord(String tableName, int bucketId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from #{tableName} where bucket= #{bucketId}
        return records;
    }

}

處理分庫分表數(shù)據(jù)

class PageTask {
    private String tableName;
    private long startId;
    private long endId;

    public PageTask(String tableName, long startId, long endId) {
        this.tableName = tableName;
        this.startId = startId;
        this.endId = endId;
    }

    public String getTableName() {
        return tableName;
    }

    public long getStartId() {
        return startId;
    }

    public long getEndId() {
        return endId;
    }
}
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {

    @Autowired
    private XXXService xxxService;

    private final int PAGE_SIZE = 500;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            //先分庫
            List<String> dbList = getDbList();
            return map(dbList, "DbTask");
        } else if (taskName.equals("DbTask")) {
            //根據(jù)分庫去分表
            String dbName = (String)task;
            List<String> tableList = getTableList(dbName);
            return map(tableList, "TableTask");
        } else if (taskName.equals("TableTask")) {
            //如果一個分表也很大,再分頁
            String tableName = (String)task;
            Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
            long minId = idPair.getFirst();
            long maxId = idPair.getSecond();
            List<PageTask> tasks = Lists.newArrayList();
            int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數(shù)量
            if (step > 0) {
                for (long i = minId; i < maxId; i+=step) {
                    tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
                }
            } else {
                tasks.add(new PageTask(tableName, minId, maxId));
            }
            return map(tasks, "PageTask");
        } else if (taskName.equals("PageTask")) {
            PageTask pageTask = (PageTask)task;
            String tableName = pageTask.getTableName();
            long startId = pageTask.getStartId();
            long endId = pageTask.getEndId();
            List<Record> records = queryRecord(tableName, startId, endId);
            //TODO handle records
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    private List<String> getDbList() {
        List<String> dbList = Lists.newArrayList();
        //TODO 返回分庫列表
        return dbList;
    }

    private List<String> getTableList(String dbName) {
        List<String> tableList = Lists.newArrayList();
        //TODO 返回分表列表
        return tableList;
    }

    private Pair<Long, Long> queryMinAndMaxId(String tableName) {
        //TODO select min(id),max(id) from [tableName]
        return new Pair<Long, Long>(1L, 10000L);
    }

    private List<Record> queryRecord(String tableName, long startId, long endId) {
        List<Record> records = Lists.newArrayList();
        //TODO select * from [tableName] where id>=[startId] and id<[endId]
        return records;
    }

}

處理50條消息,reduce返回結(jié)果

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum=50;
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            System.out.println(task);
            return new ProcessResult(true);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }

}

處理50條消息并且返回子任務(wù)結(jié)果由reduce匯總

@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        String taskName = context.getTaskName();
        int dispatchNum = 50;
        if (context.getJobParameters() != null) {
            dispatchNum = Integer.valueOf(context.getJobParameters());
        }
        if (isRootTask(context)) {
            System.out.println("start root task");
            List<String> msgList = Lists.newArrayList();
            for (int i = 0; i <= dispatchNum; i++) {
                msgList.add("msg_" + i);
            }
            return map(msgList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            String task = (String)context.getTask();
            Thread.sleep(2000);
            return new ProcessResult(true, task);
        }

        return new ProcessResult(false);
    }

    @Override
    public ProcessResult reduce(JobContext context) throws Exception {
        for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
            System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
        }
        return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
    }
}