在該階段,DataWorks為您提供了數據違規下載實時阻斷及審批、數據違規流轉準實時告警等場景的最佳實踐,幫助企業做好數據安全的持續運營。
DataWorks可基于用戶行為實時事件、實時審計日志進行分析,幫助您實時發現風險行為并及時響應。具體應用場景請參見下文。
場景一:數據違規下載實時阻斷及審批
數據下載是企業風險治理的重中之重。通常,企業數據開發人員、分析人員只允許在數據平臺上瀏覽及使用數據,不允許將明細數據下載至本地進行分析。數據導出到本地后將無法審計其使用行為,若使用不當或遇到別有用心者,將導致數據被濫用、泄露,嚴重則可能產生數安事件及風險輿情。
不同企業對下載行為風控規則的定義存在差異,本文將以“實時阻斷或審批超過1000條數據的下載行為”為例進行展示。
實現原理
DataWorks的OpenEvent為您提供消息推送訂閱的能力,同時,您可將服務程序注冊為DataWorks的擴展程序,通過擴展程序來卡點并響應訂閱的事件消息,實現通過擴展程序對特定事件進行消息通知與流程管控。詳情請參見擴展程序概述。
操作步驟
由于“查詢結果下載”為非工作空間內的操作,因此本案例將使用Default總線來承接操作事件消息。
說明如需通過RAM子賬號/RAM角色身份讀取Default總線中的事件,請提前進行授權。
查詢結果下載的事件名稱為“dataworks:ResourcesDownload:DownloadResources”,可參考下圖查看詳情。
{ "datacontenttype": "application/json;charset=utf-8", "aliyunaccountid": "1107550004253538", "aliyunpublishtime": "2023-12-05T07:25:31.708Z", "data": { "eventCode": "download-resources", "extensionBizId": "audit_4d7ebb42b805428483148295a97a8404", "extensionBizName": "DataWorks_IDE_Query_20231205152530.csv", "requestId": "77cac0c2fc12cecbf1d289128897cc7b@@ac15054317017611303051804e8b43", "appId": 3159, "tenantId": 524257424564736, "blockBusiness": true, "eventBody": { "sqlText": "SELECT * FROM table_1", // 查詢SQL "queryDwProjectId": "3159", // 查詢數據源所在的工作空間ID "moduleType": "develop_query", // 下載來源:develop_query(數據開發查詢)/sqlx_query(數據分析查詢)/dw_excel(數據分析電子表格) "operatorBaseId": "1107550004253538", //操作者的UID "datasourceId": "18889", //查詢的數據源ID "queryDwProjectName": "yongxunQA_emr_chengdu1", // 查詢數據源所在的工作空間名稱 "dataRowSize": 4577, // 下載的數據量 "datasourceName": "odps_source", // 查詢的數據源名稱 "operatorUid": "1107550004253538" }, "operator": "1107550004253538" }, "aliyunoriginalaccountid": "1107550004253538", "specversion": "1.0", "aliyuneventbusname": "default", "id": "169d171c-d523-4370-a874-bb0fa083194d", "source": "acs.dataworks", "time": "2023-12-05T15:25:31.588Z", "aliyunregionid": "cn-chengdu", "type": "dataworks:ResourcesDownload:DownloadResources" }
- 說明
實際使用時,請結合上一步消息事件內容進行風險判斷。
處理的擴展點選擇“數據下載前置事件”。
package com.aliyun.dataworks.demo; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.aliyun.dataworks.config.Constants; import com.aliyun.dataworks.config.EventCheckEnum; import com.aliyun.dataworks.config.ExtensionParamProperties; import com.aliyun.dataworks.services.DataWorksOpenApiClient; import com.aliyuncs.IAcsClient; import com.aliyuncs.dataworks_public.model.v20200518.*; import com.aliyuncs.exceptions.ClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * @author dataworks demo */ @RestController @RequestMapping("/extensions") public class ExtensionsController { @Autowired(required = false) private DataWorksOpenApiClient dataWorksOpenApiClient; @Autowired private ExtensionParamProperties extensionParamProperties; /** * 接收eventBridge推送過來的消息 * @param jsonParam */ @PostMapping("/consumer") public void consumerEventBridge(@RequestBody String jsonParam){ JSONObject jsonObj = JSON.parseObject(jsonParam); String eventCode = jsonObj.getString(Constants.EVENT_CODE_FILED); if(Constants.COMMIT_FILE_EVENT_CODE.equals(eventCode) || Constants.DEPLOY_FILE_EVENT_CODE.equals(eventCode)){ //初始化client IAcsClient client = dataWorksOpenApiClient.createClient(); try { //當前事件參數信息 String messageId = jsonObj.getString("id"); JSONObject data = jsonObj.getObject("data", JSONObject.class); // Long projectId = data.getLong("appId"); //初始化事件回調 CallbackExtensionRequest callbackExtensionRequest = new CallbackExtensionRequest(); callbackExtensionRequest.setMessageId(messageId); callbackExtensionRequest.setExtensionCode(extensionParamProperties.getExtensionCode()); JSONObject eventBody = data.getJSONObject("eventBody"); Long dataRowSize = eventBody.getLong("dataRowSize"); //獲取擴展程序選項配置在項目空間下的配置 GetOptionValueForProjectRequest getOptionValueForProjectRequest = new GetOptionValueForProjectRequest(); //全局擴展點事件的配置信息所屬projectId默認為-1 getOptionValueForProjectRequest.setProjectId("-1"); getOptionValueForProjectRequest.setExtensionCode(extensionParamProperties.getExtensionCode()); GetOptionValueForProjectResponse getOptionValueForProjectResponse = client.getAcsResponse(getOptionValueForProjectRequest); JSONObject jsonObject = JSON.parseObject(getOptionValueForProjectResponse.getOptionValue()); //這里需根據在DataWorks上實際設置格式來填寫 Long maxDataRowSize = jsonObject.getLong("dataRowSize"); //判斷代碼是否包含限制函數 if(dataRowSize > 1000){ callbackExtensionRequest.setCheckResult(EventCheckEnum.FAIL.getCode()); callbackExtensionRequest.setCheckMessage("下載的行數超過限制數"); }else{//成功回調 callbackExtensionRequest.setCheckResult(EventCheckEnum.OK.getCode()); } //回調DataWorks CallbackExtensionResponse acsResponse = client.getAcsResponse(callbackExtensionRequest); //請求的唯一標識,用于后續錯誤排查使用 System.out.println("acsResponse:" + acsResponse.getRequestId()); } catch (ClientException e) { //請求的唯一標識,用于后續錯誤排查使用 System.out.println("RequestId:" + e.getRequestId()); //錯誤狀態碼 System.out.println("ErrCode:" + e.getErrCode()); //錯誤描述信息 System.out.println("ErrMsg:" + e.getErrMsg()); } }else{ System.out.println("未能過濾其他事件,請檢查配置步驟"); } } }
配置風險響應規則。
進入
,為上述已發布上線的擴展程序配置風險響應,如需審批,則可添加已創建的審批流程。說明在響應配置時:
如需實現“審批”,則需保證擴展程序在識別到用戶風險行為時callbackExtensionRequest.setCheckResult()返回“WARN”。
如需實現“阻斷”,則callbackExtensionRequest.setCheckResult()應返回“FAIL”。
開啟擴展程序。
結果驗證
在數據開發、數據分析模塊單擊下載數據,將跳轉至數據下載頁面進行風險檢測。
根據檢測結果進行后續處理。
若檢測通過,則可繼續下載。
若檢測不通過,則下載被阻斷,或告知用戶需申請權限。
下載被阻斷。
提示用戶申請權限。
場景二:數據違規流轉準實時告警
開發者在開發環境中使用數倉數據構建應用時,可能存在將ODS層數據同步至分析層項目空間、將分析層數據同步至數據倉庫之外的存儲介質(業務數據庫或對象存儲)等情況,此類行為極易造成數倉分析層數據可用性、完整性受損,或明細數據泄露。管理員可通過監控DataWorks操作審計日志及時發現此類行為,步驟如下:
登錄至操作行為審計ActionTrail控制臺。
查看運行數據集成任務ExecuteFile事件的審計報文。
報文中,Reader為odps,但Write為Mysql,表示該同步任務的源端為MaxCompute,目的端為Mysql。
{ "eventId": "0bc1746317041210600372496e6191", "eventVersion": 1, "eventSource": "dataworks.aliyuncs.com", "requestParameters": { "codeContent": { "transform": false, "type": "job", "version": "2.0", "steps": [ { "stepType": "odps", "copies": 1, "parameter": { "partition": [ "pt=${bizdate}" ], "envType": 0, "datasource": "0_odps_xc_DPE_E2_engine", "tunnelQuota": "default", "isSupportThreeModel": false, "column": [ "spcode", "hjstatus", "sncode", "spname", "id", "spcard", "spkhrq", "spperm", "spgz", "spmfact", "spmfactzg", "spjym", "dwbfye", "grbfye", "spmend", "bchjny" ], "tableComment": "智慧城市人口財產主題分析-公積金信息數據", "table": "ods_t_ss_persons_delta_d" }, "name": "Reader", "gui": { "x": 100, "y": 100 }, "category": "reader" }, { "stepType": "mysql", "copies": 1, "parameter": { "postSql": [], "envType": 0, "datasource": "smartcity", "column": [ "spcode", "hjstatus", "sncode", "spname", "id", "spcard", "spkhrq", "spperm", "spgz", "spmfact", "spmfactzg", "spjym", "dwbfye", "grbfye", "spmend", "bchjny" ], "tableComment": "", "writeMode": "insert", "batchSize": 256, "table": "t_ss_persons_delta", "preSql": [] }, "name": "Writer", "gui": { "x": 100, "y": 200 }, "category": "writer" }, { "copies": 1, "parameter": { "nodes": [], "edges": [], "groups": [], "version": "2.0" }, "name": "Processor", "gui": { "x": 100, "y": 300 }, "category": "processor" } ], "order": { "hops": [ { "from": "Reader", "gui": { "sourceAnchor": 1, "targetAnchor": 0 }, "to": "Writer" } ] }, "setting": { "errorLimit": { "record": "" }, "speed": { "throttle": false, "concurrent": 2 } } } }, "sourceIpAddress": "58.100.XXX.XXX, 11.193.XXX.XX", "userAgent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.X.X Safari/537.36", "eventRW": "Write", "eventType": "ConsoleOperation", "referencedResources": { "ACS::DataWorks::File": [ "502990524" ], "ACS::DataWorks::Project": [ "94864" ] }, "userIdentity": { "sessionContext": { "attributes": { "mfaAuthenticated": "false", "creationDate": "1704119017735" } }, "accountId": "1912232488744735", "principalId": "1912232488744735", "type": "root-account", "userName": "root" }, "serviceName": "DataWorks", "additionalEventData": { "CallerBid": "26842" }, "requestId": "0bc1746317041210600372496e6191", "eventTime": "2024-01-01T14:57:42Z", "isGlobal": false, "acsRegion": "cn-shanghai", "eventName": "ExecuteFile" }
配置告警規則。
方式一:通過日志服務進行告警。
通過特定語法配置告警規則“當Reader為odps且Write不為odps時則告警”,實現事后告警。配置詳情請參見創建自定義告警規則。
語法示例參考如下。
event.eventName: ExecuteFile and event.serviceName: DataWorks | select * from (select json_extract_scalar(json_parse("event.requestParameterJson"), '$.codeContent.steps.0.stepType') as src, json_extract_scalar(json_parse("event.requestParameterJson"), '$.codeContent.steps.1.stepType') as dst from log) where src='odps' and dst != 'odps'
方式二:通過自建告警系統進行告警。
通過ActionTrail OpenAPI(LookupEvents)將審計事件明細內容拉取至本地解析,若審計報文內容符合“當Reader為odps且Write不為odps時則告警”規則,則可通過自建告警系統發出告警。
其他典型風險運營場景示例
DataWorks上可能出現的其他風險操作、風險場景、建議響應方式請參見下表。
相關操作如需實現審批、事中攔截,請提交工單、反饋客戶經理或通過釘釘群聯系產品側進行評估。
模塊 | 操作名稱 | 風險場景示例 | 響應方式 | 實現方式 |
數據集成 | 保存數據集成任務 | 禁止將高安全級別區域的數據源置為源端,將低安全級別的數據源置為目的端。 禁止將貼源層數據源置為源端,將非貼源層數據源置為目的端。 禁止將境內數據源置為源端,將境外數據源置為目的端。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件DIUpdateDataxJob、DISaveSolution來識別風險。 |
提交數據集成任務 | 禁止將高安全級別區域的數據源置為源端,將低安全級別的數據源置為目的端。 禁止將貼源層數據源置為源端,將非貼源層數據源置為目的端。 禁止將境內數據源置為源端,將境外數據源置為目的端。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件SubmitFile,并結合OPEN API"GetFile"獲取節點信息來識別風險。 (2)事中攔截:構建“文件提交”擴展程序,并結合對應開放事件及OPEN API"GetFile"獲取節點信息來識別風險、攔截操作。 | |
運行數據集成任務 | 不允許將高安全級別區域的數據源置為源端,將低安全級別的數據源置為目的端。 不允許將貼源層數據源置為源端,將費源層數據源置為目的端。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件ExecuteFile來識別風險。 (2)事中攔截:構建“節點運行”擴展程序,并結合對應開放事件及OPEN API"GetFile"獲取節點信息來識別風險、攔截操作。 | |
數據開發 | 提交文件 | 不允許提交包含Select * 語句的任務。 不允許提交無Insert關鍵字的Select語句任務 。 禁止提交DROP、TRUNCATE、PURGE相關的DDL語句。 禁止提交直接輸出(敏感)字段的語句。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件SubmitFile,并結合OPEN API"GetFile"獲取節點信息來識別風險。 (2)事中攔截:構建“文件提交”擴展程序,并結合對應開放事件及"GetFile"OPEN API獲取節點信息來識別風險、攔截操作。 |
發布文件 | 不允許發布包含Select * 語句的任務。 不允發布交無Insert關鍵字的Select語句任務 。 不允許同時擁有開發、運維角色人員將自己開發的任務直接發布到生產環境。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件DeployFile,并結合OPEN API"GetFile"獲取節點信息來識別風險。 (2)事中攔截:構建“文件發布”擴展程序,并結合對應開放事件及OPEN API"GetFile/ListProjectMembers"獲取節點與成員信息來識別風險、攔截操作。 | |
運行代碼 | 不允許直接輸出查詢敏感級別大于X級別的明細數據。 禁止對生成環境執行DROP、TRUNCATE、PURGE相關的DDL語句。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件ExecuteFile來識別風險。 (2)事中攔截:構建“節點運行”擴展程序,并結合對應開放事件及OPEN API"GetFile"獲取節點信息來識別風險、攔截操作。 | |
刪除文件 | 不允許刪除以“X”字符串開通的任務。 不允許刪除核心業務流程中的節點。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件DeleteFile,并結合OPEN API"GetFile"獲取節點信息來識別風險。 (2)事中攔截:構建“文件刪除”擴展程序,并結合對應開放事件及OPEN API"GetFile"獲取節點信息來識別風險、攔截操作。 | |
表提交 | 不允許執行“提交表”操作來修改表結構。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過開啟并消費“表提交”開放事件來識別風險。 (2)事中攔截:構建“表提交”擴展程序,并結合對應開放事件來識別風險、攔截操作。 | |
表發布 | 不允許直接將表發布至數倉ADS層。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過開啟并消費“表發布”開放事件來識別風險。 (2)事中攔截:構建“表發布”擴展程序,并結合對應開放事件來識別風險、攔截操作。 | |
偷鎖 | 不允許開發人員偷鎖他人任務的鎖,惡意修改他人代碼。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件LockFile來識別風險。 | |
查看數據(查詢結果) | 不允許查看明細敏感數據。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件ReadExecutionResults來識別風險。 | |
復制數據(查詢結果) | 不允許復制明細敏感數據。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件CopyTaskResult來識別風險。 | |
下載數據(查詢結果) | 不允許一次性下載超過1000條數據。 按人員所屬部門(工作空間)判斷其是否允許下載數據。 當查詢SQL中包含敏感字段時阻斷下載。 分段風控,當下載條數超過2W條時需要審批,超過5W時條則阻斷。 針對空間角色定義下載條數。例如,開發角色允許下載N條,超過則阻斷;分析師角色允許下載M條,超過則阻斷。 需針對數據開發、數據分析場景分別設置不同的下載條數策略。 | (1)事后告警 (2)事中攔截或審批 | (1)事后告警:通過解析操作審計事件DownloadExecutionResult來識別風險。 (2)事中攔截或審批:構建“數據下載”擴展程序,并結合對應開放事件及OPEN API"ListProjectMembers"獲取節點信息來識別風險、攔截&審批操作。 | |
運維中心 | 暫停&凍結&下線任務 | 不允許暫停&凍結&下線某條核心業務線的任務。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件OfflineNode、StopInstance、SuspendInstance來識別風險。 (2)事中攔截:構建“節點凍結、節點下線”擴展程序,并結合對應開放事件來識別風險、攔截操作。 |
補數據 | 不允許一次性補數據超過N個節點。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件RunCycleDagNodes來識別風險。 (2)事中攔截:構建“補數據”擴展程序,并結合對應開放事件來識別風險、攔截操作。 | |
觸發式任務運行 | 所運行的任務不允許包含Select * 語句的任務; 所運行的任務不允許包含Insert關鍵字的Select語句任務 。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件RunTriggerNode,并結合OPEN API"GetFile"獲取節點信息來識別風險。 | |
基線增刪改 | 不允許直接增刪改基線優先級,避免高優先級任務無法準時產出。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件UpdateBaseline、DeleteBaseline,并結合OPEN API"GetBaseline"獲取基線信息來識別風險。 | |
數據質量 | 設為弱規則 | 不允許直接將與某些核心任務掛鉤的數據質量規則設置為弱規則,避免臟數據蔓延而無法直接阻止。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件UpdateQualityRule,并結合OPEN API"GetQualityRule"獲取質量規則信息來識別風險。 |
刪除規則 | 不允許直接刪除與某些核心任務掛鉤的數據質量規則,避免臟數據蔓延而無法直接阻止。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件DeleteQualityRule,并結合OPEN API"GetQualityRule"獲取質量規則信息來識別風險。 | |
停止規則 | 不允許直接停止與某些核心任務掛鉤的數據質量規則,避免臟數據蔓延而無法直接阻止。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件UpdateQualityRule,并結合OPEN API"GetQualityRule"獲取質量規則信息來識別風險。 | |
數據分析 | SQL查詢 | 不允許直接輸出查詢敏感級別大于X級別的明細數據。 禁止對生成環境執行DROP、TRUNCATE、PURGE相關的DDL語句。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件RunTask來識別風險。 |
查看數據(查詢結果) | 不允許查看明細敏感數據。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件ReadExecutionResults來識別風險。 | |
復制數據(查詢結果/電子表格) | 不允許復制明細敏感數據。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件CopyTaskResult/CopySheetContent來識別風險。 | |
下載數據(查詢結果/電子表格) | 不允許一次性下載超過1000條數據。 按人員所屬部門(工作空間)判斷其是否允許下載數據。 當查詢SQL中包含敏感字段時阻斷下載。 分段風控,當下載條數超過2W條時需要審批,超過5W時條則阻斷。 針對空間角色定義下載條數。例如,開發角色允許下載N條,超過則阻斷;分析師角色允許下載M條,超過則阻斷。 需針對數據開發、數據分析場景分別設置不同的下載條數策略。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件DownloadSqlResult/DownloadSheet來識別風險。 (2)事中攔截:構建“數據下載”擴展程序,并結合對應開放事件及OPEN API"ListProjectMembers"獲取節點信息來識別風險、攔截&審批操作。 | |
管控臺 | 刪除項目空間 | 不允許直接刪除DataWorks項目空間。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過解析操作審計事件DeleteProject,并結合OPEN API"GetProject"獲取工作空間詳情信息來識別風險。 (2)事中攔截:構建“刪除項目空間”擴展程序,并結合對應開放事件及OPEN API"GetProject"獲取工作空間詳情信息來識別風險、攔截操作。 |
數據源 | 刪除數據源 | 不允許直接刪除與核心調度任務關聯的數據源。 | (1)事后告警 (2)事中攔截 | (1)事后告警:通過開啟并消費“刪除數據源”開放事件來識別風險。 (2)事中攔截:構建“刪除數據源”擴展程序,并結合對應開放事件來識別風險、攔截操作。 |
數據地圖 | 修改MC字段Label值 | 不允許直接修改MaxCompute表字段的Label值。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件updateTableColumnSecurityLabels來識別風險。 |
數據服務 | 發布數據服務API | 不允許直接發布回參包含敏感字段為N級的數服務API。 | (1)事后告警 | (1)事后告警:通過解析操作審計事件PublishApi來識別風險。 |