日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Java任務

更新時間:

Java調度任務可以在您的應用進程中執行。本文介紹如何管理Java類型的任務。

執行模式

Java任務類型支持單機、廣播、可視化MapReduce、MapReduce和分片運行模式:

  • 單機:在同一個groupId下的機器隨機挑一臺執行。

  • 廣播:同一個groupId下的所有機器同時執行。

  • 可視化MapReduce:屬于MapReduce模型任務,需開通專業版。可支持至1000以內子任務量,并且支持基于業務關鍵字的可視化子任務執行詳細記錄查詢,支持基于業務關鍵字的子任務運行日志或堆棧查詢。

  • MapReduce:常規MapReduce模型任務,支持超大數量的子任務并行處理,僅可查詢子任務運行匯總信息,建議子任務在100萬以下時選擇。

  • 分片運行:包括靜態分片和動態分批,用于處理大數據業務需求。

單機廣播需要實現JavaProcessor;可視化MapReduce、MapReduce分片運行需要實現MapJobProcessor。

Processor類路徑,即實現類的全路徑名,例如com.apache.armon.test.schedulerx.processor.MySimpleJob

  • 如果不上傳JAR包,SchedulerX會去您的應用進程中的classpath下查找processor實現類,所以每次修改需要重新編譯和發布。

  • 如果上傳了JAR包,每次會熱加載JAR包和processor,不需要重新發布應用。

編程模型

Java任務支持兩種編程模型:JavaProcessor和MapJobProcessor。

  • JavaProcessor

    • 可選:public void preProcess(JobContext context) throws Exception

    • 必需:public ProcessResult process(JobContext context) throws Exception

    • 可選:public void postProcess(JobContext context)

    • 可選:public void kill(JobContext context)

  • MapJobProcessor

    • 必需:public ProcessResult process(JobContext context) throws Exception

    • 可選:public void postProcess(JobContext context)

    • 可選:public void kill(JobContext context)

    • 必需:public ProcessResult map(List<? extends Object> taskList, String taskName)

ProcessResult

每個process需要返回ProcessResult,用來表示任務執行的狀態、結果和錯誤信息。

  • 任務運行成功:return new ProcessResult(true)

  • 任務運行失敗:return new ProcessResult(false, ErrorMsg)或者直接拋異常。

  • 任務運行成功并且返回結果:return new ProcessResult(true, result)。result是一個字符串,不能大于1000字節。

HelloSchedulerx2.0任務示例

@Component
public class MyProcessor1 extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        //TODO
        System.out.println("Hello, schedulerx2.0!");
        return new ProcessResult(true);
    }
}            

支持Kill功能的任務示例

@Component
public class MyProcessor2 extends JavaProcessor {
    private volatile boolean stop = false;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int N = 10000;
        while (!stop && N >= 0) {
            //TODO
            N--;
        }
        return new ProcessResult(true);
    }

    @Override
    public void kill(JobContext context) {
        stop = true;
    }

    @Override
    public void preProcess(JobContext context) {
        stop = false;  //如果是通過Spring啟動,Bean是單例,需要通過preProcess把標記為復位
    }
}          

通過Map模型批量處理任務示例

/**
 * 對一張單表進行分布式批量處理
 * 1. 根任務先查詢一張表,獲取minId,maxId
 * 2. 構造PageTask,通過map進行分發
 * 3. 下一級獲取到如果是PageTask,則進行數據處理
 *
 */
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //計算分頁數量
            for (int i = minId; i < maxId; i+=step) {
                taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch"); //process調用map方法完成子任務分發
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO
            return new ProcessResult(true);
        }

        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("all tasks is finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        return null;
    }

}