DataWorks提供內置的流程檢查,如任務發布前代碼評審、數據治理中心治理項的內置檢查項校驗,此外,DataWorks還支持您自定義校驗邏輯并接入DataWorks,實現DataWorks流程管控。本文以在運維中心的獲取實例狀態變更為例,為您介紹如何基于開放消息實現訂閱實例狀態變更。
背景信息
本實踐涉及的開放平臺的相關功能介紹與基本概念可參見OpenEvent概述。
開啟并配置消息訂閱(OpenEvent)
開啟并配置消息訂閱的詳細步驟請參見開啟消息訂閱,以下為本實踐中的核心配置流程與注意事項。
- 在EvenBridge控制臺,跳過事件源等配置,快速創建一個自定義總線。
- 在EvenBridge控制臺對應的事件總線中,創建事件規則。本實踐定義該EventBridge自定義總線可接收DataWorks實例狀態變更消息,配置demo與核心參數配置如下。
- 配置事件模式。
{ "source": [ "acs.dataworks" ], "type": [ "dataworks:InstanceStatusChanges:InstanceStatusChanges" ] }
- source:定義事件的產品名稱標識,配置為acs.dataworks。
- type:定義產品下事件的類型標識,配置為dataworks:InstanceStatusChanges:InstanceStatusChanges。您可以在下方的事件模式調試中,將source、type取值進行補充修改,然后進行事件測試,測試成功后單擊下一步。
- 配置事件目標中,服務類型選擇為HTTPS,并填寫合適的URL,其他參數可保持默認。
- 配置事件模式。
- 在DataWorks控制臺的開放平臺頁面,啟用上述消息分發通道。
代碼編寫
package com.aliyun.dataworks.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author dataworks demo
*/
@RestController
@RequestMapping("/event")
public class ExtensionsController {
/**
* 接收eventBridge推送過來的消息
* @param jsonParam
*/
@PostMapping("/consumer")
public void consumerEventBridge(@RequestBody String jsonParam){
JSONObject jsonObj = JSON.parseObject(jsonParam);
String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
if(Constants.INSTANCE_STATUS_EVENT_CODE.equals(eventCode)){
JSONObject dataParam = JSON.parseObject(jsonObj.getString("data"));
//調度任務實例開始等時間的具體時間
System.out.println("beginWaitTimeTime: "+ dataParam.getString("beginWaitTimeTime"));
//DagId
System.out.println("dagId: "+ dataParam.getString("dagId"));
//Dag的類型,取值如下:
//0:周期調度任務
//1:手動任務
//2:冒煙測試
//3:補數據
//4:手動業務流程
//5:臨時業務流程
System.out.println("dagType: "+dataParam.getString("dagType"));
//任務實例的調度類型,取值如下:
//NORMAL(0):正常調度任務。該任務被日常調度。
//MANUAL(1):手動任務。該任務不會被日常調度。
//PAUSE(2):凍結任務。該任務被日常調度,但啟動調度時直接被置為失敗狀態。
//SKIP(3):空跑任務。該任務被日常調度,但啟動調度時直接被置為成功狀態。
//SKIP_UNCHOOSE(4):臨時工作流中未選擇的任務,僅存在于臨時工作流中,啟動調度時直接被置為成功狀態。
//SKIP_CYCLE(5):未到運行周期的周或月任務。該任務被日常調度,但啟動調度時直接被置為成功狀態。
//CONDITION_UNCHOOSE(6):上游實例中有分支(IF)節點,但是該下游節點未被分支節點選中,直接置為空跑任務。
//REALTIME_DEPRECATED(7):實時生成的已經過期的周期實例,該類型的任務直接被置為成功狀態。
System.out.println("taskType: "+dataParam.getString("taskType"));
//任務實例的修改時間
System.out.println("modifyTime: "+dataParam.getString("modifyTime"));
//任務實例的創建時間
System.out.println("createTime: "+dataParam.getString("createTime"));
//工作空間的ID。您可以調用ListProjects查看空間ID信息。
System.out.println("appId: "+dataParam.getString("appId"));
//調度任務實例所在工作空間的租戶ID
System.out.println("tenantId: "+dataParam.getString("tenantId"));
//調度任務實例的操作碼:該字段可忽略
System.out.println("opCode: "+dataParam.getString("opCode"));
//業務流程的ID,周期調度任務實例的業務流程默認為1,手動業務流程和內部工作流調度任務實例為實際的業務流程ID
System.out.println("flowId: "+dataParam.getString("flowId"));
//調度任務實例對應的節點ID
System.out.println("nodeId:"+dataParam.getString("nodeId"));
//調度任務實例開始等資源的具體時間
System.out.println("beginWaitResTime: "+dataParam.getString("beginWaitResTime"));
//調度任務實例ID
System.out.println("taskId: "+dataParam.getString("taskId"));
//任務的狀態,取值如下:
//0(未運行)
//2(等待定時時間dueTime或cycleTime到來)
//3(等待資源)
//4(運行中)
//7(下發給數據質量進行數據校檢)
//8(正在進行分支條件校檢)
//5(執行失敗)
//6(執行成功)
System.out.println("status: "+dataParam.getString("status"));
}else{
System.out.println("未能過濾其他事件,請檢查配置步驟");
}
}
}
本地部署運行
下載工程:
- 依賴環境:java8及以上,maven構建工具。
- 工程下載鏈接:event-demo-instance-status.zip。
mvn clean package -Dmaven.test.skip=true spring-boot:repackage
獲得可直接運行的jar后執行:java -jar target/event-demo-instance-status-1.0.jar
此時會成功啟動工程,如下圖所示:在瀏覽器輸入http://localhost:8080/index
會得到"hello world!"
,表示應用成功部署,打通網絡后即可訂閱EventBridge的消息了。