由于Flink無法提前預知您要使用的上下游系統,當作業需要訪問不同的上下游系統來讀寫數據時,可能需要使用您的AccessKey信息作為訪問憑證。主賬號的AccessKey具備云賬號下資源的全部權限,一旦泄露可能會造成嚴重后果。本文通過對RAM用戶授予相關上下游更小化權限,并結合Flink變量功能對AccessKey進行加密,以進一步提升訪問安全性。
方案說明
本方案通過減少暴露面積原則來實現更安全的AccessKey訪問。即不使用主賬號的AccessKey,通過主賬號授權子賬號的方式,讓子賬號具備訪問某些上下游的權限或更小化權限,然后利用Flink變量功能使用AccessKey,降低AccessKey明文泄漏的風險,減少意外情況下AccessKey的泄漏導致您主賬號下所有的云資源受到的相關安全風險。
本文創建了一個RAM用戶,并為RAM用戶授予日志服務SLS Project下指定Logstore權限,并利用變量功能,實現Flink作業讀寫SLS數據。
示例教程
使用阿里云賬號(主賬號)或RAM管理員創建RAM用戶,具體操作請參見創建RAM用戶。
訪問方式必須勾選OpenAPI調用訪問,啟用后,會自動為RAM用戶生成一個AccessKey ID和AccessKey Secret。
重要RAM用戶的AccessKey Secret只在創建時顯示,不支持查看,請務必妥善保管。
為RAM用戶授予SLS相關權限。
創建自定義權限策略。
在左側導航欄,選擇 。
在權限策略頁面,單擊創建權限策略。
在腳本編輯頁簽,將配置框中的原有腳本替換為如下內容,然后單擊繼續編輯基本信息。
Project和Logstore需要根據您的實際情況替換,更多權限策略請參見RAM自定義授權示例。
指定Logstore只讀權限
{ "Version": "1", "Statement": [ { "Action": "log:ListProject", "Resource": "acs:log:*:*:project/*", "Effect": "Allow" }, { "Action": "log:List*", "Resource": "acs:log:*:*:project/<指定的Project名稱>/logstore/*", "Effect": "Allow" }, { "Action": [ "log:Get*", "log:List*" ], "Resource": "acs:log:*:*:project/<指定的Project名稱>/logstore/<指定的Logstore名稱>", "Effect": "Allow" } ] }
指定Logstore寫入權限
授予RAM用戶向指定Logstore寫入數據的權限,不包含查詢等其他操作權限。
權限策略中的Logstore包括了Logstore和MetricStore。當您的操作對象為MetricStore時,如下策略同樣適用。
{ "Version":"1", "Statement":[ { "Effect":"Allow", "Action":[ "log:PostLogStoreLogs" ], "Resource":[ "acs:log:*:*:project/<指定的Project名稱>/logstore/<指定的Logstore名稱>" ] } ] }
輸入權限策略名稱和備注,單擊確定。
為RAM用戶添加上一步創建的自定義權限策略。具體操作,請參見為RAM用戶授權。
配置變量,降低AccessKey明文泄漏的風險。
為步驟1創建RAM用戶時得到的AccessKey ID和AccessKey Secret創建變量,創建后只需調用變量名使用,無需填寫具體值,具體操作步驟請參見新增變量。本文針對AccessKey ID和AccessKey Secret分別創建了名為slslak和slsaks的變量。
創建Flink作業,讀取SLS數據。
在SQL作業開發時以
${secret_values.變量名}
格式使用變量,避免明文AccessKey帶來的安全風險,示例如下。CREATE TEMPORARY TABLE sls_input( `__source__` STRING METADATA VIRTUAL, __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL, `__topic__` STRING METADATA VIRTUAL, deploymentName STRING, `level`STRING, `location` STRING, message STRING, thread STRING, `time`STRING ) WITH ( 'connector' = 'sls', 'endpoint' ='cn-beijing-intranet.log.aliyuncs.com', 'accessId' = '${secret_values.slsak}', 'accessKey' = '${secret_values.slsaks}', 'starttime' = '2024-08-30 15:39:00', 'project' ='test', 'logstore' ='flinktest' ); CREATE TEMPORARY TABLE blackhole_sink( `__source__` STRING, `__topic__` STRING, deploymentName STRING, `level` STRING, `location` STRING, message STRING, thread STRING, `time` STRING, receive_time BIGINT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT `__source__`, `__topic__`, deploymentName, `level`, `location`, message, thread, `time`, cast(__tag__['__receive_time__'] as bigint) as receive_time FROM sls_input;
相關文檔
Flink讀寫OSS時,您也可以為RAM用戶配置指定讀寫Bucket等權限策略,詳情請參見RAM Policy。
SQL作業開發具體操作請參見SQL作業開發。
Flink提供了作業變量和項目變量功能,可以避免明文AccessKey、密碼等信息帶來的安全風險,支持在多個場景使用。詳情請參見新增變量。