工作流調(diào)度
本文介紹如何通過(guò)可視化的工作流進(jìn)行任務(wù)編排且支持Cron表達(dá)式和API。
使用說(shuō)明
工作流中的Job沒有獨(dú)立調(diào)度時(shí)間,跟隨工作流的時(shí)間開始調(diào)度。
工作流調(diào)度至少要有2個(gè)Job,且有依賴關(guān)系。如果只有一個(gè)Job,請(qǐng)直接使用任務(wù)管理。
工作流支持跨應(yīng)用編排,可以將不同應(yīng)用下的任務(wù)進(jìn)行編排。
使用限制
當(dāng)前只有Java任務(wù)支持?jǐn)?shù)據(jù)傳輸,分布式Java任務(wù)請(qǐng)使用MapReduce模型進(jìn)行數(shù)據(jù)傳輸。
使用指南
具體操作,請(qǐng)參見流程管理。
工作流上下游數(shù)據(jù)傳遞
SchedulerX提供工作流任務(wù)編排,同時(shí)支持任務(wù)上下游的數(shù)據(jù)傳遞。
返回執(zhí)行結(jié)果
/**
*
* @param status
* @param result, the size should be less than 1000 bytes
* @throws Exception
*/
public ProcessResult(boolean status, String result) throws Exception;
在Processor結(jié)尾,通過(guò)該方法可以返回result執(zhí)行結(jié)果。
result的長(zhǎng)度不能超過(guò)1000字節(jié),不是String的長(zhǎng)度。如果有中文字符,可能會(huì)超過(guò)1000字節(jié),超過(guò)1000字節(jié)時(shí)任務(wù)會(huì)失敗。
獲取上游數(shù)據(jù)
List<JobInstanceData> upstreamDatas = JobContext.getUpstreamData();
在Processor里,可以通過(guò)該接口從JobContext中獲取上游的數(shù)據(jù),上游的數(shù)據(jù)是一個(gè)list(可能有多個(gè)父節(jié)點(diǎn))。JobInstanceData里有兩個(gè)屬性,分別是JobName和Data(String類型)。
Demo
首先編寫三個(gè)JobProcessor。
public class TestSimpleJobA extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss")); return new ProcessResult(true, String.valueOf(1)); } }
public class TestSimpleJobB extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss")); return new ProcessResult(true, String.valueOf(2)); } }
public class TestSimpleJobC extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { List<JobInstanceData> upstreamDatas = context.getUpstreamData(); int sum = 0; for (JobInstanceData jobInstanceData : upstreamDatas) { System.out.println("jobName=" + jobInstanceData.getJobName() + ", data=" + jobInstanceData.getData()); sum += Integer.valueOf(jobInstanceData.getData()); } System.out.println("TestSimpleJobC sum=" + sum); return new ProcessResult(true, String.valueOf(sum)); } }
通過(guò)控制臺(tái)配置工作流如下圖所示:
觸發(fā)一次該工作流之后,在左側(cè)導(dǎo)航欄單擊執(zhí)行列表>流程實(shí)例列表,找到創(chuàng)建的工作流名稱。然后單擊實(shí)例ID/流程實(shí)例圖,進(jìn)入工作流實(shí)例詳情頁(yè)面,右鍵單擊JobA的實(shí)例,選擇詳情,進(jìn)入任務(wù)實(shí)例詳情,可以看到JobA實(shí)例結(jié)果=1,如下圖:
同理,可以看到JobB的實(shí)例結(jié)果=2, JobC的實(shí)例結(jié)果=3。
在控制臺(tái)查看JobC機(jī)器的打印信息。
jobName=jobB, data=2 jobName=jobA, data=1 TestSimpleJobC sum=3