DataWorks的開放平臺為您提供OpenEvent、OpenAPI等開放能力,您可通過開放平臺將第三方調度系統(tǒng)集成到DataWorks的調度系列中,將三方調度系統(tǒng)的任務嵌入DataWorks的業(yè)務流程中。本文以一個示例為您介紹集成第三方調度系統(tǒng)時需要進行的配置要點。

背景信息

當您的數據處理主流程在DataWorks中,且需要集成一個其他調度系統(tǒng)的調度任務時,您可以使用DataWorks的開放平臺和HTTP觸發(fā)節(jié)點。如下圖所示。集成三方系統(tǒng)集成三方調度系統(tǒng)后,整體的任務運行流程如下。
  • 三方調度系統(tǒng)可通過DataWorks的OpenEvent功能,訂閱依賴的DataWorks節(jié)點的狀態(tài),當依賴的節(jié)點運行完成后,即可開始運行三方調度系統(tǒng)中的任務。
  • 當三方系統(tǒng)中的任務運行完成后,即可通過DataWorks的RunTriggerNode這個API觸發(fā)運行DataWorks的HTTP觸發(fā)節(jié)點,通過HTTP觸發(fā)節(jié)點觸發(fā)下游的DataWorks節(jié)點開始運行。
其中使用的DataWorks關鍵功能與概念包括:
下文以一個示例為您介紹要實現上述業(yè)務需求需進行的主要操作流程。

DataWorks配置:開啟并配置消息訂閱(OpenEvent)

開啟并配置消息訂閱的詳細步驟請參見開啟消息訂閱,以下為本實踐中的核心配置流程與注意事項。

  1. EvenBridge控制臺,跳過事件源等配置,快速創(chuàng)建一個自定義總線。創(chuàng)建自定義總線
  2. EvenBridge控制臺對應的事件總線中,創(chuàng)建事件規(guī)則。
    本實踐定義該EventBridge自定義總線可接收DataWorks實例狀態(tài)變更消息,配置demo與核心參數配置如下。
    1. 配置事件模式事件規(guī)則3
      {
          "source": [
              "acs.dataworks"
          ],
          "type": [
              "dataworks:InstanceStatusChanges:InstanceStatusChanges"
          ]
      }
      • source:定義事件的產品名稱標識,配置為acs.dataworks
      • type:定義產品下事件的類型標識,配置為dataworks:InstanceStatusChanges:InstanceStatusChanges。您可以在下方的事件模式調試中,將source、type取值進行補充修改,然后進行事件測試,測試成功后單擊下一步測試3
    2. 配置事件目標中,服務類型選擇為HTTPS,并填寫合適的URL,其他參數可保持默認。事件目標
  3. DataWorks控制臺的開放平臺頁面,啟用上述消息分發(fā)通道。啟用

三方系統(tǒng)配置:開發(fā)觸發(fā)三方任務運行的邏輯

完成訂閱需要依賴的DataWorks節(jié)點后,您需要配置三方調度系統(tǒng),根據DataWorks實例狀態(tài)觸發(fā)運行任務,當訂閱到依賴的DataWorks節(jié)點已運行成功,即開始運行三方系統(tǒng)的任務。配置的示例代碼如下。
package com.aliyun.dataworks.demo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


/**
 * @author dataworks demo
 */
@RestController
@RequestMapping("/event")
public class ExtensionsController {

