惡意文件檢測SDK功能依托云安全中心多引擎檢測平臺,可識別離線文件和阿里云OSS文件中存在的常見病毒,例如勒索病毒、挖礦程序等,防止惡意文件傳播和執行。本文介紹如何使用惡意文件檢測SDK功能。
功能說明
惡意文件檢測SDK為云端檢測方案,使用該功能會將您的文件上傳到云端進行檢測。
檢測的病毒類型
支持檢測的病毒類型(virus_type)中英文映射表
病毒類型(virus_type) | 病毒名稱 |
Backdoor | 反彈Shell后門 |
DDoS | DDoS木馬 |
Downloader | 下載器木馬 |
Engtest | 引擎測試程序 |
Hacktool | 黑客工具 |
Trojan | 高危程序 |
Malbaseware | 被污染的基礎軟件 |
MalScript | 惡意腳本 |
Malware | 惡意程序 |
Miner | 挖礦程序 |
Proxytool | 代理工具 |
RansomWare | 勒索病毒 |
RiskWare | 風險軟件 |
Rootkit | Rootkit |
Stealer | 竊密工具 |
Scanner | 掃描器 |
Suspicious | 可疑程序 |
Virus | 感染型病毒 |
WebShell | 網站后門 |
Worm | 蠕蟲病毒 |
AdWare | 廣告程序 |
Patcher | 破解程序 |
Gametool | 私服工具 |
檢測文件說明
支持對未加密的壓縮包解壓并進行檢測。
在調用SDK檢測離線文件場景下,默認不解壓,需要您自行配置解壓,掃描時可以設置是否識別壓縮文件并解壓、最大解壓層級和最大解壓文件數。
在OSS文件檢測場景下,默認不解壓,需要您自行配置解壓,支持全局和單個Bucket粒度下配置壓縮包的解壓層級。
支持對已配置服務端加密的OSS數據進行解密后檢測。OSS針對不同使用場景提供了以下服務器端加密方式,詳細內容,請參見服務器端加密。
使用KMS托管密鑰進行加解密(SSE-KMS):使用KMS托管的默認CMK(Customer Master Key)或指定CMK進行加解密操作。數據無需通過網絡發送到KMS服務端進行加解密。
使用OSS完全托管密鑰進行加解密(SSE-OSS):使用OSS完全托管的密鑰加密每個Object。
檢測惡意文件的方式
在業務服務器中調用SDK檢測離線文件
在您的業務程序中接入云安全中心的惡意文件檢測SDK,通過調用SDK的方式檢測惡意文件,在返回結果中獲取惡意文件信息,且支持在云安全中心控制臺查看存在風險的文件檢測結果。
支持通過Java或Python方式接入。
在云安全中心控制臺檢測OSS中存儲的文件
如果您待檢測的文件存儲在阿里云對象存儲OSS Bucket中,可以直接在云安全中心控制臺執行對目標OSS Bucket內文件的檢測,并支持查看存在風險的文件列表。
提供檢測結果
云安全中心通過整合多個知名病毒檢測引擎的結果,并結合安全專家的經驗,基于文件潛在惡意程度和檢測準確程度兩個關鍵維度,評定檢測出惡意文件事件的風險等級(高危、中危、低危),并給出相應惡意文件說明和處置建議。
日志分析
如果已開通云安全中心日志分析服務,會將惡意文件檢測記錄投遞到云安全中心的專屬Logstore(日志庫)中。詳細說明,請參見日志分析說明和惡意文件檢測字段說明。
應用場景
應用場景 | 惡意文件檢測說明 |
服務器應用場景 | 在服務器環境中,會常遇到各種大規模傳播的惡意文件,例如蠕蟲病毒、挖礦軟件、DDoS木馬以及惡意腳本等。 這些惡意文件通常具備自我復制和廣泛傳播的特性,旨在消耗系統資源、發起分布式拒絕服務攻擊或控制服務器以實施非法活動。 |
服務器定向攻擊場景 | 在服務器定向攻擊場景中,為了確保系統的安全和穩定運行,需要特別關注黑客工具、代理工具及后門程序等惡意文件。 這類攻擊突出隱蔽性和針對性,黑客通過植入此類惡意文件來竊取敏感信息、操控系統或作為跳板進一步滲透網絡。 |
全系統環境檢測 | 在各種環境中,都應密切關注破壞力強的勒索病毒和感染型病毒。 勒索病毒通過加密用戶數據勒索贖金,感染型病毒能自我復制并擴散至其他文件,兩者均能造成嚴重的數據損失和系統癱瘓。 |
辦公網與文件存儲環境檢測 | 在辦公網絡和文件存儲環境中,特別需要注意防范惡意文檔類文件(例如含有宏病毒的Office文件、帶有惡意負載的壓縮包等)的潛在威脅。 這些文件通常通過偽裝成日常工作交流中的正常文檔,誘導用戶打開從而實施攻擊。例如竊取登錄憑據、部署遠程訪問木馬等。 |
使用限制
在OSS文件檢測場景下,一次惡意文件檢測僅可以檢測一個不超過500 MB的文件。
在調用SDK檢測場景下,一次惡意文件檢測僅可以檢測一個不超過100 MB的文件。
試用版和企業版的默認接口請求頻率不同。
試用版:10次/秒。
企業版:20次/秒。
支持檢測的壓縮包文件類型有
.7z
、.zip
、.tar
、.gz
、.rar
、.ar
、.bz2
、.xz
和.lzma
。一個壓縮包支持解壓的層級最多為5層,解壓后的文件總個數最多為1,000個,解壓后的所有文件總大小最多為1 GB。對于超出限制范圍的文件,不會被檢測。
惡意文件的檢測速度會受到網速、計算機性能、云產品限制等方面的影響。惡意文件檢測SDK內部采用隊列的方式,緩解外部峰值時的請求來提高并發率。當內部的隊列滿時,拒絕外部繼續提交請求,并使用等待接口,等待隊列有空間可用時再處理外部請求。
您可以通過增加隊列長度來提高并發量,該方法會影響部分樣本的檢測時長。
timeout_ms
參數為樣本超時時間,單位為毫秒。為了減少超時,建議您將timeout_ms
參數設置為60,000毫秒(即60秒)。在OSS文件檢測場景下,僅支持存儲類型為標準存儲和低頻訪問的文件檢測,不支持歸檔存儲類型的文件檢測。存儲類型詳細內容,請參見存儲類型概述。
在OSS文件檢測場景下,支持檢測的OSS Bucket的所屬地域包括:華北1(青島)、華北2(北京)、華北3(張家口)、華北5(呼和浩特)、華東1(杭州)、華東2(上海)、華南1(深圳)、華南2(河源)、華南3(廣州)、西南1(成都)、中國香港、新加坡、印度尼西亞(雅加達)、泰國(曼谷)、菲律賓(馬尼拉)、馬來西亞(吉隆坡)、韓國(首爾)、日本(東京)、美國(硅谷)、英國(倫敦)、美國(弗吉尼亞)、德國(法蘭克福)。
計費說明
使用惡意文件檢測SDK功能會消耗文件檢測次數。如果是壓縮包文件,每個壓縮包文件消耗的文件檢測次數按照解壓后文件個數計算。
經過企業實名認證的阿里云賬號可以免費試用惡意文件檢測SDK功能。試用版提供10,000次惡意文件檢測次數。
購買企業版可以選擇您所需的惡意文件檢測次數。
計費項詳細說明,請參見計費概述。
開通服務并檢測惡意文件
前提條件
如果您使用的是RAM用戶,請確保已為RAM用戶授予AliyunYundunSASFullAccess權限。具體操作,請參見為RAM用戶授權。
步驟一:開通服務
云安全中心支持通過免費試用和付費購買的方式開通服務。
免費試用
如果您的阿里云賬號已通過企業認證,您可以通過免費試用開通惡意文件檢測SDK服務,并獲取1萬次惡意文件檢測次數。每個阿里云賬號僅有一次免費試用機會。
登錄云安全中心控制臺。在控制臺左上角,選擇需防護資產所在的區域中國。
在左側導航欄,選擇 。
在惡意文件檢測SDK頁面,單擊立即試用。
付費購買
如果免費試用無法滿足需求,您可以通過付費購買的方式開通惡意文件檢測SDK服務。
如果存在未使用的免費試用次數,付費購買次數后,剩余的免費試用次數將累計在您付費購買的次數中。
登錄云安全中心控制臺。在控制臺左上角,選擇需防護資產所在的區域中國。
在左側導航欄,選擇 。
在惡意文件檢測SDK頁面,單擊立即購買。在購買面板,選擇惡意文件檢測SDK為是,并按照要檢測文件的數量購買足夠的惡意文件檢測次數。
如果您已購買云安全中心付費版本,您可以直接選擇所需的惡意文件檢測次數。如未購買,請根據您的需求選擇云安全中心版本:
無需使用云安全中心的其他安全防護功能時,版本選擇僅采購增值服務。
如需使用云安全中心的其他功能,例如漏洞修復、容器威脅檢測,請選擇相應的云安全中心版本。各版本的功能差異詳情,請參見功能特性。
仔細閱讀并選中服務協議,單擊立即購買并完成支付。
開通惡意文件檢測SDK服務后,您可以在惡意文件檢測SDK頁面,查看您的惡意文件檢測SDK的剩余檢測次數。如果剩余檢測次數不足以支撐后續業務需求,您可以單擊升級配置,購買更多的資源。更多說明,請參見升級與降配。
步驟二:檢測惡意文件
根據您的業務場景,選擇以下方式檢測目標環境內的惡意文件。
執行惡意文件檢測前,請確保當前阿里云賬號下有足夠的剩余檢測次數。如果剩余檢測次數不足,您可以在惡意文件檢測SDK頁面的風險文件總覽頁簽下,單擊升級配置購買足夠的檢測次數。
在業務服務器中調用SDK檢測離線文件
準備工作
已配置環境變量
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
。阿里云SDK支持通過定義
ALIBABA_CLOUD_ACCESS_KEY_ID
和ALIBABA_CLOUD_ACCESS_KEY_SECRET
環境變量來創建默認的訪問憑證。調用接口時,程序直接訪問憑證,讀取您的訪問密鑰(即AccessKey)并自動完成鑒權。更多信息,請參見配置環境變量。您需要選擇接入方式并參考下表的說明獲取SDK包。
接入方式
版本要求
獲取SDK包
Java接入
必須使用1.8或更高版本的JDK。
您可以通過以下方式獲取Java SDK。
離線導入安裝:在聯網環境下訪問Java SDK代碼庫并下載Java SDK,將下載的Java SDK添加到項目工程中。
Python接入
Python 3.6及以上版本。
您可以通過以下方式獲取部署SDK包。
使用pip快速安裝(聯網環境下推薦):
pip install -U alibabacloud_filedetect
離線安裝(無互聯網連接的環境):在聯網環境下訪問Python代碼庫并下載Python SDK。將Python SDK上傳至項目工程環境,解壓壓縮包后,運行以下安裝命令:
# 切換至python SDK根目錄 cd alibabacloud-file-detect-python-sdk-master # 安裝SDK,注意python的版本 python setup.py install
示例代碼
使用Python接入時,請替換下面示例中的測試用例路徑參數,即path
。
package com.aliyun.filedetect.sample;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import com.aliyun.filedetect.*;
public class Sample {
/**
* 同步檢測文件接口
* @param detector 檢測器對象
* @param path 待檢測的文件路徑
* @param timeout_ms 設置超時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @throws InterruptedException
*/
public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == path) return null;
DetectResult result = null;
while(true) {
result = detector.detectSync(path, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 異步檢測文件接口
* @param detector 檢測器對象
* @param path 待檢測的文件路徑
* @param timeout_ms 設置超時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @param callback 結果回調函數
* @throws InterruptedException
*/
public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detect(path, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 同步檢測URL文件接口
* @param detector 檢測器對象
* @param url 待檢測的文件URL
* @param md5 待檢測的文件md5
* @param timeout_ms 設置超時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @throws InterruptedException
*/
public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
if (null == detector || null == url || null == md5) return null;
DetectResult result = null;
while(true) {
result = detector.detectUrlSync(url, md5, timeout_ms);
if (null == result) break;
if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 異步檢測URL文件接口
* @param detector 檢測器對象
* @param url 待檢測的文件URL
* @param md5 待檢測的文件md5
* @param timeout_ms 設置超時時間,單位為毫秒
* @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
* @param callback 結果回調函數
* @throws InterruptedException
*/
public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
int result = ERR_CODE.ERR_INIT.value();
if (wait_if_queuefull) {
final IDetectResultCallback real_callback = callback;
callback = new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
real_callback.onScanResult(seq, file_path, callback_res);
}
};
}
while(true) {
result = detector.detectUrl(url, md5, timeout_ms, callback);
if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
if (!wait_if_queuefull) break;
detector.waitQueueAvailable(-1);
}
return result;
}
/**
* 格式化檢測結果
* @param result 檢測結果對象
* @return 格式化后的字符串
*/
public static String formatDetectResult(DetectResult result) {
if (result.isSucc()) {
DetectResult.DetectResultInfo info = result.getDetectResultInfo();
String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
if (info.compresslist != null) {
int idx = 1;
for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
}
}
return msg;
}
DetectResult.ErrorInfo info = result.getErrorInfo();
return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
, info.md5, info.time, info.error_code.name(), info.error_string);
}
private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
if (info.compresslist != null) {
msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
}
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
DetectResult.VirusInfo vinfo = info.getVirusInfo();
if (vinfo != null) {
msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
}
return msg;
}
/**
* 同步檢測目錄或文件
* @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
* @param is_sync 是否使用同步接口,推薦使用異步。 true是同步, false是異步
* @throws InterruptedException
*/
public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
}
return;
}
System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
System.err.println(String.format(" [ END ] %s", formatDetectResult(res)));
result_map.put(abs_path, res);
}
/**
* 異步檢測目錄或文件
* @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
* @param is_sync 是否使用同步接口,推薦使用異步。 true是同步, false是異步
* @throws InterruptedException
*/
public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
File file = new File(path);
String abs_path = file.getAbsolutePath();
if (file.isDirectory()) {
String[] ss = file.list();
if (ss == null) return;
for (String s : ss) {
String subpath = abs_path + File.separator + s;
detectDirOrFile(detector, subpath, timeout_ms, callback);
}
return;
}
int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
}
/**
* 開始對文件或目錄進行
* @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
* @param is_sync 是否使用同步接口,推薦使用異步。 true是同步, false是異步
* @throws InterruptedException
*/
public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
long start_time = System.currentTimeMillis();
final Map<String, DetectResult> result_map = new HashMap<>();
if (is_sync) {
detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
} else {
detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
public void onScanResult(int seq, String file_path, DetectResult callback_res) {
System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
result_map.put(file_path, callback_res);
}
});
// 等待任務執行完成
detector.waitQueueEmpty(-1);
}
long used_time = System.currentTimeMillis() - start_time;
System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
int fail_count = 0;
int white_count = 0;
int black_count = 0;
for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
DetectResult res = entry.getValue();
if (res.isSucc()) {
if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
black_count ++;
} else {
white_count ++;
}
} else {
fail_count ++;
}
}
System.out.println(String.format(" fail_count: %d, white_count: %d, black_count: %d"
, fail_count, white_count, black_count));
}
public static void main(String[] args_) throws Exception {
// 獲取檢測器實例
OpenAPIDetector detector = OpenAPIDetector.getInstance();
// 初始化
ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
System.out.println("INIT RET: " + init_ret.name());
// 設置解壓縮參數(可選,默認不解壓壓縮包)
boolean decompress = true; // 是否識別壓縮文件并解壓,默認為false
int decompressMaxLayer = 5; // 最大解壓層數,decompress參數為true時生效
int decompressMaxFileCount = 1000; // 最大解壓文件數,decompress參數為true時生效
ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
if (true) {
// 示例用法1:掃描本地目錄或文件
boolean is_sync_scan = false; // 是異步檢測還是同步檢測。異步檢測性能更好。false表示異步檢測
int timeout_ms = 500000; // 單個樣本檢測時間,單位為毫秒
String path = "test2.php"; // 待掃描的文件或目錄
// 啟動掃描,直到掃描結束
scan(detector, path, timeout_ms, is_sync_scan);
}
if (true) {
// 示例用法2:掃描URL文件
int timeout_ms = 500000; // 單個樣本檢測時間,單位為毫秒
String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待掃描的URL文件
String md5 = "a767f*************6e21d000000"; // 待掃描的文件MD5
// 同步掃描。如果需要異步掃描,調用detectUrl接口
System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
}
// 反初始化
System.out.println("Over.");
detector.uninit();
}
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback
from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult
class Sample(object):
def __init__(self):
pass
"""
同步檢測文件接口
@param detector 檢測器對象
@param path 待檢測的文件路徑
@param timeout_ms 設置超時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
"""
def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
if detector is None or path is None:
return None
result = None
while True:
result = detector.detectSync(path, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
異步檢測文件接口
@param detector 檢測器對象
@param path 待檢測的文件路徑
@param timeout_ms 設置超時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
@param callback 結果回調函數
"""
def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
if detector is None or path is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detect(path, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
同步檢測URL文件接口
@param detector 檢測器對象
@param url 待檢測的文件URL
@param md5 待檢測的文件md5
@param timeout_ms 設置超時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
"""
def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
if detector is None or url is None or md5 is None:
return None
result = None
while True:
result = detector.detectUrlSync(url, md5, timeout_ms)
if result is None:
break
if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
異步檢測URL文件接口
@param detector 檢測器對象
@param url 待檢測的文件URL
@param md5 待檢測的文件md5
@param timeout_ms 設置超時時間,單位為毫秒
@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
@param callback 結果回調函數
"""
def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
if detector is None or url is None or md5 is None or callback is None:
return ERR_CODE.ERR_INIT.value
result = ERR_CODE.ERR_INIT.value
if wait_if_queuefull:
real_callback = callback
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
return
real_callback.onScanResult(seq, file_path, callback_res)
callback = AsyncTaskCallback()
while True:
result = detector.detectUrl(url, md5, timeout_ms, callback)
if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
break
if wait_if_queuefull is False:
break
detector.waitQueueAvailable(-1)
return result
"""
格式化檢測結果
@param result 檢測結果對象
@return 格式化后的字符串
"""
@staticmethod
def formatDetectResult(result):
msg = ""
if result.isSucc():
info = result.getDetectResultInfo()
msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
if info.compresslist is not None:
idx = 1
for comp_res in info.compresslist:
msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
idx += 1
else:
info = result.getErrorInfo()
msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
info.time, info.error_code.name, info.error_string)
return msg
@staticmethod
def formatDetectResultInfo(info):
msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
if info.compresslist is not None:
msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
@staticmethod
def formatCompressFileDetectResultInfo(info):
msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
vinfo = info.getVirusInfo()
if vinfo is not None:
msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
return msg
"""
同步檢測目錄或文件
@param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
@param is_sync 是否使用同步接口,推薦使用異步。 True是同步,False是異步
"""
def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
return
print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
detector.getQueueSize(), abs_path, timeout_ms))
res = self.detectFileSync(detector, abs_path, timeout_ms, True)
print(" [ END ] {}".format(Sample.formatDetectResult(res)))
result_map[abs_path] = res
return
"""
異步檢測目錄或文件
@param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
@param is_sync 是否使用同步接口,推薦使用異步。True是同步, False是異步
"""
def detectDirOrFile(self, detector, path, timeout_ms, callback):
abs_path = os.path.abspath(path)
if os.path.isdir(abs_path):
sub_files = os.listdir(abs_path)
if len(sub_files) == 0:
return
for sub_file in sub_files:
sub_path = os.path.join(abs_path, sub_file)
self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
return
seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
seq, detector.getQueueSize(), abs_path, timeout_ms))
return
"""
開始對文件或目錄進行檢測
@param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
@param is_sync 是否使用同步接口,推薦使用異步。 True是同步,False是異步
"""
def scan(self, detector, path, detect_timeout_ms, is_sync):
try:
print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
start_time = time.time()
result_map = {}
if is_sync:
self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
else:
class AsyncTaskCallback(IDetectResultCallback):
def onScanResult(self, seq, file_path, callback_res):
print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
result_map[file_path] = callback_res
self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
# 等待任務執行完成
detector.waitQueueEmpty(-1)
used_time_ms = (time.time() - start_time) * 1000
print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
failed_count = 0
white_count = 0
black_count = 0
for file_path, res in result_map.items():
if res.isSucc():
if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
black_count += 1
else:
white_count += 1
else:
failed_count += 1
print(" fail_count: {}, white_count: {}, black_count: {}".format(
failed_count, white_count, black_count))
except Exception as e:
print(traceback.format_exc(), file=sys.stderr)
def main(self):
# 獲取檢測器實例
detector = OpenAPIDetector.get_instance()
# 讀取環境變量中的Access Key ID和Access Key Secret
access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')
# 初始化
init_ret = detector.init(access_key_id, access_key_secret)
print("INIT RET: {}".format(init_ret.name))
# 設置解壓縮參數(可選,默認不解壓壓縮包)
decompress = True # 是否識別壓縮文件并解壓,默認為false
decompressMaxLayer = 5 # 最大解壓層數,decompress參數為true時生效
decompressMaxFileCount = 1000 # 最大解壓文件數,decompress參數為true時生效
initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))
if True:
# 示例用法1:掃描本地目錄或文件
is_sync_scan = False # 是異步檢測還是同步檢測。異步檢測性能更好。False表示異步檢測
timeout_ms = 500000 # 單個樣本檢測時間,單位為毫秒
path = "test.bin" # 待掃描的文件或目錄
# 啟動掃描,直到掃描結束
self.scan(detector, path, timeout_ms, is_sync_scan)
if True:
# 示例用法2:掃描URL文件
timeout_ms = 500000
url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待掃描的URL文件
md5 = "a767ffc59d93125c7505b6e21d000000"
# 同步掃描。如果需要異步掃描,調用detectUrl接口
print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))
# 反初始化
print("Over.")
detector.uninit()
if __name__ == "__main__":
sample = Sample()
sample.main()
返回結果
調用SDK進行惡意文件檢測后,只能在程序的運行結果中查看檢測結果,檢測結果不會同步到云安全中心控制臺中。控制臺中僅支持查看剩余檢測次數。風險文件總覽頁面的統計數據,例如檢測文件總數等,為OSS文件檢測的結果。
struct DetectResult {
std::string md5; // 樣本md5
long time = 0; // 用時,單位為毫秒
ERR_CODE error_code; // 錯誤碼
std::string error_string; // 擴展錯誤信息
enum RESULT {
RES_WHITE = 0, //安全文件
RES_BLACK = 1, //可疑文件
RES_PENDING = 3 //檢測中
};
RESULT result; //檢測結果
int score; //檢測分值,取值范圍0~100
std::string virus_type; //病毒類型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //擴展信息為JSON字符串
struct CompressFileDetectResultInfo {
std::string path; // 壓縮文件路徑
RESULT result; // 檢測結果
int score; // 分值,取值范圍0-100
std::string virus_type; //病毒類型,如“WebShell/MalScript/Hacktool”
std::string ext_info; //擴展信息為JSON字符串
};
std::list<struct CompressFileDetectResultInfo> compresslist = null; // 如果是壓縮包,并且開啟了壓縮包解壓參數,則此處會輸出壓縮包內文件檢測結果
};
錯誤碼說明如下:
enum ERR_CODE {
ERR_INIT = -100, // 需要初始化,或者重復初始化
ERR_FILE_NOT_FOUND = -99, // 文件未找到
ERR_DETECT_QUEUE_FULL = -98, // 檢測隊列滿
ERR_CALL_API = -97, // 調用API錯誤
ERR_TIMEOUT = -96, // 超時
ERR_UPLOAD = -95, //文件上傳失敗;用戶可重新發起檢測,再次嘗試
ERR_ABORT = -94, //程序退出,樣本未得到檢測
ERR_TIMEOUT_QUEUE = -93, //隊列超時,用戶發起檢測頻率過高或超時時間過短
ERR_MD5 = -92, // MD5格式不對
ERR_URL = -91, // URL格式不對
ERR_SUCC = 0 // 成功
};
檢測分數越高,文件可能存在的風險越高。檢測分數和危險等級對應表如下:
分數區間 | 危險等級 |
0~60 | 安全 |
61~70 | 風險(低危) |
71~80 | 可疑(中危) |
81~100 | 惡意(高危) |
在云安全中心控制臺檢測OSS中存儲的文件
登錄云安全中心控制臺。在控制臺左上角,選擇需防護資產所在的區域中國。
在左側導航欄,選擇 。
單擊OSS文件檢測頁簽,選擇合適的檢測方式并執行檢測。
如果您的Bucket沒有在OSS文件檢測的列表中,您可以單擊同步Bucket,同步最新的Bucket列表。
檢測方式
說明
操作步驟
手動全量檢測
檢測單個或多個Bucket內的所有文件。
在OSS文件檢測頁簽,單擊單個Bucke操作列的檢測或選中多個Bucket后單擊批量檢測。
在檢測對話框,指定需要檢測文件的類型、檢測文件范圍、掃描路徑等。
解壓層級:如果選擇檢測的文件類型包含壓縮類型,可以設置解壓層級(最多5層,支持不解壓)和單個壓縮包解壓文件數量限制(最多1000個)。
文件解密類型:默認不解密,如果OSS文件配置了服務端加密(SSE-KMS、SSE-OSS),可以設置解密后再檢測。解密類型OSS對應SSE-OSS加密的OSS文件,解密類型KMS對應SSE-KMS加密的OSS文件。
檢測文件范圍:可選。設置檢測范圍的時間上限。設置后會檢測文件更新時間晚于該時間的文件。
掃描路徑:支持選擇按前綴匹配(輸入文件名前綴來匹配掃描指定文件)或配置到整個Bucket(掃描整個Bucket文件)。
單擊確定。
手動增量檢測
針對已檢測且上次檢測后有文件更新的Bucket,提供只檢測未檢測過的文件的功能。
在OSS文件檢測頁簽,單擊目標Bucke操作列的增量檢測。
在增量檢測對話框中,指定需要檢測文件的類型、解壓層級(壓縮類型文件支持)、文件解密類型、檢測文件范圍、掃描路徑(支持按前綴匹配和配置到整個Bucket)。
單擊確定。
自動檢測
通過配置掃描策略可為指定Bucket開啟周期性自動檢測。配置前建議了解以下信息:
一個Bucket只能在一個策略中生效。
自動檢測僅會檢測OSS新增的文件,不會重復檢測同一個文件。
在OSS文件檢測頁簽,單擊策略管理區域的策略配置。
在策略管理面板,單擊新增策略。
如果已配置策略的檢測方案符合檢測要求,您可以單擊該策略操作列的編輯,將目標Bucket添加到該策略的生效范圍內。
在策略創建面板,配置策略名稱、策略啟用狀態(默認啟用)、掃描路徑(支持按前綴匹配和配置到整個Bucket)、檢測文件范圍、檢測周期、文件檢測時間、文件檢測類型、解壓層級(壓縮類型文件支持)、文件解密類型和生效Bucket等。
單擊確定。
可選:配置釘釘機器人通知
您可以在系統配置中添加釘釘機器人通知,通過釘釘群實時接收云安全中心檢測的惡意文件告警信息。具體操作,請參見配置釘釘機器人通知。
查看和導出檢測結果
風險文件總覽
在風險文件總覽頁簽:
查看風險文件統計信息
不同等級(高危、中危、低危)風險通過不同顏色顯示:高危(紅色)、中危(橙色)、低危(灰色)。
在風險文件列表中,可以根據檢測場景列來選擇查看調用SDK(API)和在控制臺(OSS)檢測的惡意文件信息,也可以單擊列表左上方的搜索項的下拉框,根據風險等級、文件名稱、威脅標簽、MD5或最新檢測時間來查看目標惡意文件詳情。
如果是壓縮包文件,可單擊文件前的展開圖標,展示該壓縮包下的風險文件列表,該壓縮包文件的風險等級為其下風險文件中的最高風險等級。
查看目標風險文件詳情
在風險文件列表中,單擊目標風險文件操作列的詳情,在該文件的詳情面板,查看檢測出的存在風險的文件詳情、事件說明(包含惡意文件說明和處置建議)等信息。
在壓縮包文件的風險詳情面板,還可查看該壓縮包中的風險文件數、解壓后檢測的文件總數,以及風險文件列表等信息。
導出所有風險文件列表
單擊圖標,等待文件導出完成后,在提示框中單擊下載。
OSS文件檢測
在OSS文件檢測頁簽:
從OSS Bucket維度查看風險文件統計信息
不同等級(高危、中危、低危)風險通過不同顏色顯示:高危(紅色)、中危(橙色)、低危(灰色)。
查看目標OSS Bucket的檢測詳情
在Bucket列表中,單擊目標Bucket操作列的詳情,查看Bucket的基本信息和風險文件詳情。
在壓縮包的文件檢測詳情頁面,還可以查看配置文件解壓情況、解壓后的文件總數、檢測的文件數量以及已掃描、未掃描的風險文件列表。
按Bucket維度導出所有風險文件列表
在OSS文件檢測頁簽,單擊圖標,等待文件導出完成后,在提示框中單擊下載。
處理風險文件的告警事件
對于檢測出的風險文件,需要您根據風險等級以及云安全中心提供的事件說明和處置建議,進行確認和手動修復。
高危風險事件:誤報的概率較低,同時惡意性較強,建議及時處理。
中危和低危風險事件:該等級風險文件多出現在業務場景中,需要您核實對業務的影響后處理。
如果您確認是業務的正常文件或是誤報,您可以忽略相關告警事件。如果您在業務程序中調用SDK檢測文件,對于正常或誤報的文件,您也可以在業務程序中添加忽略或加入白名單的處理邏輯。
相關API
您可以通過調用API接口方式使用惡意文件檢測功能。具體使用方法如下: