日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

惡意文件檢測SDK

惡意文件檢測SDK功能依托云安全中心多引擎檢測平臺,可識別離線文件和阿里云OSS文件中存在的常見病毒,例如勒索病毒、挖礦程序等,防止惡意文件傳播和執行。本文介紹如何使用惡意文件檢測SDK功能。

功能說明

惡意文件檢測SDK為云端檢測方案,使用該功能會將您的文件上傳到云端進行檢測。

檢測的病毒類型

支持檢測的病毒類型(virus_type)中英文映射表

病毒類型(virus_type

病毒名稱

Backdoor

反彈Shell后門

DDoS

DDoS木馬

Downloader

下載器木馬

Engtest

引擎測試程序

Hacktool

黑客工具

Trojan

高危程序

Malbaseware

被污染的基礎軟件

MalScript

惡意腳本

Malware

惡意程序

Miner

挖礦程序

Proxytool

代理工具

RansomWare

勒索病毒

RiskWare

風險軟件

Rootkit

Rootkit

Stealer

竊密工具

Scanner

掃描器

Suspicious

可疑程序

Virus

感染型病毒

WebShell

網站后門

Worm

蠕蟲病毒

AdWare

廣告程序

Patcher

破解程序

Gametool

私服工具

檢測文件說明

  • 支持對未加密的壓縮包解壓并進行檢測。

    • 在調用SDK檢測離線文件場景下,默認不解壓,需要您自行配置解壓,掃描時可以設置是否識別壓縮文件并解壓、最大解壓層級和最大解壓文件數。

    • 在OSS文件檢測場景下,默認不解壓,需要您自行配置解壓,支持全局和單個Bucket粒度下配置壓縮包的解壓層級。

  • 支持對已配置服務端加密的OSS數據進行解密后檢測。OSS針對不同使用場景提供了以下服務器端加密方式,詳細內容,請參見服務器端加密

    • 使用KMS托管密鑰進行加解密(SSE-KMS):使用KMS托管的默認CMK(Customer Master Key)或指定CMK進行加解密操作。數據無需通過網絡發送到KMS服務端進行加解密。

    • 使用OSS完全托管密鑰進行加解密(SSE-OSS):使用OSS完全托管的密鑰加密每個Object。

檢測惡意文件的方式

  • 在業務服務器中調用SDK檢測離線文件

    在您的業務程序中接入云安全中心的惡意文件檢測SDK,通過調用SDK的方式檢測惡意文件,在返回結果中獲取惡意文件信息,且支持在云安全中心控制臺查看存在風險的文件檢測結果。

    支持通過Java或Python方式接入。

  • 在云安全中心控制臺檢測OSS中存儲的文件

    如果您待檢測的文件存儲在阿里云對象存儲OSS Bucket中,可以直接在云安全中心控制臺執行對目標OSS Bucket內文件的檢測,并支持查看存在風險的文件列表。

提供檢測結果

云安全中心通過整合多個知名病毒檢測引擎的結果,并結合安全專家的經驗,基于文件潛在惡意程度和檢測準確程度兩個關鍵維度,評定檢測出惡意文件事件的風險等級(高危中危低危),并給出相應惡意文件說明和處置建議。

日志分析

如果已開通云安全中心日志分析服務,會將惡意文件檢測記錄投遞到云安全中心的專屬Logstore(日志庫)中。詳細說明,請參見日志分析說明惡意文件檢測字段說明

應用場景

應用場景

惡意文件檢測說明

服務器應用場景

在服務器環境中,會常遇到各種大規模傳播的惡意文件,例如蠕蟲病毒、挖礦軟件、DDoS木馬以及惡意腳本等。

這些惡意文件通常具備自我復制和廣泛傳播的特性,旨在消耗系統資源、發起分布式拒絕服務攻擊或控制服務器以實施非法活動。

服務器定向攻擊場景

在服務器定向攻擊場景中,為了確保系統的安全和穩定運行,需要特別關注黑客工具、代理工具及后門程序等惡意文件。

這類攻擊突出隱蔽性和針對性,黑客通過植入此類惡意文件來竊取敏感信息、操控系統或作為跳板進一步滲透網絡。

全系統環境檢測

在各種環境中,都應密切關注破壞力強的勒索病毒和感染型病毒。

勒索病毒通過加密用戶數據勒索贖金,感染型病毒能自我復制并擴散至其他文件,兩者均能造成嚴重的數據損失和系統癱瘓。

辦公網與文件存儲環境檢測

在辦公網絡和文件存儲環境中,特別需要注意防范惡意文檔類文件(例如含有宏病毒的Office文件、帶有惡意負載的壓縮包等)的潛在威脅。

這些文件通常通過偽裝成日常工作交流中的正常文檔,誘導用戶打開從而實施攻擊。例如竊取登錄憑據、部署遠程訪問木馬等。

使用限制

  • 在OSS文件檢測場景下,一次惡意文件檢測僅可以檢測一個不超過500 MB的文件。

  • 在調用SDK檢測場景下,一次惡意文件檢測僅可以檢測一個不超過100 MB的文件。

  • 試用版和企業版的默認接口請求頻率不同。

    • 試用版:10次/秒。

    • 企業版:20次/秒。

  • 支持檢測的壓縮包文件類型有.7z.zip.tar.gz.rar.ar.bz2.xz.lzma

  • 一個壓縮包支持解壓的層級最多為5層,解壓后的文件總個數最多為1,000個,解壓后的所有文件總大小最多為1 GB。對于超出限制范圍的文件,不會被檢測。

  • 惡意文件的檢測速度會受到網速、計算機性能、云產品限制等方面的影響。惡意文件檢測SDK內部采用隊列的方式,緩解外部峰值時的請求來提高并發率。當內部的隊列滿時,拒絕外部繼續提交請求,并使用等待接口,等待隊列有空間可用時再處理外部請求。

    您可以通過增加隊列長度來提高并發量,該方法會影響部分樣本的檢測時長。timeout_ms參數為樣本超時時間,單位為毫秒。為了減少超時,建議您將timeout_ms參數設置為60,000毫秒(即60秒)。

  • 在OSS文件檢測場景下,僅支持存儲類型為標準存儲低頻訪問的文件檢測,不支持歸檔存儲類型的文件檢測。存儲類型詳細內容,請參見存儲類型概述

  • 在OSS文件檢測場景下,支持檢測的OSS Bucket的所屬地域包括:華北1(青島)、華北2(北京)、華北3(張家口)、華北5(呼和浩特)、華東1(杭州)、華東2(上海)、華南1(深圳)、華南2(河源)、華南3(廣州)、西南1(成都)、中國香港、新加坡、印度尼西亞(雅加達)、泰國(曼谷)、菲律賓(馬尼拉)、馬來西亞(吉隆坡)、韓國(首爾)、日本(東京)、美國(硅谷)、英國(倫敦)、美國(弗吉尼亞)、德國(法蘭克福)。

計費說明

使用惡意文件檢測SDK功能會消耗文件檢測次數。如果是壓縮包文件,每個壓縮包文件消耗的文件檢測次數按照解壓后文件個數計算。

  • 經過企業實名認證的阿里云賬號可以免費試用惡意文件檢測SDK功能。試用版提供10,000次惡意文件檢測次數。

  • 購買企業版可以選擇您所需的惡意文件檢測次數。

計費項詳細說明,請參見計費概述

開通服務并檢測惡意文件

前提條件

如果您使用的是RAM用戶,請確保已為RAM用戶授予AliyunYundunSASFullAccess權限。具體操作,請參見為RAM用戶授權

步驟一:開通服務

云安全中心支持通過免費試用和付費購買的方式開通服務。

免費試用

如果您的阿里云賬號已通過企業認證,您可以通過免費試用開通惡意文件檢測SDK服務,并獲取1萬次惡意文件檢測次數。每個阿里云賬號僅有一次免費試用機會。

  1. 登錄云安全中心控制臺。在控制臺左上角,選擇需防護資產所在的區域中國

  2. 在左側導航欄,選擇風險治理 > 惡意文件檢測SDK

  3. 惡意文件檢測SDK頁面,單擊立即試用

付費購買

如果免費試用無法滿足需求,您可以通過付費購買的方式開通惡意文件檢測SDK服務。

重要

如果存在未使用的免費試用次數,付費購買次數后,剩余的免費試用次數將累計在您付費購買的次數中。

  1. 登錄云安全中心控制臺。在控制臺左上角,選擇需防護資產所在的區域中國

  2. 在左側導航欄,選擇風險治理 > 惡意文件檢測SDK

  3. 惡意文件檢測SDK頁面,單擊立即購買。在購買面板,選擇惡意文件檢測SDK,并按照要檢測文件的數量購買足夠的惡意文件檢測次數

    如果您已購買云安全中心付費版本,您可以直接選擇所需的惡意文件檢測次數。如未購買,請根據您的需求選擇云安全中心版本:

    • 無需使用云安全中心的其他安全防護功能時,版本選擇僅采購增值服務

    • 如需使用云安全中心的其他功能,例如漏洞修復、容器威脅檢測,請選擇相應的云安全中心版本。各版本的功能差異詳情,請參見功能特性

  4. 仔細閱讀并選中服務協議,單擊立即購買并完成支付。

開通惡意文件檢測SDK服務后,您可以在惡意文件檢測SDK頁面,查看您的惡意文件檢測SDK的剩余檢測次數。如果剩余檢測次數不足以支撐后續業務需求,您可以單擊升級配置,購買更多的資源。更多說明,請參見升級與降配

image.png

步驟二:檢測惡意文件

根據您的業務場景,選擇以下方式檢測目標環境內的惡意文件。

重要

執行惡意文件檢測前,請確保當前阿里云賬號下有足夠的剩余檢測次數。如果剩余檢測次數不足,您可以在惡意文件檢測SDK頁面的風險文件總覽頁簽下,單擊升級配置購買足夠的檢測次數。

在業務服務器中調用SDK檢測離線文件

準備工作

  • 已配置環境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET

    阿里云SDK支持通過定義ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET環境變量來創建默認的訪問憑證。調用接口時,程序直接訪問憑證,讀取您的訪問密鑰(即AccessKey)并自動完成鑒權。更多信息,請參見配置環境變量

  • 您需要選擇接入方式并參考下表的說明獲取SDK包。

    接入方式

    版本要求

    獲取SDK包

    Java接入

    必須使用1.8或更高版本的JDK。

    您可以通過以下方式獲取Java SDK。

    離線導入安裝:在聯網環境下訪問Java SDK代碼庫并下載Java SDK,將下載的Java SDK添加到項目工程中。

    Python接入

    Python 3.6及以上版本。

    您可以通過以下方式獲取部署SDK包。

    • 使用pip快速安裝(聯網環境下推薦):

      pip install -U alibabacloud_filedetect
    • 離線安裝(無互聯網連接的環境):在聯網環境下訪問Python代碼庫并下載Python SDK。將Python SDK上傳至項目工程環境,解壓壓縮包后,運行以下安裝命令:

      # 切換至python SDK根目錄
      cd alibabacloud-file-detect-python-sdk-master
      # 安裝SDK,注意python的版本
      python setup.py install

示例代碼

重要

使用Python接入時,請替換下面示例中的測試用例路徑參數,即path

package com.aliyun.filedetect.sample;

import java.io.File;
import java.util.HashMap;
import java.util.Map;

import com.aliyun.filedetect.*;

public class Sample {

	/**
	 * 同步檢測文件接口
	 * @param detector 檢測器對象
	 * @param path 待檢測的文件路徑
	 * @param timeout_ms 設置超時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @throws InterruptedException 
	 */
	public static DetectResult detectFileSync(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
		if (null == detector || null == path) return null;
		DetectResult result = null;
		while(true) {
			result = detector.detectSync(path, timeout_ms);
			if (null == result) break;
			if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 異步檢測文件接口
	 * @param detector 檢測器對象
	 * @param path 待檢測的文件路徑
	 * @param timeout_ms 設置超時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @param callback 結果回調函數
	 * @throws InterruptedException 
	 */
	public static int detectFile(OpenAPIDetector detector, String path, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
		if (null == detector || null == path || null == callback) return ERR_CODE.ERR_INIT.value();
		int result = ERR_CODE.ERR_INIT.value();
		if (wait_if_queuefull) {
			final IDetectResultCallback real_callback = callback;
			callback = new IDetectResultCallback() {
				public void onScanResult(int seq, String file_path, DetectResult callback_res) {
					if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
					real_callback.onScanResult(seq, file_path, callback_res);
				}
			};
		}
		while(true) {
			result = detector.detect(path, timeout_ms, callback);
			if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 同步檢測URL文件接口
	 * @param detector 檢測器對象
	 * @param url 待檢測的文件URL
	 * @param md5 待檢測的文件md5
	 * @param timeout_ms 設置超時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @throws InterruptedException 
	 */
	public static DetectResult detectUrlSync(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull) throws InterruptedException {
		if (null == detector || null == url || null == md5) return null;
		DetectResult result = null;
		while(true) {
			result = detector.detectUrlSync(url, md5, timeout_ms);
			if (null == result) break;
			if (result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 異步檢測URL文件接口
	 * @param detector 檢測器對象
	 * @param url 待檢測的文件URL
	 * @param md5 待檢測的文件md5
	 * @param timeout_ms 設置超時時間,單位為毫秒
	 * @param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	 * @param callback 結果回調函數
	 * @throws InterruptedException 
	 */
	public static int detectUrl(OpenAPIDetector detector, String url, String md5, int timeout_ms, boolean wait_if_queuefull, IDetectResultCallback callback) throws InterruptedException {
		if (null == detector || null == url || null == md5 || null == callback) return ERR_CODE.ERR_INIT.value();
		int result = ERR_CODE.ERR_INIT.value();
		if (wait_if_queuefull) {
			final IDetectResultCallback real_callback = callback;
			callback = new IDetectResultCallback() {
				public void onScanResult(int seq, String file_path, DetectResult callback_res) {
					if (callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL) return;
					real_callback.onScanResult(seq, file_path, callback_res);
				}
			};
		}
		while(true) {
			result = detector.detectUrl(url, md5, timeout_ms, callback);
			if (result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value()) break;
			if (!wait_if_queuefull) break;
			detector.waitQueueAvailable(-1);
		}
		return result;
	}
	
	/**
	 * 格式化檢測結果
	 * @param result 檢測結果對象
	 * @return 格式化后的字符串
	 */
	public static String formatDetectResult(DetectResult result) {
		if (result.isSucc()) {
			DetectResult.DetectResultInfo info = result.getDetectResultInfo();
			String msg = String.format("[DETECT RESULT] [SUCCEED] %s", formatDetectResultInfo(info));
			if (info.compresslist != null) {
				int idx = 1;
				for (DetectResult.CompressFileDetectResultInfo comp_res : info.compresslist) {
					msg += String.format("\n\t\t\t [COMPRESS FILE] [IDX:%d] %s", idx++, formatCompressFileDetectResultInfo(comp_res));
				}
			}
			return msg;
		} 
		DetectResult.ErrorInfo info = result.getErrorInfo();
		return String.format("[DETECT RESULT] [FAIL] md5: %s, time: %d, error_code: %s, error_message: %s"
				, info.md5, info.time, info.error_code.name(), info.error_string);
	}
	
	private static String formatDetectResultInfo(DetectResult.DetectResultInfo info) {
		String msg = String.format("MD5: %s, TIME: %d, RESULT: %s, SCORE: %d", info.md5, info.time, info.result.name(), info.score);
		if (info.compresslist != null) {
			msg += String.format(", COMPRESS_FILES: %d", info.compresslist.size());
		}
		DetectResult.VirusInfo vinfo = info.getVirusInfo();
		if (vinfo != null) {
			msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
		}
		return msg;
	}
	private static String formatCompressFileDetectResultInfo(DetectResult.CompressFileDetectResultInfo info) {
		String msg = String.format("PATH: %s, \t\t RESULT: %s, SCORE: %d", info.path, info.result.name(), info.score);
		DetectResult.VirusInfo vinfo = info.getVirusInfo();
		if (vinfo != null) {
			msg += String.format(", VIRUS_TYPE: %s, EXT_INFO: %s", vinfo.virus_type, vinfo.ext_info);
		}
		return msg;
	}
	
	/**
	 * 同步檢測目錄或文件
	 * @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
	 * @param is_sync 是否使用同步接口,推薦使用異步。 true是同步, false是異步
	 * @throws InterruptedException 
	 */
	public static void detectDirOrFileSync(OpenAPIDetector detector, String path, int timeout_ms, Map<String, DetectResult> result_map) throws InterruptedException {
		File file = new File(path);
		String abs_path = file.getAbsolutePath();
		if (file.isDirectory()) {
			String[] ss = file.list();
	        if (ss == null) return;
	        for (String s : ss) {
	        	String subpath = abs_path + File.separator + s;
	        	detectDirOrFileSync(detector, subpath, timeout_ms, result_map);
	        }
			return;
		}

    	System.out.println(String.format("[detectFileSync] [BEGIN] queueSize: %d, path: %s, timeout: %d", detector.getQueueSize(), abs_path, timeout_ms));
		DetectResult res = detectFileSync(detector, abs_path, timeout_ms, true);
    	System.err.println(String.format("                 [ END ] %s", formatDetectResult(res)));
		result_map.put(abs_path, res);
	}
	
	/**
	 * 異步檢測目錄或文件
	 * @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
	 * @param is_sync 是否使用同步接口,推薦使用異步。 true是同步, false是異步
	 * @throws InterruptedException 
	 */
	public static void detectDirOrFile(OpenAPIDetector detector, String path, int timeout_ms, IDetectResultCallback callback) throws InterruptedException {
		File file = new File(path);
		String abs_path = file.getAbsolutePath();
		if (file.isDirectory()) {
			String[] ss = file.list();
	        if (ss == null) return;
	        for (String s : ss) {
	        	String subpath = abs_path + File.separator + s;
	        	detectDirOrFile(detector, subpath, timeout_ms, callback);
	        }
	        return;
		}

    	
		int seq = detectFile(detector, abs_path, timeout_ms, true, callback);
		System.out.println(String.format("[detectFile] [BEGIN] seq: %d, queueSize: %d, path: %s, timeout: %d", seq, detector.getQueueSize(), abs_path, timeout_ms));
	}
	
	/**
	 * 開始對文件或目錄進行
	 * @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
	 * @param is_sync 是否使用同步接口,推薦使用異步。 true是同步, false是異步
	 * @throws InterruptedException 
	 */
	public static void scan(final OpenAPIDetector detector, String path, int detect_timeout_ms, boolean is_sync) throws InterruptedException {
		System.out.println(String.format("[SCAN] [START] path: %s, detect_timeout_ms: %d, is_sync: %b", path, detect_timeout_ms, is_sync));
		long start_time = System.currentTimeMillis();
		final Map<String, DetectResult> result_map = new HashMap<>();
		if (is_sync) {
			detectDirOrFileSync(detector, path, detect_timeout_ms, result_map);
		} else {
			detectDirOrFile(detector, path, detect_timeout_ms, new IDetectResultCallback() {
				public void onScanResult(int seq, String file_path, DetectResult callback_res) {
			    	System.err.println(String.format("[detectFile] [ END ] seq: %d, queueSize: %d, %s", seq, detector.getQueueSize(), formatDetectResult(callback_res)));
					result_map.put(file_path, callback_res);
				}
			});
			// 等待任務執行完成
			detector.waitQueueEmpty(-1);
		}
		long used_time = System.currentTimeMillis() - start_time;
		System.out.println(String.format("[SCAN] [ END ] used_time: %d, files: %d", used_time, result_map.size()));
		
		int fail_count = 0;
		int white_count = 0;
		int black_count = 0;
		for (Map.Entry<String, DetectResult> entry : result_map.entrySet()) {
			DetectResult res = entry.getValue();
			if (res.isSucc()) {
				if (res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK) {
					black_count ++;
				} else {
					white_count ++;
				}
			} else {
				fail_count ++;
			}
		}
		System.out.println(String.format("             fail_count: %d, white_count: %d, black_count: %d"
				, fail_count, white_count, black_count));
	}
	
    public static void main(String[] args_) throws Exception {
    	// 獲取檢測器實例
    	OpenAPIDetector detector = OpenAPIDetector.getInstance();
    	
    	// 初始化
    	ERR_CODE init_ret = detector.init(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
    	System.out.println("INIT RET: " + init_ret.name());
    	
    	// 設置解壓縮參數(可選,默認不解壓壓縮包)
    	boolean decompress = true; // 是否識別壓縮文件并解壓,默認為false
    	int decompressMaxLayer = 5; // 最大解壓層數,decompress參數為true時生效
    	int decompressMaxFileCount = 1000; // 最大解壓文件數,decompress參數為true時生效
    	ERR_CODE initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount);
    	System.out.println("INIT_DECOMPRESS RET: " + initdec_ret.name());
    	
    	if (true) {
    		// 示例用法1:掃描本地目錄或文件
    		boolean is_sync_scan = false; // 是異步檢測還是同步檢測。異步檢測性能更好。false表示異步檢測
        	int timeout_ms = 500000;  // 單個樣本檢測時間,單位為毫秒
        	String path = "test2.php"; // 待掃描的文件或目錄
        	// 啟動掃描,直到掃描結束
        	scan(detector, path, timeout_ms, is_sync_scan);
    	}
    	
    	if (true) {
    		// 示例用法2:掃描URL文件
        	int timeout_ms = 500000;  // 單個樣本檢測時間,單位為毫秒
        	String url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1*****25&OSSAccessKeyId=xxx"; // 待掃描的URL文件
        	String md5 = "a767f*************6e21d000000"; // 待掃描的文件MD5
        	// 同步掃描。如果需要異步掃描,調用detectUrl接口
        	System.out.println(String.format("[detectUrlSync] [BEGIN] URL: %s, MD5: %s, TIMEOUT: %d", url, md5, timeout_ms));
        	DetectResult result = detectUrlSync(detector, url, md5, timeout_ms, true);
        	System.err.println(String.format("[detectUrlSync] [ END ] %s", formatDetectResult(result)));
    	}
    	
    	
		// 反初始化
		System.out.println("Over.");
    	detector.uninit();
    }
}
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
import threading
import time
import traceback

from alibabacloud_filedetect.OpenAPIDetector import OpenAPIDetector
from alibabacloud_filedetect.IDetectResultCallback import IDetectResultCallback
from alibabacloud_filedetect.ERR_CODE import ERR_CODE
from alibabacloud_filedetect.DetectResult import DetectResult

class Sample(object):
    
    def __init__(self):
        pass


    """
    同步檢測文件接口
    @param detector 檢測器對象
    @param path 待檢測的文件路徑
    @param timeout_ms 設置超時時間,單位為毫秒
    @param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
    """
    def detectFileSync(self, detector, path, timeout_ms, wait_if_queuefull):
        if detector is None or path is None:
            return None
        result = None
        while True:
            result = detector.detectSync(path, timeout_ms)
            if result is None:
                break
            if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result


    """
    異步檢測文件接口
    @param detector 檢測器對象
    @param path 待檢測的文件路徑
    @param timeout_ms 設置超時時間,單位為毫秒
    @param wait_if_queuefull 如果檢測隊列滿了,False表示不等待直接返回錯誤,True表示一直等待直到隊列不滿時
    @param callback 結果回調函數
    """
    def detectFile(self, detector, path, timeout_ms, wait_if_queuefull, callback):
        if detector is None or path is None or callback is None:
            return ERR_CODE.ERR_INIT.value
        result = ERR_CODE.ERR_INIT.value
        if wait_if_queuefull:
            real_callback = callback
            class AsyncTaskCallback(IDetectResultCallback):
                def onScanResult(self, seq, file_path, callback_res):
                    if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
                        return
                    real_callback.onScanResult(seq, file_path, callback_res)
            callback = AsyncTaskCallback()
        while True:
            result = detector.detect(path, timeout_ms, callback)
            if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result
    

    """
    同步檢測URL文件接口
    @param detector 檢測器對象
	@param url 待檢測的文件URL
	@param md5 待檢測的文件md5
	@param timeout_ms 設置超時時間,單位為毫秒
	@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
    """
    def detectUrlSync(self, detector, url, md5, timeout_ms, wait_if_queuefull):
        if detector is None or url is None or md5 is None:
            return None
        result = None
        while True:
            result = detector.detectUrlSync(url, md5, timeout_ms)
            if result is None:
                break
            if result.error_code != ERR_CODE.ERR_DETECT_QUEUE_FULL:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result


    """
    異步檢測URL文件接口
	@param detector 檢測器對象
	@param url 待檢測的文件URL
	@param md5 待檢測的文件md5
	@param timeout_ms 設置超時時間,單位為毫秒
	@param wait_if_queuefull 如果檢測隊列滿了,false表示不等待直接返回錯誤,true表示一直等待直到隊列不滿時
	@param callback 結果回調函數
    """
    def detectUrl(self, detector, url, md5, timeout_ms, wait_if_queuefull, callback):
        if detector is None or url is None or md5 is None or callback is None:
            return ERR_CODE.ERR_INIT.value
        result = ERR_CODE.ERR_INIT.value
        if wait_if_queuefull:
            real_callback = callback
            class AsyncTaskCallback(IDetectResultCallback):
                def onScanResult(self, seq, file_path, callback_res):
                    if callback_res.error_code == ERR_CODE.ERR_DETECT_QUEUE_FULL:
                        return
                    real_callback.onScanResult(seq, file_path, callback_res)
            callback = AsyncTaskCallback()
        while True:
            result = detector.detectUrl(url, md5, timeout_ms, callback)
            if result != ERR_CODE.ERR_DETECT_QUEUE_FULL.value:
                break
            if wait_if_queuefull is False:
                break
            detector.waitQueueAvailable(-1)
        return result


    """
    格式化檢測結果
    @param result 檢測結果對象
    @return 格式化后的字符串
    """
    @staticmethod
    def formatDetectResult(result):
        msg = ""
        if result.isSucc():
            info = result.getDetectResultInfo()
            msg = "[DETECT RESULT] [SUCCEED] {}".format(Sample.formatDetectResultInfo(info))
            if info.compresslist is not None:
                idx = 1
                for comp_res in info.compresslist:
                    msg += "\n\t\t\t [COMPRESS FILE] [IDX:{}] {}".format(idx, Sample.formatCompressFileDetectResultInfo(comp_res))
                    idx += 1
        else:
            info = result.getErrorInfo()
            msg = "[DETECT RESULT] [FAIL] md5: {}, time: {}, error_code: {}, error_message: {}".format(info.md5,
                info.time, info.error_code.name, info.error_string)
        return msg


    @staticmethod
    def formatDetectResultInfo(info):
        msg = "MD5: {}, TIME: {}, RESULT: {}, SCORE: {}".format(info.md5, info.time, info.result.name, info.score)
        if info.compresslist is not None:
            msg += ", COMPRESS_FILES: {}".format(len(info.compresslist))
        vinfo = info.getVirusInfo()
        if vinfo is not None:
            msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
        return msg


    @staticmethod
    def formatCompressFileDetectResultInfo(info):
        msg = "PATH: {}, \t\t RESULT: {}, SCORE: {}".format(info.path, info.result.name, info.score)
        vinfo = info.getVirusInfo()
        if vinfo is not None:
            msg += ", VIRUS_TYPE: {}, EXT_INFO: {}".format(vinfo.virus_type, vinfo.ext_info)
        return msg


    """
    同步檢測目錄或文件
    @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
    @param is_sync 是否使用同步接口,推薦使用異步。 True是同步,False是異步
    """
    def detectDirOrFileSync(self, detector, path, timeout_ms, result_map):
        abs_path = os.path.abspath(path)
        if os.path.isdir(abs_path):
            sub_files = os.listdir(abs_path)
            if len(sub_files) == 0:
                return
            for sub_file in sub_files:
                sub_path = os.path.join(abs_path, sub_file)
                self.detectDirOrFileSync(detector, sub_path, timeout_ms, result_map)
            return
        
        print("[detectFileSync] [BEGIN] queueSize: {}, path: {}, timeout: {}".format(
            detector.getQueueSize(), abs_path, timeout_ms))
        res = self.detectFileSync(detector, abs_path, timeout_ms, True)
        print("                 [ END ] {}".format(Sample.formatDetectResult(res)))
        result_map[abs_path] = res
        return


    """
    異步檢測目錄或文件
    @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
    @param is_sync 是否使用同步接口,推薦使用異步。True是同步, False是異步
    """
    def detectDirOrFile(self, detector, path, timeout_ms, callback):
        abs_path = os.path.abspath(path)
        if os.path.isdir(abs_path):
            sub_files = os.listdir(abs_path)
            if len(sub_files) == 0:
                return
            for sub_file in sub_files:
                sub_path = os.path.join(abs_path, sub_file)
                self.detectDirOrFile(detector, sub_path, timeout_ms, callback)
            return
        
        seq = self.detectFile(detector, abs_path, timeout_ms, True, callback)
        print("[detectFile] [BEGIN] seq: {}, queueSize: {}, path: {}, timeout: {}".format(
            seq, detector.getQueueSize(), abs_path, timeout_ms))   
        return

    
    """
    開始對文件或目錄進行檢測
    @param path 指定路徑,可以是文件或者目錄。目錄的話就會遞歸遍歷
    @param is_sync 是否使用同步接口,推薦使用異步。 True是同步,False是異步
    """
    def scan(self, detector, path, detect_timeout_ms, is_sync):
        try:
            print("[SCAN] [START] path: {}, detect_timeout_ms: {}, is_sync: {}".format(path, detect_timeout_ms, is_sync))
            start_time = time.time()
            result_map = {}
            if is_sync:
                self.detectDirOrFileSync(detector, path, detect_timeout_ms, result_map)
            else:
                class AsyncTaskCallback(IDetectResultCallback):
                    def onScanResult(self, seq, file_path, callback_res):
                        print("[detectFile] [ END ] seq: {}, queueSize: {}, {}".format(seq,
                            detector.getQueueSize(), Sample.formatDetectResult(callback_res)))
                        result_map[file_path] = callback_res
                self.detectDirOrFile(detector, path, detect_timeout_ms, AsyncTaskCallback())
                # 等待任務執行完成
                detector.waitQueueEmpty(-1)
            
            used_time_ms = (time.time() - start_time) * 1000 
            print("[SCAN] [ END ] used_time: {}, files: {}".format(int(used_time_ms), len(result_map)))
            
            failed_count = 0
            white_count = 0
            black_count = 0
            for file_path, res in result_map.items():
                if res.isSucc():
                    if res.getDetectResultInfo().result == DetectResult.RESULT.RES_BLACK:
                        black_count += 1
                    else:
                        white_count += 1
                else:
                    failed_count += 1
            
            print("               fail_count: {}, white_count: {}, black_count: {}".format(
                failed_count, white_count, black_count))

        except Exception as e:
            print(traceback.format_exc(), file=sys.stderr)


    def main(self):

        # 獲取檢測器實例
        detector = OpenAPIDetector.get_instance()

        # 讀取環境變量中的Access Key ID和Access Key Secret
        access_key_id = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID')
        access_key_secret = os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET')

        # 初始化
        init_ret = detector.init(access_key_id, access_key_secret)
        print("INIT RET: {}".format(init_ret.name))

        # 設置解壓縮參數(可選,默認不解壓壓縮包)
        decompress = True # 是否識別壓縮文件并解壓,默認為false
        decompressMaxLayer = 5 # 最大解壓層數,decompress參數為true時生效
        decompressMaxFileCount = 1000 # 最大解壓文件數,decompress參數為true時生效
        initdec_ret = detector.initDecompress(decompress, decompressMaxLayer, decompressMaxFileCount)
        print("INIT_DECOMPRESS RET: {}".format(initdec_ret.name))

        if True:
            # 示例用法1:掃描本地目錄或文件
            is_sync_scan = False # 是異步檢測還是同步檢測。異步檢測性能更好。False表示異步檢測
            timeout_ms = 500000 # 單個樣本檢測時間,單位為毫秒
            path = "test.bin" # 待掃描的文件或目錄
            # 啟動掃描,直到掃描結束
            self.scan(detector, path, timeout_ms, is_sync_scan)

        if True:
            # 示例用法2:掃描URL文件
            timeout_ms = 500000
            url = "https://xxxxxxxx.oss-cn-hangzhou-1.aliyuncs.com/xxxxx/xxxxxxxxxxxxxx?Expires=1671448125&OSSAccessKeyId=xxx" # 待掃描的URL文件
            md5 = "a767ffc59d93125c7505b6e21d000000"
            # 同步掃描。如果需要異步掃描,調用detectUrl接口
            print("[detectUrlSync] [BEGIN] URL: {}, MD5: {}, TIMEOUT: {}".format(url, md5, timeout_ms))
            result = self.detectUrlSync(detector, url, md5, timeout_ms, True)
            print("[detectUrlSync] [ END ] {}".format(Sample.formatDetectResult(result)))

        # 反初始化
        print("Over.")
        detector.uninit()
        

if __name__ == "__main__":
    sample = Sample()
    sample.main()

返回結果

調用SDK進行惡意文件檢測后,只能在程序的運行結果中查看檢測結果,檢測結果不會同步到云安全中心控制臺中。控制臺中僅支持查看剩余檢測次數。風險文件總覽頁面的統計數據,例如檢測文件總數等,為OSS文件檢測的結果。

struct DetectResult {
    std::string md5; // 樣本md5
    long time = 0; // 用時,單位為毫秒

    ERR_CODE error_code;    // 錯誤碼
    std::string error_string;     // 擴展錯誤信息

    enum RESULT {
        RES_WHITE = 0,       //安全文件
        RES_BLACK = 1,       //可疑文件
        RES_PENDING = 3           //檢測中
     };
    RESULT result;          //檢測結果

    int score;                //檢測分值,取值范圍0~100
    std::string virus_type;    //病毒類型,如“WebShell/MalScript/Hacktool”
    std::string ext_info;    //擴展信息為JSON字符串

    struct CompressFileDetectResultInfo {
        std::string path; // 壓縮文件路徑
        RESULT result; // 檢測結果
        int score;	// 分值,取值范圍0-100
        std::string virus_type;    //病毒類型,如“WebShell/MalScript/Hacktool”
        std::string ext_info;    //擴展信息為JSON字符串
	};
    std::list<struct CompressFileDetectResultInfo> compresslist = null; // 如果是壓縮包,并且開啟了壓縮包解壓參數,則此處會輸出壓縮包內文件檢測結果
    
};

錯誤碼說明如下:

enum ERR_CODE {
    ERR_INIT = -100,            	// 需要初始化,或者重復初始化
    ERR_FILE_NOT_FOUND = -99,  		// 文件未找到
    ERR_DETECT_QUEUE_FULL = -98, 	// 檢測隊列滿
    ERR_CALL_API = -97, 			// 調用API錯誤
    ERR_TIMEOUT = -96, 				// 超時
    ERR_UPLOAD = -95, 				//文件上傳失敗;用戶可重新發起檢測,再次嘗試
    ERR_ABORT = -94, 				//程序退出,樣本未得到檢測
    ERR_TIMEOUT_QUEUE = -93, 		//隊列超時,用戶發起檢測頻率過高或超時時間過短
	ERR_MD5 = -92, 					// MD5格式不對
	ERR_URL = -91, 					// URL格式不對
    
    ERR_SUCC = 0              		// 成功
};

檢測分數越高,文件可能存在的風險越高。檢測分數和危險等級對應表如下:

分數區間

危險等級

0~60

安全

61~70

風險(低危)

71~80

可疑(中危)

81~100

惡意(高危)

在云安全中心控制臺檢測OSS中存儲的文件

  1. 登錄云安全中心控制臺。在控制臺左上角,選擇需防護資產所在的區域中國

  2. 在左側導航欄,選擇風險治理 > 惡意文件檢測SDK

  3. 單擊OSS文件檢測頁簽,選擇合適的檢測方式并執行檢測。

    如果您的Bucket沒有在OSS文件檢測的列表中,您可以單擊同步Bucket,同步最新的Bucket列表。

    檢測方式

    說明

    操作步驟

    手動全量檢測

    檢測單個或多個Bucket內的所有文件。

    1. OSS文件檢測頁簽,單擊單個Bucke操作列的檢測或選中多個Bucket后單擊批量檢測

    2. 檢測對話框,指定需要檢測文件的類型、檢測文件范圍、掃描路徑等。

      • 解壓層級:如果選擇檢測的文件類型包含壓縮類型,可以設置解壓層級(最多5層,支持不解壓)和單個壓縮包解壓文件數量限制(最多1000個)。

      • 文件解密類型:默認不解密,如果OSS文件配置了服務端加密(SSE-KMS、SSE-OSS),可以設置解密后再檢測。解密類型OSS對應SSE-OSS加密的OSS文件,解密類型KMS對應SSE-KMS加密的OSS文件。

      • 檢測文件范圍:可選。設置檢測范圍的時間上限。設置后會檢測文件更新時間晚于該時間的文件。

      • 掃描路徑:支持選擇按前綴匹配(輸入文件名前綴來匹配掃描指定文件)或配置到整個Bucket(掃描整個Bucket文件)。

    3. 單擊確定

    手動增量檢測

    針對已檢測且上次檢測后有文件更新的Bucket,提供只檢測未檢測過的文件的功能。

    1. OSS文件檢測頁簽,單擊目標Bucke操作列的增量檢測

    2. 增量檢測對話框中,指定需要檢測文件的類型、解壓層級(壓縮類型文件支持)、文件解密類型、檢測文件范圍、掃描路徑(支持按前綴匹配配置到整個Bucket)。

    3. 單擊確定

    自動檢測

    通過配置掃描策略可為指定Bucket開啟周期性自動檢測。配置前建議了解以下信息:

    • 一個Bucket只能在一個策略中生效。

    • 自動檢測僅會檢測OSS新增的文件,不會重復檢測同一個文件。

    1. OSS文件檢測頁簽,單擊策略管理區域的策略配置

    2. 策略管理面板,單擊新增策略

      如果已配置策略的檢測方案符合檢測要求,您可以單擊該策略操作列的編輯,將目標Bucket添加到該策略的生效范圍內。

    3. 策略創建面板配置策略名稱策略啟用狀態(默認啟用)、掃描路徑(支持按前綴匹配配置到整個Bucket)、檢測文件范圍檢測周期文件檢測時間文件檢測類型解壓層級(壓縮類型文件支持)、文件解密類型生效Bucket等。

    4. 單擊確定。

可選:配置釘釘機器人通知

您可以在系統配置中添加釘釘機器人通知,通過釘釘群實時接收云安全中心檢測的惡意文件告警信息。具體操作,請參見配置釘釘機器人通知

image

查看和導出檢測結果

風險文件總覽

風險文件總覽頁簽:

  • 查看風險文件統計信息

    image

    • 不同等級(高危中危低危)風險通過不同顏色顯示:高危(紅色)中危(橙色)低危(灰色)

    • 在風險文件列表中,可以根據檢測場景列來選擇查看調用SDK(API)和在控制臺(OSS)檢測的惡意文件信息,也可以單擊列表左上方的搜索項的下拉框,根據風險等級文件名稱威脅標簽MD5最新檢測時間來查看目標惡意文件詳情。

      如果是壓縮包文件,可單擊文件前的展開image圖標,展示該壓縮包下的風險文件列表,該壓縮包文件的風險等級為其下風險文件中的最高風險等級。

  • 查看目標風險文件詳情

    • 在風險文件列表中,單擊目標風險文件操作列的詳情,在該文件的詳情面板,查看檢測出的存在風險的文件詳情事件說明(包含惡意文件說明和處置建議)等信息

      在壓縮包文件的風險詳情面板,還可查看該壓縮包中的風險文件數、解壓后檢測的文件總數,以及風險文件列表等信息。

      image

  • 導出所有風險文件列表

    單擊image..png圖標,等待文件導出完成后,在提示框中單擊下載

OSS文件檢測

OSS文件檢測頁簽:

  • 從OSS Bucket維度查看風險文件統計信息

    不同等級(高危中危低危)風險通過不同顏色顯示:高危(紅色)中危(橙色)低危(灰色)

    image

  • 查看目標OSS Bucket的檢測詳情

    在Bucket列表中,單擊目標Bucket操作列的詳情,查看Bucket的基本信息和風險文件詳情。

    在壓縮包的文件檢測詳情頁面,還可以查看配置文件解壓情況、解壓后的文件總數、檢測的文件數量以及已掃描、未掃描的風險文件列表。

    image

  • 按Bucket維度導出所有風險文件列表

    OSS文件檢測頁簽,單擊image..png圖標,等待文件導出完成后,在提示框中單擊下載

處理風險文件的告警事件

對于檢測出的風險文件,需要您根據風險等級以及云安全中心提供的事件說明和處置建議,進行確認和手動修復。

  • 高危風險事件:誤報的概率較低,同時惡意性較強,建議及時處理。

  • 中危和低危風險事件:該等級風險文件多出現在業務場景中,需要您核實對業務的影響后處理。

    如果您確認是業務的正常文件或是誤報,您可以忽略相關告警事件。如果您在業務程序中調用SDK檢測文件,對于正常或誤報的文件,您也可以在業務程序中添加忽略或加入白名單的處理邏輯。

相關API

您可以通過調用API接口方式使用惡意文件檢測功能。具體使用方法如下: