本文以GitHub公開事件數據為例,為您介紹使用Flink、Hologres構建實時數倉,并通過Hologres對接BI分析工具(本文以DataV為例),實現海量數據實時分析的通用架構與核心步驟。
示例架構
搭建實時數倉時,Flink可對待處理數據進行實時清洗,完成后Hologres可直接讀取Flink中的數據,并對接BI分析工具將數據實時展示在大屏中,示例架構如圖所示。
其中:
待處理數據:
本實踐使用GitHub公開事件作為示例數據,更多關于數據集的介紹請參見業務與數據認知。
實時清洗:
Flink是流式計算引擎。Flink與Hologres原生深度集成,支持高吞吐、低延時、有模型、高質量的實時數倉開發。
數據服務:
Hologres是兼容PostgreSQL協議的實時數倉引擎,支持海量數據實時寫入與更新、實時數據寫入即可查。
實時大屏:
本實踐以DataV為例,為您展示搭建實時大屏后查看并分析數據的效果。
實踐步驟
準備工作
本實踐使用已存儲在Flink中的Github公共事件作為示例數據,因此您無需操作數據集成步驟,可使用Hologres直接讀取Flink中的示例數據。本實踐需準備的環境如下所示。
創建Hologres內部表
您需要先創建一個Hologres內部表并創建相應的索引,用于后續數據實時寫入。本實踐以實時寫入Github公開事件中今日最活躍項目數據為例,需提前創建的內部表示例代碼如下。
-- 新建schema用于創建內表并導入數據
CREATE SCHEMA IF NOT EXISTS hologres_dataset_github_event;
DROP TABLE IF EXISTS hologres_dataset_github_event.hologres_github_event;
BEGIN;
CREATE TABLE hologres_dataset_github_event.hologres_github_event (
id text,
created_at_bigint bigint,
created_at timestamp with time zone NOT NULL,
type text,
actor_id text,
actor_login text,
repo_id text,
repo_name text,
org text,
org_login text
);
CALL set_table_property ('hologres_dataset_github_event.hologres_github_event', 'distribution_key', 'id');
CALL set_table_property ('hologres_dataset_github_event.hologres_github_event', 'event_time_column', 'created_at');
CALL set_table_property ('hologres_dataset_github_event.hologres_github_event', 'clustering_key', 'created_at');
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.id IS '事件ID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.created_at_bigint IS '事件發生時間戳';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.created_at IS '事件發生時間';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.type IS '事件類型';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.actor_id IS '事件發起人ID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.actor_login IS '事件發起人登錄名';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.repo_id IS 'repoID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.repo_name IS 'repo名稱';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.org IS 'repo所屬組織ID';
COMMENT ON COLUMN hologres_dataset_github_event.hologres_github_event.org_login IS 'repo所屬組織名稱';
COMMIT;
通過Flink實時寫入數據至Hologres
如果您申請了免費試用版的Flink,可參見教程文檔使用內置公開數據集快速體驗實時計算Flink版進入Flink SQL作業編輯運行頁面。本實踐的Flink SQL作業示例代碼如下,其中SLS的project、endpoint、logstore等參數值請參見教程文檔使用內置公開數據集快速體驗實時計算Flink版,填入對應Region的固定值。
CREATE TEMPORARY TABLE sls_input (
id STRING, -- 每個事件的唯一ID。
created_at BIGINT, -- 事件時間,單位秒。
created_at_ts as TO_TIMESTAMP(created_at * 1000), -- 事件時間戳(當前會話時區下的時間戳,如:Asia/Shanghai)。
type STRING, -- Github事件類型,如:ForkEvent, WatchEvent, IssuesEvent, CommitCommentEvent等。
actor_id STRING, -- Github用戶ID。
actor_login STRING, -- Github用戶名。
repo_id STRING, -- Github倉庫ID。
repo_name STRING, -- Github倉庫名,如:apache/flink, apache/spark, alibaba/fastjson等。
org STRING, -- Github組織ID。
org_login STRING -- Github組織名,如: apache,google,alibaba等。
)
WITH (
'connector' = 'sls', -- 實時采集的Github事件存放在阿里云SLS中。
'project' = '<yourSlsProject>', -- 存放公開數據的SLS項目。例如'github-events-hangzhou'。
'endPoint' = '<yourSlsEndpoint>', -- 公開數據僅限VVP通過私網地址訪問。例如'https://cn-hangzhou-intranet.log.aliyuncs.com'。
'logStore' = 'realtime-github-events', -- 存放公開數據的SLS logStore。
'accessId' = '****', -- 僅有內置數據集只讀權限的AK,參見Flink教程。
'accessKey' = '****', -- 僅有內置數據集只讀權限的SK,參見Flink教程。
'batchGetSize' = '500' -- 批量讀取數據,每批最多拉取500條。
);
CREATE TEMPORARY TABLE hologres_sink (
id STRING,
created_at_bigint BIGINT,
created_at TIMESTAMP,
type STRING,
actor_id STRING,
actor_login STRING,
repo_id STRING,
repo_name STRING,
org STRING,
org_login STRING
)
WITH (
'connector' = 'hologres',
'dbname' = 'holo db name', --Hologres的數據庫名稱
'tablename' = 'schema_name.table_name', --Hologres用于接收數據的表名稱
'username' = 'access id', --當前阿里云賬號的AccessKey ID
'password' = 'access key', --當前阿里云賬號的AccessKey Secret
'endpoint' = 'holo vpc endpoint', --當前Hologres實例VPC網絡的Endpoint
'jdbcretrycount' = '1', --連接故障時的重試次數
'partitionrouter' = 'true', --是否寫入分區表
'createparttable' = 'true', --是否自動創建分區
'mutatetype' = 'insertorignore' --數據寫入模式
);
INSERT INTO hologres_sink
SELECT
*
FROM
sls_input
WHERE
id IS NOT NULL
AND created_at_ts IS NOT NULL
AND created_at_ts >= date_add(CURRENT_DATE, - 1);
查詢實時數據
在Hologres中通過內部表查詢今日最活躍項目。
SELECT
repo_name,
COUNT(*) AS events
FROM
hologres_dataset_github_event.hologres_github_event
WHERE
created_at >= CURRENT_DATE
GROUP BY
repo_name
ORDER BY
events DESC
LIMIT 5;
(可選)通過DataV搭建實時大屏
您可以通過DataV的數據大屏模板,基于Hologres數據源來快速搭建GitHub事件數據實時大屏。
創建Hologres數據源。
將數據所在的Hologres實例和數據庫創建為DataV的數據源,詳情請參見DataV。
創建可視化應用。
登錄DataV控制臺。
在工作臺頁面,單擊創建PC端看板。
選擇使用Hologres實時分析GitHub事件數據模板。
修改模板中相關組件的數據源。
以左上角的今日公開事件總數為例:
單擊今日公開事件總數對應的數字框,點擊右側數據源,選擇數據源類型為實時數倉Hologres。
選擇已有數據源為您已創建的數據源。
如果您在Hologres中的表名和Schema與本實踐相同,則不需修改SQL。
修改完成后,數據響應結果刷新,大屏中成功展示實時數據。
按照示例更新大屏中的數據源和表名,需更新組件及更新后效果如下圖所示。
單擊右上角發布,完成大屏搭建。
您也可以單擊預覽,預覽實時更新的數據大屏。