可視化MapReduce模型
可視化MapReduce模型在MapReduce模型的基礎上,新增了可視化可運維的能力。您無需修改后端代碼,只需在SchedulerX控制臺將分布式模型改為可視化MapReduce,即可新增一個子任務列表頁面,并且可以查看每個子任務的詳情、結果和日志,同時支持每個子任務級別的重跑。
注意事項
低版本SDK有安全漏洞,請升級到1.12.2以上版本。
僅專業版支持。
子任務個數不能超過1000個。
單個子任務的大小不能超過64 KB。
子任務顯示自定義標簽信息時,子任務對象需要實現指定接口。
ProcessResult的result返回值不能超過1000 Byte。
如果使用reduce,所有子任務結果會緩存在Master節點,該情況對Master節點內存壓力較大,建議子任務個數和result返回值不要太大。如果沒有reduce需求,使用MapJobProcessor接口即可。
SchedulerX不保證子任務絕對執行一次。在特殊條件下會failover,可能導致子任務重復執行,需要業務方自行實現冪等。
接口
繼承MapReduce模型所有接口,任務處理代碼開發模型與MapReduce模型完全一致。具體信息,請參見MapReduce模型。
(可選)在MapReduce模型接口基礎上,支持設置每個子任務的標簽展示(子任務對象需要實現com.alibaba.schedulerx.worker.processor.BizSubTask接口)。
接口
解釋
是否必選
public Map<String, String> labelMap()
實現輸出子任務標簽信息,用于展示對應子任務對象的業務自定義特征信息(如:賬戶名、商品Code、城市區域等)。
否
與MapReduce對比
對比項 | MapReduce | 可視化MapReduce |
子任務數量 | 可支持百萬級 | 小于等于1000。 |
任務開發模式 | 兩者相同 | |
子任務列表 | 不支持 | 支持。 |
子任務運行詳情 | 不支持 | 支持,單個子任務執行記錄、執行狀態、日志、鏈路追蹤、運行堆棧。 |
子任務標簽 | 不支持 | 支持,子任務實現BizSubTask接口可查看業務標簽信息。 |
子任務操作 | 不支持 | 支持,單個子任務支持停止、重跑。 |
任務開發演示
賬戶批量處理
案例描述:對一批銀行賬戶進行批量處理,每個賬號作為獨立的子任務在整個集群中進行全局并行處理,并且每一個子任務在執行列表中需要顯示其對應的賬戶信息以便查看,可以方便的掌握每一個賬號地處理狀態及其執行詳細信息。如下將提供相應demo代碼供參考使用。
自定義賬號信息子任務對象,每個子任務對象支持展示其標簽信息,需實現接口com.alibaba.schedulerx.worker.processor.BizSubTask,并實現labelMap方法。
public class ParallelAccountInfo implements BizSubTask { /** * 主鍵 */ private long id; private String name; private String accountId; public ParallelAccountInfo(long id, String name, String accountId) { this.id = id; this.name = name; this.accountId = accountId; } /** * 實現labelMap方法,用于設置對應子任務的標簽信息 * @return */ @Override public Map<String, String> labelMap() { Map<String, String> labelMap = new HashMap(); labelMap.put("戶名", name); return labelMap; } }
子任務對象實現對應接口后,子任務列表才可展示出每個子任務對象獨有的標簽信息(例如:案例中的戶名)用于區分每一個賬戶對象的業務處理情況,且支持按標簽搜索。
賬號業務任務處理Processor,實現對單個賬號的業務邏輯處理,繼承com.alibaba.schedulerx.worker.processor.MapReduceJobProcessor。
public class ParallelJob extends MapReduceJobProcessor { private static final Logger logger = LoggerFactory.getLogger("schedulerx"); @Override public ProcessResult reduce(JobContext context) throws Exception { return new ProcessResult(true); } @Override public ProcessResult process(JobContext context) throws Exception { if(isRootTask(context)){ logger.info("構建并行計算的子任務列表..."); List<ParallelAccountInfo> list = new LinkedList(); /** * 判斷如果是rootTask的情況下,構建并行計算子任務對象列表 * 在實際業務場景中,用戶可自行根據業務場景加載子任務對象且該業務對象實現BizSubTask接口 * 場景案例: * 1、從數據庫中加載未被處理的客戶賬戶信息 * 2、構建省份城市地區信息列表,按區域分發任務處理 * 3、根據業務標簽作為子任務分類,如:電器、日用品、食品等 * 4、可根據時間作為子任務分類,如:按月(1月、2月...) */ for(int i=0; i < 20; i++){ list.add(new ParallelAccountInfo(i, "CUS"+StringUtils.leftPad(i+"", 4, "0"), "AC"+StringUtils.leftPad(i+"", 12, "0"))); } return map(list, "transfer"); }else { /** * 非rootTask,用戶可以獲取對應的子任務信息進行相應的業務處理 */ ParallelAccountInfo obj = (ParallelAccountInfo)context.getTask(); // 針對獲取的 obj子任務信息,進行業務邏輯處理 // do something logger.info("處理子任務信息:{}", JSON.toJSONString(obj)); return new ProcessResult(true); } } }
完成任務開發部署后,在控制臺配置相應定時任務運行,請參見操作步驟。
操作步驟
任務配置
登錄分布式任務調度平臺,在左側導航欄,單擊任務管理。
在任務管理頁面,單擊創建任務。
在創建任務面板,執行模式下拉列表選擇可視化MapReduce。
在高級配置區域配置相關信息。其他配置項,請參見任務管理高級配置參數說明。
配置項
說明
分發策略
輪詢策略(默認):每個Worker平均分配等量子任務,適用于每個子任務處理耗時基本一致的場景。
WorkerLoad最優策略:由主節點自動感知Worker節點的負載情況,適用于子任務和Worker機器處理耗時有較大差異的場景。
說明客戶端版本為1.10.14及以上。
子任務單機并發數
即單機執行線程數,默認為5。如需加快執行速度,可以調大該值。如果下游或者數據庫無法承接,可適當調小。
子任務失敗重試次數
子任務失敗會自動重試,默認為0。
子任務失敗重試間隔
子任務失敗重試間隔,單位:秒,默認為0。
子任務failover策略
當執行節點宕機下線后,是否將子任務重新分發給其他機器執行。開啟該配置后,發生failover時,子任務可能會重復執行,需自行做好冪等。
說明客戶端版本為1.8.13及以上。
主節點參與執行
主節點是否參與子任務執行。在線可運行Worker數量必須不低于2臺,在子任務數量特別大時,推薦關閉該參數。
說明客戶端版本為1.8.13及以上。
可視化能力
任務執行后,您可以在執行列表頁面,單擊詳情查看對應子任務的詳細執行信息。
在子任務列表頁簽查看每個子任務處理的狀態。
在子任務列表頁簽,單擊子任務操作列的日志,可以查看每個子任務運行的業務日志信息,分析執行狀態結果。
任務執行記錄在運行中時,在當前執行詳情頁簽,單擊查看堆棧,可以查看對應機器處理線程運行中的情況,分析當前任務運行異常情況。
在子任務列表頁簽,當接入鏈路追蹤后,單擊對應的TraceId,可以查詢每個子任務的執行調用鏈路。具體操作,請參見如何接入鏈路追蹤。