存在多種方式可以將日志服務SLS的數據寫入至Hologres,本文以Flink、DataWorks數據集成為例,為您介紹如何將SLS數據實時寫入至Hologres。
前提條件
開通SLS服務并創建Project和Logstore,詳情請參見快速入門。
開通Hologres服務并連接至開發工具,詳情請參見實時數倉Hologres使用流程。
如果選擇通過Flink將SLS數據寫入Hologres,請開通實時計算Flink版服務并創建項目空間,詳情請參見開通Flink全托管和創建與管理項目空間。
如果選擇通過DataWorks數據集成將SLS數據寫入Hologre,請開通DataWorks服務并創建工作空間,詳情請參見開通DataWorks服務和創建工作空間。
背景信息
日志服務SLS是云原生觀測與分析平臺,為Log、Metric、Trace等數據提供大規模、低成本、實時的平臺化服務。日志服務一站式提供數據采集、加工、查詢與分析、可視化、告警、消費與投遞等功能,全面提升您在研發、運維、運營、安全等場景的數字化能力。
Hologres致力于高性能、高可靠、低成本、可擴展的實時計算引擎研發,為用戶提供海量數據的實時數據倉庫解決方案和亞秒級交互式查詢服務,廣泛應用在實時數據中臺建設、精細化分析、自助式分析、營銷畫像、人群圈選、實時風控等場景。您可以將SLS的數據快速寫入Hologres中,進行實時分析實時查詢,提升業務對數據的探索能力。
通過Flink將SLS數據寫入Hologres
準備SLS數據。
本次準備的SLS數據來源于SLS日志平臺的模擬數據,模擬游戲登錄和消費日志的數據。如果您有業務數據,請直接使用業務數據。
創建Hologres表。
在Hologres中創建用于接收數據的表。您可以根據業務查詢需求,為對應的字段創建索引,以提升查詢效率。索引介紹請參見建表概述。本次示例建表DDL如下。
CREATE TABLE sls_flink_holo ( content JSONB , operation TEXT, uid TEXT, topic TEXT , source TEXT , c__timestamp TIMESTAMPTZ, receive_time BIGINT, PRIMARY KEY (uid) );
通過Flink寫入數據。
在Flink中將SLS數據寫入Hologres可以參考如下文檔:
Flink讀取SLS數據:日志服務SLS源表。
Flink寫入Hologres:實時數倉Hologres結果表。
本次SLS數據通過Flink寫入Hologres的SQL作業示例如下,其中JSON類型字段直接寫入Hologres JSON類型中,因為Flink中沒有JSON類型,使用VARCHAR類型代替。
CREATE TEMPORARY TABLE sls_input ( content STRING, operation STRING, uid STRING, `__topic__` STRING METADATA VIRTUAL, `__source__` STRING METADATA VIRTUAL, `__timestamp__` BIGINT METADATA VIRTUAL, `__tag__` MAP<VARCHAR, VARCHAR> METADATA VIRTUAL ) WITH ( 'connector' = 'sls', 'endpoint' = 'sls私域endpoint',--sls私域endpoint 'accessid' = 'access id',--賬號access id 'accesskey' = 'access key',--賬號access key 'starttime' = '2024-08-30 00:00:00',--消費日志的開始時間 'project' = 'project name',--sls的project名 'logstore' = 'LogStore名稱'--LogStore名稱。 ); CREATE TEMPORARY TABLE hologres_sink ( content VARCHAR, operation VARCHAR, uid VARCHAR, topic STRING , source STRING , c__timestamp TIMESTAMP , receive_time BIGINT ) WITH ( 'connector' = 'hologres', 'dbname' = 'holo db nema', --Hologres的數據庫名稱。 'tablename' = 'holo tablene', --Hologres用于接收數據的表名稱。 'username' = 'access id', --當前阿里云賬號的AccessKey ID。 'password' = 'access key', --當前阿里云賬號的AccessKey Secret。 'endpoint' = 'holo vpc endpoint' --當前Hologres實例VPC網絡的Endpoint。 ); INSERT INTO hologres_sink SELECT content, operation, uid, `__topic__` , `__source__` , CAST ( FROM_UNIXTIME (`__timestamp__`) AS TIMESTAMP ), CAST (__tag__['__receive_time__'] AS BIGINT) AS receive_time FROM sls_input;
查詢數據。
在Hologres中查詢通過Flink寫入Hologres中的SLS數據,后續您可以根據業務需求進行數據開發。
通過DataWorks數據集成將SLS數據寫入Hologres
準備SLS數據。
本次準備的SLS數據來源于SLS日志平臺的模擬數據,模擬游戲登錄和消費日志的數據。如果您有業務數據,請直接使用業務數據。
創建Hologres表。
在Hologres中創建用于接收數據的表。并根據業務查詢需求,為對應的字段創建索引,以提升查詢效率。索引介紹請參見建表概述。本次示例建表DDL如下。
說明本示例中設置
uid
為主鍵,保證數據的唯一性,實際設置可以根據業務需求選擇。設置
uid
為Distribution Key,數據寫入時相同的uid
可以寫入至同一個Shard,提高查詢性能。
BEGIN; CREATE TABLE sls_dw_holo ( content JSONB , operation TEXT, uid TEXT, C_Topic TEXT , C_Source TEXT , timestamp BIGINT, PRIMARY KEY (uid) ); CALL set_table_property('sls_dw_holo', 'distribution_key', 'uid'); CALL set_table_property('sls_dw_holo', 'event_time_column', 'timestamp'); COMMIT;
配置數據源。
在進行數據同步之前,使用數據集成功能需要為DataWorks工作空間添加數據源。
SLS數據源使用Loghub數據源,詳情請參見配置LogHub(SLS)數據源。
配置Hologres數據源,詳情請參見配置Hologres數據源。
實時同步數據。
在數據集成中創建和運行實時同步任務,詳情請參見配置單表增量數據實時同步和實時同步任務運維。
本示例創建的實時同步任務,配置輸入為Loghub數據源,輸出為Hologres數據源,并配置如下字段同步映射關系。
查詢數據。
實時同步任務運行開始后,可以在Hologres中查詢通過DataWorks數據集成寫入Hologres中的SLS數據。