Java任務
Java調度任務可以在您的應用進程中執行。本文介紹如何管理Java類型的任務。
執行模式
Java任務類型支持單機、廣播、可視化MapReduce、MapReduce和分片運行模式:
單機:在同一個
groupId
下的機器隨機挑一臺執行。廣播:同一個
groupId
下的所有機器同時執行。可視化MapReduce:屬于MapReduce模型任務,需開通專業版。可支持至1000以內子任務量,并且支持基于業務關鍵字的可視化子任務執行詳細記錄查詢,支持基于業務關鍵字的子任務運行日志或堆棧查詢。
MapReduce:常規MapReduce模型任務,支持超大數量的子任務并行處理,僅可查詢子任務運行匯總信息,建議子任務在100萬以下時選擇。
分片運行:包括靜態分片和動態分批,用于處理大數據業務需求。
單機和廣播需要實現JavaProcessor;可視化MapReduce、MapReduce和分片運行需要實現MapJobProcessor。
Processor類路徑,即實現類的全路徑名,例如com.apache.armon.test.schedulerx.processor.MySimpleJob
:
如果不上傳JAR包,SchedulerX會去您的應用進程中的classpath下查找processor實現類,所以每次修改需要重新編譯和發布。
如果上傳了JAR包,每次會熱加載JAR包和processor,不需要重新發布應用。
編程模型
Java任務支持兩種編程模型:JavaProcessor和MapJobProcessor。
JavaProcessor
可選:
public void preProcess(JobContext context) throws Exception
必需:
public ProcessResult process(JobContext context) throws Exception
可選:
public void postProcess(JobContext context)
可選:
public void kill(JobContext context)
MapJobProcessor
必需:
public ProcessResult process(JobContext context) throws Exception
可選:
public void postProcess(JobContext context)
可選:
public void kill(JobContext context)
必需:
public ProcessResult map(List<? extends Object> taskList, String taskName)
ProcessResult
每個process需要返回ProcessResult,用來表示任務執行的狀態、結果和錯誤信息。
任務運行成功:
return new ProcessResult(true)
。任務運行失敗:
return new ProcessResult(false, ErrorMsg)
或者直接拋異常。任務運行成功并且返回結果:
return new ProcessResult(true, result)
。result
是一個字符串,不能大于1000字節。
HelloSchedulerx2.0任務示例
@Component
public class MyProcessor1 extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
//TODO
System.out.println("Hello, schedulerx2.0!");
return new ProcessResult(true);
}
}
支持Kill功能的任務示例
@Component
public class MyProcessor2 extends JavaProcessor {
private volatile boolean stop = false;
@Override
public ProcessResult process(JobContext context) throws Exception {
int N = 10000;
while (!stop && N >= 0) {
//TODO
N--;
}
return new ProcessResult(true);
}
@Override
public void kill(JobContext context) {
stop = true;
}
@Override
public void preProcess(JobContext context) {
stop = false; //如果是通過Spring啟動,Bean是單例,需要通過preProcess把標記為復位
}
}
通過Map模型批量處理任務示例
/**
* 對一張單表進行分布式批量處理
* 1. 根任務先查詢一張表,獲取minId,maxId
* 2. 構造PageTask,通過map進行分發
* 3. 下一級獲取到如果是PageTask,則進行數據處理
*
*/
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
private static final int pageSize = 100;
static class PageTask {
private int startId;
private int endId;
public PageTask(int startId, int endId) {
this.startId = startId;
this.endId = endId;
}
public int getStartId() {
return startId;
}
public int getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
System.out.println("start root task");
Pair<Integer, Integer> idPair = queryMinAndMaxId();
int minId = idPair.getFirst();
int maxId = idPair.getSecond();
List<PageTask> taskList = Lists.newArrayList();
int step = (int) ((maxId - minId) / pageSize); //計算分頁數量
for (int i = minId; i < maxId; i+=step) {
taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(taskList, "Level1Dispatch"); //process調用map方法完成子任務分發
} else if (taskName.equals("Level1Dispatch")) {
PageTask record = (PageTask)task;
long startId = record.getStartId();
long endId = record.getEndId();
//TODO
return new ProcessResult(true);
}
return new ProcessResult(true);
}
@Override
public void postProcess(JobContext context) {
//TODO
System.out.println("all tasks is finished.");
}
private Pair<Integer, Integer> queryMinAndMaxId() {
//TODO select min(id),max(id) from xxx
return null;
}
}