MapReduce模型
MapReduce模型是SchedulerX自主研發(fā)的輕量級分布式跑批模型。通過MapJobProcessor或MapReduceJobProcessor接口將接入的Worker組成分布式計算引擎進行大數(shù)據(jù)跑批。相對于傳統(tǒng)的大數(shù)據(jù)跑批(例如Hadoop、Spark等),MapReduce無需將數(shù)據(jù)導入大數(shù)據(jù)平臺,且無額外存儲及計算成本,即可實現(xiàn)秒級別海量數(shù)據(jù)處理,具有成本低、速度快、編程簡單等特性。
注意事項
單個子任務(wù)的大小不能超過64 KB。
ProcessResult的result返回值不能超過1000 Byte。
如果使用reduce,所有子任務(wù)結(jié)果會緩存在Master節(jié)點,該情況對Master節(jié)點內(nèi)存壓力較大,建議子任務(wù)個數(shù)和result返回值不要太大。如果沒有reduce需求,使用MapJobProcessor接口即可。
SchedulerX不保證子任務(wù)絕對執(zhí)行一次。在特殊條件下會failover,可能導致子任務(wù)重復(fù)執(zhí)行,需要業(yè)務(wù)方自行實現(xiàn)冪等。
接口
繼承類MapJobProcessor
接口
解釋
是否必選
public ProcessResult process(JobContext context) throws Exception;
每個子任務(wù)執(zhí)行業(yè)務(wù)的入口,需從context中獲取taskName,您需自行判斷子任務(wù)名稱。邏輯處理完成后,返回ProcessResult。
是
public ProcessResult map(List<? extends Object> taskList, String taskName);
執(zhí)行map方法可以將一批子任務(wù)分布至多臺機器上執(zhí)行,可以多次執(zhí)行map方法。如果taskList為空,返回失敗。執(zhí)行完成后,返回ProcessResult。
是
public void kill(JobContext context);
前端kill任務(wù)會觸發(fā)該方法,需自行實現(xiàn)如何中斷業(yè)務(wù)。
否
繼承類MapReduceJobProcessor
接口
解釋
是否必選
public ProcessResult process(JobContext context) throws Exception;
每個子任務(wù)執(zhí)行業(yè)務(wù)的入口,需從context里獲取taskName,您需自行判斷子任務(wù)名稱。邏輯處理完成后,返回ProcessResult。
是
public ProcessResult map(List<? extends Object> taskList, String taskName);
執(zhí)行map方法可以將一批子任務(wù)分布至多臺機器上執(zhí)行,可以多次執(zhí)行map方法。如果taskList是空,返回失敗。執(zhí)行完成后,返回ProcessResult。
是
public ProcessResult reduce(JobContext context);
所有Worker節(jié)點的子任務(wù)執(zhí)行完成后,會回調(diào)reduce方法。reduce由Master節(jié)點執(zhí)行,一般用于數(shù)據(jù)聚合或通知下游,也可以用于工作流的上下游數(shù)據(jù)傳遞。
reduce方法能處理所有子任務(wù)的結(jié)果。
子任務(wù)通過
return ProcessResult(true, result)
返回結(jié)果(例如返回訂單號)。執(zhí)行reduce方法時,通過context獲取所有子任務(wù)的狀態(tài)(
context.getTaskStatuses()
)和結(jié)果(context.getTaskResults()
),并進行相應(yīng)的邏輯處理。
是
public void kill(JobContext context);
前端kill任務(wù)會觸發(fā)該方法,需自行實現(xiàn)如何中斷業(yè)務(wù)。
否
public boolean runReduceIfFail(JobContext context)
當存在子任務(wù)失敗情況下,是否執(zhí)行reduce方法。默認配置為:子任務(wù)失敗時,仍然執(zhí)行reduce方法。
否
操作步驟
登錄分布式任務(wù)調(diào)度平臺,在左側(cè)導航欄,單擊任務(wù)管理。
在任務(wù)管理頁面,單擊創(chuàng)建任務(wù)。
在創(chuàng)建任務(wù)面板,執(zhí)行模式下拉列表選擇MapReduce,在高級配置區(qū)域配置相關(guān)信息。
配置項
說明
分發(fā)策略
說明需客戶端版本>=1.10.3。
輪詢策略(默認):每個Worker平均分配等量子任務(wù),適用于每個子任務(wù)處理耗時基本一致的場景。
WorkerLoad最優(yōu)策略:由主節(jié)點自動感知Worker節(jié)點的負載情況,適用于子任務(wù)和Worker機器處理耗時有較大差異的場景。
子任務(wù)單機并發(fā)數(shù)
即單機執(zhí)行線程數(shù),默認為5。如需加快執(zhí)行速度,可以調(diào)大該值。如果下游或者數(shù)據(jù)庫無法承接,可適當調(diào)小。
子任務(wù)失敗重試次數(shù)
子任務(wù)失敗會自動重試,默認為0。
子任務(wù)失敗重試間隔
子任務(wù)失敗重試間隔,單位:秒,默認為0。
子任務(wù)failover策略
說明需客戶端版本>=1.8.12。
當執(zhí)行節(jié)點宕機下線后,是否將子任務(wù)重新分發(fā)給其他機器執(zhí)行。開啟該配置后,發(fā)生failover時,子任務(wù)可能會重復(fù)執(zhí)行,需自行做好冪等。
主節(jié)點參與執(zhí)行
說明需客戶端版本>=1.8.12。
主節(jié)點是否參與子任務(wù)執(zhí)行。在線可運行Worker數(shù)量必須不低于2臺,在子任務(wù)數(shù)量特別大時,推薦關(guān)閉該參數(shù)。
子任務(wù)分發(fā)方式
推模型:每臺機器平均分配子任務(wù)。
拉模型:每臺機器主動拉取子任務(wù),沒有木桶效應(yīng),支持動態(tài)擴容拉取子任務(wù)。拉取過程中,所有子任務(wù)會緩存在Master節(jié)點,對內(nèi)存有壓力,建議子任務(wù)數(shù)不超過10,000。
其他配置項,請參見任務(wù)管理高級配置參數(shù)說明。
原理&最佳實踐
Demo
處理單表數(shù)據(jù)(單表ID連續(xù))
主任務(wù)讀取最小ID和最大ID。
select min(id), max(id) from Tab1;
根據(jù)ID的range進行分頁,每個task包含兩個字段:startId和endId。
每個task通過ID的range獲取數(shù)據(jù)。
select * from Tab1 where id >= startId and id < endId;
以下為示例代碼:
class PageTask {
private long startId;
private long endId;
public PageTask(long startId, long endId) {
this.startId = startId;
this.endId = endId;
}
public long getStartId() {
return startId;
}
public long getEndId() {
return endId;
}
}
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
@Autowired
private XXXService xxxService;
private final int PAGE_SIZE = 500;
@Override
public ProcessResult process(JobContext context) throws Exception {
String tableName = context.getJobParameters(); //多個job后端代碼可以一致,通過控制臺配置job參數(shù)表示表名
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
long minId = idPair.getFirst();
long maxId = idPair.getSecond();
List<PageTask> tasks = Lists.newArrayList();
int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數(shù)量
if (step > 0) {
for (long i = minId; i < maxId; i+=step) {
tasks.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
} else {
tasks.add(new PageTask(minId, maxId));
}
return map(tasks, "PageTask");
} else if (taskName.equals("PageTask")) {
PageTask pageTask = (PageTask)task;
long startId = pageTask.getStartId();
long endId = pageTask.getEndId();
List<Record> records = queryRecord(tableName, startId, endId);
//TODO handle records
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private Pair<Long, Long> queryMinAndMaxId(String tableName) {
//TODO select min(id),max(id) from [tableName]
return new Pair<Long, Long>(1L, 10000L);
}
private List<Record> queryRecord(String tableName, long startId, long endId) {
List<Record> records = Lists.newArrayList();
//TODO select * from [tableName] where id>=[startId] and id<[endId]
return records;
}
}
處理單表數(shù)據(jù)(單表ID不連續(xù))
數(shù)據(jù)庫采用分桶策略,增加一個bucket字段作為索引。
例如1024個桶,數(shù)據(jù)庫每添加一行記錄時,將訂單號或者ID進行hash,例如訂單號%1024,落在bucket字段中。
基本上每個桶是平均的,針對每個桶,都可以通過以下SQL語句全量查詢結(jié)果。
select * from Tab1 where bucket=xxx;
以下為示例代碼:
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
@Autowired
private XXXService xxxService;
private final int PAGE_SIZE = 500;
@Override
public ProcessResult process(JobContext context) throws Exception {
String tableName = context.getJobParameters(); //多個job后端代碼可以一致,通過控制臺配置job參數(shù)表示表名。
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
List<Integer> tasks = Lists.newArrayList();
for (int i = 0; i< 1024; i++) {
tasks.add(i);
}
return map(tasks, "BucketTask");
} else if (taskName.equals("BucketTask")) {
int bucketId = (int)task;
List<Record> records = queryRecord(tableName, bucketId);
//TODO handle records
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private List<Record> queryRecord(String tableName, int bucketId) {
List<Record> records = Lists.newArrayList();
//TODO select * from #{tableName} where bucket= #{bucketId}
return records;
}
}
處理分庫分表數(shù)據(jù)
class PageTask {
private String tableName;
private long startId;
private long endId;
public PageTask(String tableName, long startId, long endId) {
this.tableName = tableName;
this.startId = startId;
this.endId = endId;
}
public String getTableName() {
return tableName;
}
public long getStartId() {
return startId;
}
public long getEndId() {
return endId;
}
}
@Component
public class ScanShardingTableJobProcessor extends MapJobProcessor {
@Autowired
private XXXService xxxService;
private final int PAGE_SIZE = 500;
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
//先分庫
List<String> dbList = getDbList();
return map(dbList, "DbTask");
} else if (taskName.equals("DbTask")) {
//根據(jù)分庫去分表
String dbName = (String)task;
List<String> tableList = getTableList(dbName);
return map(tableList, "TableTask");
} else if (taskName.equals("TableTask")) {
//如果一個分表也很大,再分頁
String tableName = (String)task;
Pair<Long, Long> idPair = queryMinAndMaxId(tableName);
long minId = idPair.getFirst();
long maxId = idPair.getSecond();
List<PageTask> tasks = Lists.newArrayList();
int step = (int) ((maxId - minId) / PAGE_SIZE); //計算分頁數(shù)量
if (step > 0) {
for (long i = minId; i < maxId; i+=step) {
tasks.add(new PageTask(tableName, i, (i+step > maxId ? maxId : i+step)));
}
} else {
tasks.add(new PageTask(tableName, minId, maxId));
}
return map(tasks, "PageTask");
} else if (taskName.equals("PageTask")) {
PageTask pageTask = (PageTask)task;
String tableName = pageTask.getTableName();
long startId = pageTask.getStartId();
long endId = pageTask.getEndId();
List<Record> records = queryRecord(tableName, startId, endId);
//TODO handle records
return new ProcessResult(true);
}
return new ProcessResult(false);
}
private List<String> getDbList() {
List<String> dbList = Lists.newArrayList();
//TODO 返回分庫列表
return dbList;
}
private List<String> getTableList(String dbName) {
List<String> tableList = Lists.newArrayList();
//TODO 返回分表列表
return tableList;
}
private Pair<Long, Long> queryMinAndMaxId(String tableName) {
//TODO select min(id),max(id) from [tableName]
return new Pair<Long, Long>(1L, 10000L);
}
private List<Record> queryRecord(String tableName, long startId, long endId) {
List<Record> records = Lists.newArrayList();
//TODO select * from [tableName] where id>=[startId] and id<[endId]
return records;
}
}
處理50條消息,reduce返回結(jié)果
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
int dispatchNum=50;
if (isRootTask(context)) {
System.out.println("start root task");
List<String> msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
String task = (String)context.getTask();
System.out.println(task);
return new ProcessResult(true);
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
}
}
處理50條消息并且返回子任務(wù)結(jié)果由reduce匯總
@Component
public class TestMapReduceJobProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
String taskName = context.getTaskName();
int dispatchNum = 50;
if (context.getJobParameters() != null) {
dispatchNum = Integer.valueOf(context.getJobParameters());
}
if (isRootTask(context)) {
System.out.println("start root task");
List<String> msgList = Lists.newArrayList();
for (int i = 0; i <= dispatchNum; i++) {
msgList.add("msg_" + i);
}
return map(msgList, "Level1Dispatch");
} else if (taskName.equals("Level1Dispatch")) {
String task = (String)context.getTask();
Thread.sleep(2000);
return new ProcessResult(true, task);
}
return new ProcessResult(false);
}
@Override
public ProcessResult reduce(JobContext context) throws Exception {
for (Entry<Long, String> result : context.getTaskResults().entrySet()) {
System.out.println("taskId:" + result.getKey() + ", result:" + result.getValue());
}
return new ProcessResult(true, "TestMapReduceJobProcessor.reduce");
}
}