    /**
     * 接收eventBridge推送過來的消息
     * @param jsonParam
     */
    @PostMapping("/consumer")
    public void consumerEventBridge(@RequestBody String jsonParam){
        JSONObject jsonObj = JSON.parseObject(jsonParam);
        String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
        if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
            JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
            //調度任務實例開始等時間的具體時間
            System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
            //DagId
            System.out.println("dagId: "+ dataParam.getString("dagId"));
            //Dag的類型,取值如下:
            //0:周期調度任務
            //1:手動任務
            //2:冒煙測試
            //3:補數據
            //4:手動業(yè)務流程
            //5:臨時業(yè)務流程
            System.out.println("dagType: "+dataParam.getString("dagType"));
            //任務實例的調度類型,取值如下:
            //NORMAL(0):正常調度任務。該任務被日常調度。
            //MANUAL(1):手動任務。該任務不會被日常調度。
            //PAUSE(2):凍結任務。該任務被日常調度,但啟動調度時直接被置為失敗狀態(tài)。
            //SKIP(3):空跑任務。該任務被日常調度,但啟動調度時直接被置為成功狀態(tài)。
            //SKIP_UNCHOOSE(4):臨時工作流中未選擇的任務,僅存在于臨時工作流中,啟動調度時直接被置為成功狀態(tài)。
            //SKIP_CYCLE(5):未到運行周期的周或月任務。該任務被日常調度,但啟動調度時直接被置為成功狀態(tài)。
            //CONDITION_UNCHOOSE(6):上游實例中有分支(IF)節(jié)點,但是該下游節(jié)點未被分支節(jié)點選中,直接置為空跑任務。
            //REALTIME_DEPRECATED(7):實時生成的已經過期的周期實例,該類型的任務直接被置為成功狀態(tài)。
            System.out.println("taskType: "+dataParam.getString("taskType"));
            //任務實例的修改時間
            System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
            //任務實例的創(chuàng)建時間
            System.out.println("createTime: "+dataParam.getString("createTime"));
            //工作空間的ID。您可以調用ListProjects查看空間ID信息。
            System.out.println("appId: "+dataParam.getString("appId"));
            //調度任務實例所在工作空間的租戶ID
            System.out.println("tenantId: "+dataParam.getString("tenantId"));
            //調度任務實例的操作碼:該字段可忽略
            System.out.println("opCode: "+dataParam.getString("opCode"));
            //業(yè)務流程的ID,周期調度任務實例的業(yè)務流程默認為1,手動業(yè)務流程和內部工作流調度任務實例為實際的業(yè)務流程ID
            System.out.println("flowId: "+dataParam.getString("flowId"));
            //調度任務實例對應的節(jié)點ID
            System.out.println("nodeId:"+dataParam.getString("nodeId"));
            //調度任務實例開始等資源的具體時間
            System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
            //調度任務實例ID
            System.out.println("taskId: "+dataParam.getString("taskId"));
            //任務的狀態(tài),取值如下:
            //0(未運行)
            //2(等待定時時間dueTime或cycleTime到來)
            //3(等待資源)
            //4(運行中)
            //7(下發(fā)給數據質量進行數據校檢)
            //8(正在進行分支條件校檢)
            //5(執(zhí)行失敗)
            //6(執(zhí)行成功)
            System.out.println("status: "+dataParam.getString("status"));
            // 訂閱到DataWorks的節(jié)點完成事件后觸發(fā)本地調度節(jié)點運行
            localScheduleEventService.triggerLocalNode(dataParam);
        }else{
            System.out.println("未能過濾其他事件,請檢查配置步驟");
        }
    }
}
                

DataWorks配置:創(chuàng)建HTTP觸發(fā)節(jié)點

三方系統(tǒng)任務運行成功后,需要通過DataWorks的RunTriggerNode觸發(fā)DataWorks的HTTP觸發(fā)節(jié)點開始運行,通過HTTP觸發(fā)節(jié)點進一步觸發(fā)下游節(jié)點運行,因此您需要根據實際業(yè)務需要創(chuàng)建HTTP觸發(fā)節(jié)點。

HTTP觸發(fā)節(jié)點的詳細介紹與新建步驟請參見HTTP觸發(fā)器節(jié)點

三方系統(tǒng)配置:開發(fā)觸發(fā)HTTP觸發(fā)節(jié)點運行的邏輯

