數據庫加密技術屬于主動防御機制,可以防止明文存儲引起的數據泄密、突破邊界防護的外部黑客攻擊以及來自內部高權限用戶的數據竊取,從根本上解決數據庫敏感數據泄漏問題。通過加密SDK在客戶端加密的數據可以存儲在關系數據庫或非關系數據庫中。本文為您介紹數據庫敏感數據加密的使用場景、原理和示例。
使用場景
數據庫敏感數據被拖庫后,避免因明文存儲導致的數據泄露。
通常情況下,數據庫中的數據是以明文形式進行存儲和使用的,一旦數據文件(或備份文件)丟失,可能引發嚴重的數據泄露問題。而在拖庫攻擊中,明文存儲的數據對于攻擊者同樣沒有任何秘密可言。此時您需要對數據進行加密,避免數據泄露。
對高權限用戶,數據庫敏感數據加密可以防范內部竊取數據造成的數據泄露。
數據庫加密可以提供獨立于數據庫系統自身權限控制體系之外的增強權限控制的能力,由專用的加密系統為數據庫中的敏感數據設置訪問權限,從而有效限制數據庫超級用戶或其他高權限用戶對敏感數據的訪問行為,保障數據安全。
加密和解密原理
加密原理
創建密鑰。
加密SDK向密鑰管理服務KMS(Key Management Service)發送調用GenerateDataKey接口請求,申請一個數據密鑰(Data Key)。KMS返回數據密鑰以及數據密鑰密文(Encrypted Data Key)。
加密并存儲數據。
使用數據密鑰對數據進行加密,得到加密結果,進行Base64編碼。
將加密數據的Base64編碼存儲在數據庫中。
解密原理
檢索并解密數據。
從數據庫讀取密文。
將密文進行Base64解密,解析密文消息。加密SDK調用KMS的Decrypt接口將數據密鑰進行解密,KMS返回數據密鑰給本地加密客戶端。
解密數據。加密SDK(Encryption SDK)使用數據密鑰對數據密文進行解密,得到原始數據。
示例
加密SDK用于面向應用的數據庫加密,密鑰由KMS產生和管理。
通過以下Spring JPA和Python示例代碼,可以實現對User表中email字段的寫入加密和讀取解密。示例中每個字段使用一個數據密鑰,解密時設置了密鑰緩存,對同一字段進行多次查詢時會檢索緩存中可用的數據密鑰。
為保證字段能存儲加密后的字段,需要對被加密字段的長度進行擴容,擴容比例為3倍。
Spring JPA示例
阿里云賬號AccessKey擁有所有OpenAPI的訪問權限,建議您使用RAM用戶進行API訪問或日常運維。強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。
本示例以將AccessKey配置在環境變量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET的方式來實現身份驗證為例。
更多認證信息配置方式,請參見Credentials 設置。
不同操作系統的環境變量配置方法不同,具體操作,請參見在Linux、macOS和Windows系統配置環境變量。
定義實體類
@Entity public class User { @Id @GeneratedValue private Long id; private String name; private String email; // getters and setters ... }
定義UserRepository類
public interface UserRepository extends CrudRepository<User, Long> { }
實現Spring JPA的AttributeConverter接口相關功能
EncryptionConverter調用加密SDK的接口,獲取數據密鑰,對指定的數據進行加密、解密。
@Converter public class EncryptionConverter implements AttributeConverter<String, String> { private static String ACCESS_KEY_ID = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"); private static String ACCESS_KEY_SECRET = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"); private static String CMK_ARN = "acs:kms:RegionId:UserId:key/CmkId"; private static AliyunConfig config; static { config = new AliyunConfig(); config.withAccessKey(ACCESS_KEY_ID, ACCESS_KEY_SECRET); } @Override public String convertToDatabaseColumn(String plainText) { BaseDataKeyProvider dataKeyProvider = new DefaultDataKeyProvider(CMK_ARN); AliyunCrypto crypto = new AliyunCrypto(config); try { CryptoResult<byte[]> encryptResult = crypto.encrypt(dataKeyProvider, plainText.getBytes(StandardCharsets.UTF_8), Collections.singletonMap("sample", "context")); return Base64.getEncoder().encodeToString(encryptResult.getResult()); } catch (InvalidAlgorithmException e) { System.out.println("Failed."); System.out.println("Error message: " + e.getMessage()); } return null; } @Override public String convertToEntityAttribute(String cipherText) { BaseDataKeyProvider dataKeyProvider = new DefaultDataKeyProvider(CMK_ARN); AliyunCrypto crypto = new AliyunCrypto(config); // *** 設置緩存數據主密鑰 *** CryptoKeyManager ckm = new CachingCryptoKeyManager(new LocalDataKeyMaterialCache()); crypto.setCryptoKeyManager(ckm); try { CryptoResult<byte[]> decryptResult = crypto.decrypt(dataKeyProvider, Base64.getDecoder().decode(cipherText)); return new String(decryptResult.getResult(), StandardCharsets.UTF_8); } catch (InvalidAlgorithmException | UnFoundDataKeyException e) { e.printStackTrace(); } return null; } }
添加@Convert注解
添加@Convert注解到需要加密的屬性(數據庫中的列)。
@Entity public class User { @Id @GeneratedValue private Long id; private String name; @Convert(converter = EncryptionConverter.class) private String email; // getters and setters ... }
Python示例
實體定義
class User(object): def get_name(self): return self.name def set_name(self, value): self.name = value def get_email(self): return self.email def set_email(self, value): self.email = value
實現數據庫操作函數
def user_add(user): # 連接數據庫。 conn = db_connection() # 得到一個可以執行SQL語句并且將結果作為字典返回的游標。 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 定義要執行的SQL語句。 sql = 'insert into user(name, email) values(%s,%s);' # 執行SQL語句。 cursor.execute(sql, [user.name, user.email]) print("insert name: " + user.name) print("insert email: " + user.email) # 提交寫操作。 conn.commit() last_id = cursor.lastrowid # 關閉光標對象。 cursor.close() # 關閉數據庫連接。 conn.close() return last_id def user_select_by_id(id): # 連接數據庫。 conn = db_connection() # 得到一個可以執行SQL語句并且將結果作為字典返回的游標。 cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 定義要執行的SQL語句。 sql = 'select * from user where id = %s;' # 執行SQL語句。 cursor.execute(sql, [id]) result = cursor.fetchone() print("select result: " + str(result)) user = User() user.__dict__ = result # 關閉光標對象。 cursor.close() # 關閉數據庫連接。 conn.close() return user
實現數據加密、解密裝飾器(decorator)
def enc_convert(): def enc_(func): def wrapper(self, plain_text): provider = DefaultDataKeyProvider(AES_KEY_ARN) client = build_aliyun_crypto(False) cipher_text, enc_material = client.encrypt(provider, plain_text.encode("utf-8"), ENCRYPTION_CONTEXT) cipher_text_str = base64.standard_b64encode(cipher_text).decode("utf-8") f = func(self, cipher_text_str) return f return wrapper return enc_ def dec_convert(column_name): def dec_(func): def wrapper(self): user = self.__dict__ cipher_text = user.get(column_name) cipher_text_bytes = base64.standard_b64decode(cipher_text.encode("utf-8")) provider = DefaultDataKeyProvider(AES_KEY_ARN) client = build_aliyun_crypto(False) plain_text, dec_material = client.decrypt(provider, cipher_text_bytes) user[column_name] = bytes.decode(plain_text) result = User() result.__dict__ = user f = func(result) return f return wrapper return dec_
為需要保護的數據字段添加裝飾器
class User(object): def get_name(self): return self.name def set_name(self, value): self.name = value @dec_convert(column_name="email") def get_email(self): return self.email @enc_convert() def set_email(self, value): self.email = value
關于數據庫敏感數據加密的更多示例,請參見Spring JPA示例和Python示例。