DataWorks提供內置的流程檢查,如任務發布前代碼評審、數據治理中心治理項的內置檢查項校驗,此外,DataWorks還支持您自定義校驗邏輯并接入DataWorks,實現DataWorks流程管控。本文以在提交與發布時校驗代碼中是否存在MAX_PT函數為例,為您介紹如何基于擴展程序實現工作空間中不允許使用特定函數。
場景說明
本實踐示例的場景為校驗提交發布的代碼中是否有MAX_PT函數,示例的校驗流程為:
序號 | 核心流程 | 核心功能點 |
1 | 將工作空間中的文件提交與發布消息(例如提交、發布節點)通過OpenEvent發布至EventBridge,后續通過EventBridge過濾事件消息,并將消息發送至您的服務。 | 開啟消息訂閱的時候,通過事件規則指定訂閱的事件類型為文件提交與發布(dataworks:FileChange:CommitFile和dataworks:FileChange:DeployFile) 配置詳情請參見最佳實踐:(高級特性應用)禁止使用MAX_PT函數。 |
2 | 本地或在線服務接收消息,通過對擴展程序的設置,實現:
|
配置詳情請參見最佳實踐:(高級特性應用)禁止使用MAX_PT函數。 |
前提條件
操作步驟
步驟一:配置自定義總線
登錄事件總線EventBridge控制臺,單擊左側導航欄事件總線,進入事件總線創建頁面。
單擊按鈕,創建自定義事件總線。
在總線模塊內配置完自定義事件總線名稱后,單擊下一步,進入對事件源配置中。
單擊跳過,跳過事件源、規則、目標模塊的配置。
單擊左側導航欄事件總線進入事件總線創建頁面,找到已創建的事件總線后,單擊名稱,進入事件總線概覽頁面。
單擊左側導航欄事件規則進入事件規則頁面后,單擊創建規則新建事件規則。
本實踐定義該EventBridge自定義總線可接收DataWorks文件提交事件消息和文件發布事件消息,配置demo與核心參數如下。
配置基本信息:自定義規則名稱即可。
實踐模式內容。
事件源類型:選擇自定義事件源。
事件源:不進行配置。
模式內容:以JSON格式編寫,配置內容如下。
{ "source": [ "acs.dataworks" ], "type": [ "dataworks:FileChange:CommitFile", "dataworks:FileChange:DeployFile" ] }
source:定義事件的產品名稱標識acs.dataworks。
type:定義產品下事件的類型標識,配置為dataworks:FileChange:CommitFile、dataworks:FileChange:DeployFile。
事件模式調試:將source、type取值進行補充修改,然后進行事件測試,測試成功后單擊下一步。
配置事件目標。
服務類型:選擇HTTPS或HTTP,更多服務類型可參見管理事件規則。
URL:填寫接收自定義總線推送信息的URL,例如
https://服務器地址:端口號/extensions/consumer
。Body:選擇完整事件。
網絡類型:選擇公網。
步驟二:配置事件分發通道
進入開放平臺頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,進入開放平臺的開發者后臺頁面。
在開發者后臺頁面,單擊左側導航欄OpenEvent,進入頁面后,單擊添加事件分發通道,在彈窗內進行配置。
要分發事件的工作空間:選擇已創建空間。
指定分發到EventBridge中自定義總線:選擇步驟一創建的事件總線。
保存事件分發通道后,在事件分發通道的操作列,單擊啟用按鈕,啟用新建的事件分發通道。
步驟三:注冊并配置擴展程序(Extensions)
進入開放平臺頁面。
登錄DataWorks控制臺,切換至目標地域后,單擊左側導航欄的 ,進入開放平臺的開發者后臺頁面。
在開發者后臺頁面,單擊左導航欄擴展程序,進入頁面后,單擊注冊擴展程序,在彈窗內進行配置。
部署方式選擇:選擇通過自建服務部署。
注冊擴展程序
擴展程序名稱:自定義配置。
處理的擴展點:選擇文件提交前置事件、文件發布前置事件。
測試用工作空間:配置后,可在擴展性程序創建好,但并未提交前針對該空間內進行擴展程序測試。
擴展程序參數配置
您可通過本參數的配置來控制擴展程序的生效范圍,在指定工作空間下:
提交節點(文件提交前置事件)時不觸發擴展程序校驗、不阻塞提交節點的流程。
發布節點(文件發布前置事件)時觸發擴展程序校驗,如果不滿足校驗通過條件會阻塞發布節點的流程。
需配置為
extension.project.commit-file.disabled=YourProjectId
。其中YourProjectId需替換為擴展程序不生效的擴展點事件所在的工作空間ID。擴展程序選項配置:用于設置不滿足校驗條件后的響應方式,以下示例中指定的相應方式包括告警和禁用。
{ "type":"object", "properties":{ "checkStatus":{ "type":"number", "title":"MAX-PT函數檢查方式", "x-decorator":"FormItem", "x-component":"Radio.Group", "x-decorator-props":{ "tooltip":"描述文件" }, "x-component-props":{ "dataSource":[ { "value":"WARN", "label":"告警" }, { "value":"FAIL", "label":"禁用" } ], "mode":"multiple" } } } }
配置完成以上內容后,單擊確定保存注冊擴展程序。
在已創建好的擴展程序的操作列,單擊提交,進入審核狀態。
說明擴展程序審核由DataWorks平臺審核,審核事件通常在T+3個工作日完成,請耐心等待。
如需要測試創建的擴展程序,需要配置測試用工作空間。
審核通過后,單擊操作列的上線,即可上線使用該擴展程序。
單擊擴展程序管理按鈕,進入
頁面,選擇創建的擴展程序,在啟用列啟用該擴展程序,并單擊設置工作空間對MAX_PT函數使用的管控力度。
開發配置擴展程序
事件總線EventBridge通過HTTP請求獲取DataWorks發送的JSON格式事件,解析消息,并推送至目標服務程序上,對事進行處理后,返回結果至DataWorks中。
示例代碼
該代碼示例是通過調用UpdateIDEEventResultAPI,指定API中的messageId
參數來獲取詳細的事件詳情后進行邏輯判斷是否包含限制函數,判斷結果使用UpdateIDEEventResult這個API將判斷結果返回至DataWorks中。詳情請參見開發擴展程序。
環境構建:Java8,Maven構建工具。
package com.aliyun.dataworks.demo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.dataworks.config.Constants;
import com.aliyun.dataworks.config.EventCheckEnum;
import com.aliyun.dataworks.config.ExtensionParamProperties;
import com.aliyun.dataworks.services.DataWorksOpenApiClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.exceptions.ClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @author dataworks demo
*/
@RestController
@RequestMapping("/extensions")
public class ExtensionsController {
@Autowired(required = false)
private DataWorksOpenApiClient dataWorksOpenApiClient;
@Autowired
private ExtensionParamProperties extensionParamProperties;
/**
* 接收eventBridge推送過來的消息
* @param jsonParam
*/
@PostMapping("/consumer")
public void consumerEventBridge(@RequestBody String jsonParam){
JSONObject jsonObj = JSON.parseObject(jsonParam);
String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED);
if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)){
//初始化client
IAcsClient client = dataWorksOpenApiClient.createClient();
try {
//當前事件參數信息
String messageId = jsonObj.getString("id");
JSONObject data = jsonObj.getObject("data", JSONObject.class);
Long projectId = data.getLong("projectId");
//初始化事件回調
UpdateIDEEventResultRequest updateIDEEventResultRequest = new UpdateIDEEventResultRequest();
updateIDEEventResultRequest.setMessageId(messageId);
updateIDEEventResultRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
//查詢觸發擴展點事件時的擴展點數據快照
GetIDEEventDetailRequest getIDEEventDetailRequest = new GetIDEEventDetailRequest();
getIDEEventDetailRequest.setMessageId(messageId);
getIDEEventDetailRequest.setProjectId(projectId);
GetIDEEventDetailResponse getIDEEventDetailResponse = client.getAcsResponse(getIDEEventDetailRequest);
String content = getIDEEventDetailResponse.getEventDetail().getCommittedFile().getContent();
//判斷代碼是否包含限制函數
if(content.contains(Constants.CHECK_CODE)){
//獲取擴展程序選項配置在項目空間下的配置
GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest();
getOptionValueForProjectRequest.setProjectId(String.valueOf(projectId));
getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode());
GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getAcsResponse(getOptionValueForProjectRequest);
JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getOptionValue());
//注意:這里需根據在DataWorks上實際設置格式來填寫
String checkStatus = jsonObject.getString("checkStatus");
updateIDEEventResultRequest.setCheckResult(checkStatus);
updateIDEEventResultRequest.setCheckResultTip("代碼中存在限制函數");
}else{//成功回調
updateIDEEventResultRequest.setCheckResult(EventCheckEnum.OK.getCode());
updateIDEEventResultRequest.setCheckResultTip(EventCheckEnum.OK.getName());
}
//回調DataWorks
UpdateIDEEventResultResponse acsResponse = client.getAcsResponse(updateIDEEventResultRequest);
//請求的唯一標識,用于后續錯誤排查使用
System.out.println("acsResponse:" + acsResponse.getRequestId());
} catch (ClientException e) {
//請求的唯一標識,用于后續錯誤排查使用
System.out.println("RequestId:" + e.getRequestId());
//錯誤狀態碼
System.out.println("ErrCode:" + e.getErrCode());
//錯誤描述信息
System.out.println("ErrMsg:" + e.getErrMsg());
}
}else{
System.out.println("未能過濾其他事件,請檢查配置步驟");
}
}
}
示例工程部署
準備環境與工程
依賴環境:java8及以上,maven構建工具。
部署方式
本地部署:將工程文件打成jar包后,在本地環境中已部署Java8和Maven工具的本地服務器或Windows上進行
java -jar yourapp.jar
運行服務程序。云平臺部署:將工程文件打成jar包后,上傳至相應的運行環境例如:Docker容器、云服務器等進行部署。
說明部署完成的服務,需要保證EventBridge可通過公網訪問到該服務。
下載工程后,進入工程根目錄下執行打包命令,將工程項目打成jar包。
mvn clean package -Dmaven.test.skip=true spring-boot:repackage
執行jar包:
java -jar target/extensions-demo-maxpt-1.0.jar
此時會成功啟動工程,如下圖所示:在瀏覽器輸入
http://localhost:8080/index
會得到"hello world!"
,表示應用成功部署,打通網絡后即可訂閱EventBridge的消息了。
結果驗證
完成代碼部署與打通EventBridge的網絡后,您可以在開啟擴展程序的空間內進行驗證。
驗證步驟
在數據開發頁面創建節點,在節點內編輯被禁止的
MAX_PT
函數保存并提交。單擊發布按鈕,進入創建發布包頁面對該節點直接進行發布,則會觸發擴展程序校驗。
說明根據擴展程序配置,并不會對指定工作空間的代碼文件提交事件生效,所以在提交包含
MAX_PT
函數的節點時不會觸發擴展校驗程序,而是在對包含MAX_PT
函數節點進行發布時,觸發擴展程序校驗。
下圖以在該擴展程序的參數配置中,配置的提交擴展點黑名單生效項目,進行提交并發布包含MAX_PT函數的節點,觀察并驗證擴展程序校驗流程,以及擴展點黑名單是否生效。