三方系統(tǒng)通過RunTriggerNode觸發(fā)HTTP觸發(fā)節(jié)點運行的代碼示例如下。
  • HttpTriggerNodeService代碼實現demo。
    package com.aliyun.dataworks.services;
    
    import com.aliyuncs.dataworks_public.model.v20200518.RunTriggerNodeRequest;
    import com.aliyuncs.dataworks_public.model.v20200518.RunTriggerNodeResponse;
    import com.aliyuncs.exceptions.ClientException;
    import com.aliyuncs.exceptions.ServerException;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * @author dataworks demo
     */
    @Service
    public class HttpTriggerNodeService {
    
        @Autowired
        private DataWorksOpenApiClient dataWorksOpenApiClient;
    
    
        /**
         * @return
         */
        public boolean triggerNode(Long appId, Long nodeId, Long bizDate, Long cycleTime) {
            try {
                RunTriggerNodeRequest runTriggerNodeRequest = new RunTriggerNodeRequest();
    
                // 設置NodeId,表示觸發(fā)式節(jié)點的節(jié)點ID,節(jié)點ID可通過ListNodes API查詢獲取
                runTriggerNodeRequest.setNodeId(nodeId);
    
                // 設置CycleTime,表示觸發(fā)式節(jié)點的任務的運行時間戳,需將HTTP觸發(fā)節(jié)點的調度配置中,節(jié)點指定的運行時間,換算為時間戳
                // 如果HTTP觸發(fā)節(jié)點所在的地域與調度系統(tǒng)所在的地域不在同個時區(qū),存在時差,這里需配置為觸發(fā)節(jié)點所在時區(qū)的時間。
                // 例如,HTTP觸發(fā)節(jié)點在北京地域且Cyctime為北京時間18:00,而調度系統(tǒng)在美西地域,此時調度系統(tǒng)配置時,需配置為北京時間18:00的時間戳。
                runTriggerNodeRequest.setCycleTime(cycleTime);
    
    
                // 設置BizDate,表示觸發(fā)式節(jié)點實例所在的業(yè)務日期時間戳,需將業(yè)務日期換算為時間戳。
                // 業(yè)務日期為運行時間的前一天,且時間精確到日,時分秒均為00000000。以運行日期為2020年11月25日為例,業(yè)務時間為2020112400000000,需將這個時間換算為業(yè)務日期的時間戳
                // 如果HTTP觸發(fā)節(jié)點所在的地域與調度系統(tǒng)所在的地域不在同個時區(qū),存在時差,這里需配置為觸發(fā)節(jié)點所在時區(qū)的時間。
                runTriggerNodeRequest.setBizDate(bizDate);
    
                // 設置AppId(appId=projectId),表示觸發(fā)式節(jié)點所屬的DataWorks工作空間ID,通過GetNode可以查詢到節(jié)點對應的projectId
                runTriggerNodeRequest.setAppId(appId);
    
                RunTriggerNodeResponse runTriggerNodeResponse = dataWorksOpenApiClient.createClient().getAcsResponse(runTriggerNodeRequest);
                System.out.println(runTriggerNodeResponse.getRequestId());
                return runTriggerNodeResponse.getSuccess();
            } catch (ServerException e) {
                e.printStackTrace();
            } catch (ClientException e) {
                e.printStackTrace();
                // 請求ID
                System.out.println(e.getRequestId());
                // 錯誤碼
                System.out.println(e.getErrCode());
                // 錯誤信息
                System.out.println(e.getErrMsg());
            }
            return false;
        }
    }
                            
  • 第三方調度的狀態(tài)機運行demo。
        @Scheduled(cron = "0 0/30 * * * ? ")
        public void schedule() {
            //TODO 模擬本地調度通過HTTP觸發(fā)器節(jié)點調起DataWorks的周期節(jié)點
            triggerDwScheduleNode();
        }
  • triggerDwScheduleNode方法的實現demo
    /**
         * 這里以觸發(fā)器節(jié)點為日調度頻率為例,介紹如何獲取調起觸發(fā)器節(jié)點的相關參數信息
         */
        public void triggerDwScheduleNode() {
            Date gmtDate = getTimeByZeroEnd(new Date());
            GregorianCalendar gc = (GregorianCalendar) GregorianCalendar.getInstance();
            gc.setTime(gmtDate);
            gc.add(Calendar.DATE, -1);
            Long bizDate = gc.getTimeInMillis();
            GetNodeResponse.Data node = nodeService.getNode(nodeId);
            if (node != null) {
                String bizDateStr = getTimeInExpress(gc.getTime(), "yyyy-MM-dd HH:mm:ss");
                ListInstancesResponse.Data instances = nodeService.getInstance(nodeId, node.getProjectId(), bizDateStr);
                if (!CollectionUtils.isEmpty(instances.getInstances())) {
                    ListInstancesResponse.Data.Instance instance = instances.getInstances().get(0);
                    httpTriggerNodeService.triggerNode(node.getProjectId(), node.getNodeId(), bizDate, instance.getCycTime());
                }
            }
    
        }

本地部署運行

下載工程:下載工程后,進入工程根目錄下執(zhí)行:
mvn clean package -Dmaven.test.skip=true spring-boot:repackage
獲得可直接運行的jar后執(zhí)行:
java -jar target/schedule-integration-demo-1.0.jar
完成后,在瀏覽器輸入http://localhost:8080/index會得到"hello world!",表示應用成功部署。