本文為您介紹如何基于GitHub實時事件數據通過MaxCompute構建離線數倉、通過Flink和Hologres構建實時數倉,然后通過Hologres和MaxCompute分別進行實時與離線數據分析,從而實現實時離線一體化解決方案。
背景信息
隨著社會數字化發展,企業對數據時效性的需求越來越強烈。除傳統的面向海量數據加工場景設計的離線場景外,大量業務需要解決面向實時加工、實時存儲、實時分析的實時場景問題,為了應對這樣的情形,提出了離線實時一體化的概念。
實時離線一體化是指將實時數據和離線數據在同一平臺上管理和處理的技術。它能夠實現實時數據處理和離線數據分析的無縫銜接,從而提高數據分析效率和精度。其優勢在于:
提高數據處理效率:將實時數據和離線數據整合在同一平臺上,大大提高了數據處理效率,降低數據傳輸和轉換成本。
提高數據分析精度:將實時數據和離線數據進行混合分析,從而提高數據分析精度和準確性。
降低系統復雜度:減少數據管理和處理的復雜度,使數據管理和處理更加簡單和高效。
提高數據應用價值:更加充分地發揮數據的應用價值,為企業提供更好的決策支持。
阿里云在此方向上進行了諸多方案設計,推出了化繁為簡的實時離線一體化數倉,通過大數據計算服務MaxCompute和實時數倉Hologres分別對應上述的離線與實時場景,同時匹配Flink的實時加工能力,共同構成阿里云一體化數倉的核心引擎組件。
方案架構
使用MaxCompute和Hologres對GitHub公開事件數據集進行實時離線一體化實踐的完整鏈路圖如下所示。
其中ECS將GitHub實時與離線事件數據收集匯總后作為數據源,分別進入實時鏈路與離線鏈路,最后兩條鏈路數據匯總到Hologres,統一對外提供服務。
實時鏈路:通過Flink對日志服務中的數據實時加工并寫入Hologres。Flink是強大的流式計算引擎,Hologres支持數據實時寫入與更新、寫入即可查,二者原生集成,支持高吞吐、低延時、有模型、高質量的實時數倉開發,最終滿足業務洞察實時性需求,如最新事件提取、熱點事件分析等場景。
離線鏈路:通過MaxCompute對海量離線數據進行處理并歸檔。阿里云OSS(Object Storage Service)是阿里云提供的云存儲服務,可以用于存儲各類數據,本次實踐引用的原始數據是JSON格式,OSS可以提供方便、安全、低成本、可靠的存儲能力。MaxCompute是適用于數據分析場景的企業級SaaS(Software as a Service)模式云數據倉庫,可以直接通過外表的方式讀取并解析OSS中的半結構化數據,將高價值可用數據集成至MaxCompute內部存儲,然后結合DataWorks進行數據開發,生成離線數據倉庫。
Hologres與MaxCompute底層無縫打通,因此可以通過Hologres對MaxCompute海量歷史數據進行加速查詢分析,滿足業務對歷史數據的低頻高性能查詢需求。還可以輕松實現通過離線鏈路對實時數據的修正,解決實時鏈路中可能出現的數據遺漏等問題。
該方案優勢如下:
離線鏈路穩定高效:支持數據小時級寫入更新,可以批量處理大規模數據,進行復雜的計算和分析,降低計算成本,提高數據處理效率。
實時鏈路成熟:支持實時寫入、實時事件計算、實時分析,實時鏈路簡化,數據秒級響應。
統一存儲與服務:均由Hologres對外提供服務,數據集中存儲,對外接口一致(OLAP、KeyValue統一為SQL接口)。
實時離線融合:數據冗余少、移動少,數據可修正。
通過一站式開發,最終實現數據秒級響應,全鏈路狀態可見,架構組件少、依賴少,運維成本、人工成本均有效降低。
業務與數據認知
大量開發人員在GitHub上進行開源項目的開發工作,并在項目的開發過程中產生海量事件。GitHub會記錄每次事件的類型及詳情、開發者、代碼倉庫等信息,并開放其中的公開事件,包括加星標、提交代碼等,具體事件類型請參見Webhook events and payloads。
GitHub通過OpenAPI公布其公開事件,API僅開放5分鐘前的實時事件,詳情請參見Events。該API可用于獲取實時數據。
GH Archive項目則是將GitHub公開事件按小時進行匯總,并允許開發人員訪問,項目具體信息請參見GH Archive。該項目可用于獲取離線數據。
GitHub業務認知
Github的業務核心為管理代碼與互動交流,主要涉及三個一級實體對象:開發者(Developer)、代碼倉庫(Repository)和組織(Organization)。
在本次Github公開事件數據分析中,事件
作為一個實體對象被存儲和記錄下來。
原始公開事件數據認知
某原始事件JSON編碼數據示例如下:
{
"id": "19541192931",
"type": "WatchEvent",
"actor":
{
"id": 23286640,
"login": "herekeo",
"display_login": "herekeo",
"gravatar_id": "",
"url": "https://api.github.com/users/herekeo",
"avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
},
"repo":
{
"id": 52760178,
"name": "crazyguitar/pysheeet",
"url": "https://api.github.com/repos/crazyguitar/pysheeet"
},
"payload":
{
"action": "started"
},
"public": true,
"created_at": "2022-01-01T00:03:04Z"
}
本分析實踐涉及15類公開事件(不包含未出現及不再記錄的事件),詳細的事件類型及描述請參見Github公開事件類型。
前提條件
已創建云服務器ECS實例并綁定彈性公網IP,用于提取GitHub API中的實時事件數據,詳情請參見創建方式導航和綁定和解綁彈性公網IP。
已開通對象存儲OSS并在ECS中安裝ossutil工具,用于存儲GH Archive提供的JSON數據文件,詳情請參見開通OSS服務和安裝ossutil。
已開通大數據計算服務MaxCompute并創建Project,詳情請參見創建MaxCompute項目。
已開通大數據開發治理平臺DataWorks并創建工作空間,用于創建離線調度任務,詳情請參見創建工作空間。
已開通日志服務SLS并創建Project和Logstore,用于將ECS提取到的數據作為日志進行收集,詳情請參見快速入門。
已開通實時計算Flink實例,用于將SLS收集的日志數據實時寫入Hologres,詳情請參見開通實時計算Flink版。
已開通實時數倉Hologres,詳情請參見購買Hologres。
離線數據倉庫建設(小時級更新)
通過ECS下載原始數據文件并上傳至OSS
ECS用例用于下載GH Archive提供的JSON數據文件。
對于歷史數據可通過
wget
命令下載,例如wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz
下載2012年到2022年每個小時的數據。對于未來每小時產生的新數據,可以通過如下步驟設置小時級定時任務下載。
說明請確保已在ECS中安裝ossutil工具,詳情請參見安裝ossutil。建議您直接下載ossutil安裝包上傳ECS實例,通過
yum install unzip
安裝unzip解壓軟件,直接解壓ossutil,移動至/usr/bin/
目錄下即可。請確保已在ECS相同地域下創建OSS Bucket,您可以使用自定義OSS Bucket名稱。本示例對應的OSS Bucket名稱為
githubevents
。本示例ECS文件下載目錄為
/opt/hourlydata/gh_data
,您可以自定義其他目錄。
使用如下命令在
/opt/hourlydata
目錄下,新建名稱為download_code.sh
的文件。cd /opt/hourlydata vim download_code.sh
在文件內輸入
i
后進入編輯模式,添加如下示例腳本命令。d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H') h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H') url=https://data.gharchive.org/$z68uejxpaoma.json.gz echo ${url} #將數據下載至./gh_data/目錄下,您可以自定義其他目錄。 wget ${url} -P ./gh_data/ #切換至gh_data目錄下 cd gh_data #解壓下載數據為json文件 gzip -d $z68uejxpaoma.json echo $z68uejxpaoma.json #切換根目錄。 cd /root #使用ossutil工具上傳數據至OSS #在名稱為githubevents的OSS Bucket下創建hr=${h}的目錄 ossutil mkdir oss://githubevents/hr=${h} #將/hourlydata/gh_data(文件存儲目錄可以自定義其他地方)目錄下的數據上傳至OSS。 ossutil cp -r /opt/hourlydata/gh_data oss://githubevents/hr=${h} -u echo oss uploaded successfully! rm -rf /opt/hourlydata/gh_data/$z68uejxpaoma.json echo ecs deleted!
按鍵盤Esc鍵,輸入
:wq
并回車以保存并關閉文件。使用如下命令設置每小時的第10分鐘執行
download_code.sh
腳本文件。#1 執行以下指令,按鍵盤I鍵進入編輯狀態。 crontab -e #2 添加以下指令,完成后按鍵盤Esc鍵,輸入:wq退出。 10 * * * * cd /opt/hourlydata && sh download_code.sh > download.log
執行后每個小時的第10分鐘會下載前一個小時的JSON文件,在ECS解壓后上傳至OSS中(路徑為
oss://githubevents
)。為了之后每次只讀取前一個小時的文件,在上傳文件時對每個文件建立一個名稱為‘hr=%Y-%M-%D-%H’
的目錄作為分區,之后每次寫入數據只讀取最新分區下的文件。
通過外部表將OSS數據導入MaxCompute
請在MaxCompute客戶端或DataWorks中的ODPS SQL節點執行如下命令,詳情請參見使用本地客戶端(odpscmd)連接或開發ODPS SQL任務。
創建用于轉換OSS中存儲的JSON文件的外部表
githubevents
:CREATE EXTERNAL TABLE IF NOT EXISTS githubevents ( col STRING ) PARTITIONED BY ( hr STRING ) STORED AS textfile LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/' ;
MaxCompute中創建OSS外部表詳情請參見創建OSS外部表。
創建用于存儲數據的事實表
dwd_github_events_odps
,其DDL如下:CREATE TABLE IF NOT EXISTS dwd_github_events_odps ( id BIGINT COMMENT '事件ID' ,actor_id BIGINT COMMENT '事件發起人ID' ,actor_login STRING COMMENT '事件發起人登錄名' ,repo_id BIGINT COMMENT 'repoID' ,repo_name STRING COMMENT 'repo全名:owner/Repository_name' ,org_id BIGINT COMMENT 'repo所屬組織ID' ,org_login STRING COMMENT 'repo所屬組織名稱' ,`type` STRING COMMENT '事件類型' ,created_at DATETIME COMMENT '事件發生時間' ,action STRING COMMENT '事件行為' ,iss_or_pr_id BIGINT COMMENT 'issue/pull_request ID' ,number BIGINT COMMENT 'issue/pull_request 序號' ,comment_id BIGINT COMMENT 'comment(評論) ID' ,commit_id STRING COMMENT 'commit(提交記錄) ID' ,member_id BIGINT COMMENT '成員ID' ,rev_or_push_or_rel_id BIGINT COMMENT 'review/push/release ID' ,ref STRING COMMENT '創建/刪除的資源名稱' ,ref_type STRING COMMENT '創建/刪除的資源類型' ,state STRING COMMENT 'issue/pull_request/pull_request_review的狀態' ,author_association STRING COMMENT 'actor與repo之間的關系' ,language STRING COMMENT '請求合并代碼的語言' ,merged BOOLEAN COMMENT '是否接受合并' ,merged_at DATETIME COMMENT '代碼合并時間' ,additions BIGINT COMMENT '代碼增加行數' ,deletions BIGINT COMMENT '代碼減少行數' ,changed_files BIGINT COMMENT 'pull request 改變文件數量' ,push_size BIGINT COMMENT '提交數量' ,push_distinct_size BIGINT COMMENT '不同的提交數量' ,hr STRING COMMENT '事件發生所在小時,如00點23分,hr=00' ,`month` STRING COMMENT '事件發生所在月,如2015年10月,month=2015-10' ,`year` STRING COMMENT '事件發生所在年,如2015年,year=2015' ) PARTITIONED BY ( ds STRING COMMENT '事件發生所在日,ds=yyyy-mm-dd' );
將JSON數據解析寫入事實表。
使用如下命令引入分區并進行JSON解析寫入
dwd_github_events_odps
表中:msck repair table githubevents add partitions; set odps.sql.hive.compatible = true; set odps.sql.split.hive.bridge = true; INSERT into TABLE dwd_github_events_odps PARTITION(ds) SELECT CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT ) AS id ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id ,GET_JSON_OBJECT(col,'$.org.login') AS org_login ,GET_JSON_OBJECT(col,'$.type') as type ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at ,GET_JSON_OBJECT(col,'$.payload.action') AS action ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT) END AS iss_or_pr_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT) ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT) END AS number ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT) WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT) END AS rev_or_push_or_rel_id ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state') END AS state ,case WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association') WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association') END AS author_association ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT) AS deletions ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT) AS push_size ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT) AS push_distinct_size ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') as month ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') as ds from githubevents where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
查詢數據。
使用如下命令查詢
dwd_github_events_odps
表數據:SET odps.sql.allow.fullscan=true; SELECT * FROM dwd_github_events_odps where ds = '2023-03-31' limit 10;
示例返回結果如下:
實時數據倉庫建設
通過ECS獲取實時數據
ECS實例用于從GitHub API中提取實時事件數據。本文僅以如下腳本為例,展示一種通過GitHub API采集實時數據的方法。
該腳本每次運行會執行1分鐘,采集這段時間內API提供的實時事件數據,并以JSON格式存儲每個事件數據。
該腳本不保證采集到全部的實時事件數據。
持續從GitHub API中采集數據需要提供Accept和Authorization。其中Accept為固定值,Authorization需要填寫從GitHub中申請的訪問令牌。訪問令牌的創建方法請參見此處。
使用如下命令在
/opt/realtime
目錄下新建名稱為download_realtime_data.py
的文件。cd /opt/realtime vim download_realtime_data.py
在文件內輸入
i
后進入編輯模式,添加如下示例內容。#!python import requests import json import sys import time # 獲取API URL def get_next_link(resp): resp_link = resp.headers['link'] link = '' for l in resp_link.split(', '): link = l.split('; ')[0][1:-1] rel = l.split('; ')[1] if rel == 'rel="next"': return link return None # 采集API中一頁的數據 def download(link, fname): # 定義GitHub API的Accept和Authorization headers = {"Accept": "application/vnd.github+json","Authorization": "<Bearer> <github_api_token>"} resp = requests.get(link, headers=headers) if int(resp.status_code) != 200: return None with open(fname, 'a') as f: for j in resp.json(): f.write(json.dumps(j)) f.write('\n') print('downloaded {} events to {}'.format(len(resp.json()), fname)) return resp # 采集API中多頁的數據 def download_all_data(fname): link = 'https://api.github.com/events?per_page=100&page=1' while True: resp = download(link, fname) if resp is None: break link = get_next_link(resp) if link is None: break # 定義當前時間 def get_current_ms(): return round(time.time()*1000) # 定義腳本每次執行時長1分鐘 def main(fname): current_ms = get_current_ms() while get_current_ms() - current_ms < 60*1000: download_all_data(fname) time.sleep(0.1) # 執行腳本 if __name__ == '__main__': if len(sys.argv) < 2: print('usage: python {} <log_file>'.format(sys.argv[0])) exit(0) main(sys.argv[1])
按Esc鍵,輸入
:wq
并回車以保存并關閉文件。創建
run_py.sh
文件用于執行download_realtime_data.py
并將每次執行采集到的數據分別存儲,內容如下。python /opt/realtime/download_realtime_data.py /opt/realtime/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
創建
delete_log.sh
文件用于刪除歷史數據,內容如下。d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d') rm -f /opt/realtime/gh_realtime_data/*$z68uejxpaoma*.json
使用如下命令每分鐘采集一次GitHub數據,每天刪除一次歷史數據。
#1 執行以下指令,按鍵盤I鍵進入編輯狀態。 crontab -e #2 添加以下指令,完成后按鍵盤Esc鍵,輸入:wq退出。 * * * * * bash /opt/realtime/run_py.sh 1 1 * * * bash /opt/realtime/delete_log.sh
通過SLS采集ECS數據
SLS用于將ECS中提取到的實時事件數據作為日志進行收集。
SLS支持通過Logtail采集ECS上的日志。由于本文涉及的數據為JSON格式,因此可以使用Logtail的JSON模式快速采集ECS中的增量JSON日志,采集方法請參見使用JSON模式采集日志。其中本文定義SLS對原始數據的頂層鍵值對進行解析。
Logtail配置的日志路徑參數本示例設置為/opt/realtime/gh_realtime_data/**/*.json
。
配置完成后,SLS即可持續完成對ECS中增量事件數據的采集。采集到的數據情況示例如下圖。
通過Flink實時寫入SLS數據至Hologres
Flink用于將SLS采集的日志數據實時寫入Hologres。通過在Flink中使用SLS源表、Hologres結果表,即可實現數據從SLS到Hologres的實時寫入,詳情請參見從SLS日志服務導入。
創建Hologres內部表。
本文創建的內部表中只保留了原始JSON數據的部分鍵值,并將事件
id
、日期ds
設為主鍵,將事件id
設為Distribution Key,將日期ds
設為分區鍵,將事件發生時間created_at
設為event_time_column。您可以根據實際查詢需求,為其他字段創建索引,以提升查詢效率。索引介紹請參見CREATE TABLE。本次示例建表DDL如下。DROP TABLE IF EXISTS gh_realtime_data; BEGIN; CREATE TABLE gh_realtime_data ( id bigint, actor_id bigint, actor_login text, repo_id bigint, repo_name text, org_id bigint, org_login text, type text, created_at timestamp with time zone NOT NULL, action text, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id text, member_id bigint, rev_or_push_or_rel_id bigint, ref text, ref_type text, state text, author_association text, language text, merged boolean, merged_at timestamp with time zone, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr text, month text, year text, ds text, PRIMARY KEY (id,ds) ) PARTITION BY LIST (ds); CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id'); CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at'); CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at'); COMMENT ON COLUMN public.gh_realtime_data.id IS '事件ID'; COMMENT ON COLUMN public.gh_realtime_data.actor_id IS '事件發起人ID'; COMMENT ON COLUMN public.gh_realtime_data.actor_login IS '事件發起人登錄名'; COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'repoID'; COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'repo名稱'; COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'repo所屬組織ID'; COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'repo所屬組織名稱'; COMMENT ON COLUMN public.gh_realtime_data.type IS '事件類型'; COMMENT ON COLUMN public.gh_realtime_data.created_at IS '事件發生時間'; COMMENT ON COLUMN public.gh_realtime_data.action IS '事件行為'; COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID'; COMMENT ON COLUMN public.gh_realtime_data.number IS 'issue/pull_request 序號'; COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'comment(評論)ID'; COMMENT ON COLUMN public.gh_realtime_data.commit_id IS '提交記錄ID'; COMMENT ON COLUMN public.gh_realtime_data.member_id IS '成員ID'; COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID'; COMMENT ON COLUMN public.gh_realtime_data.ref IS '創建/刪除的資源名稱'; COMMENT ON COLUMN public.gh_realtime_data.ref_type IS '創建/刪除的資源類型'; COMMENT ON COLUMN public.gh_realtime_data.state IS 'issue/pull_request/pull_request_review的狀態'; COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'actor與repo之間的關系'; COMMENT ON COLUMN public.gh_realtime_data.language IS '編程語言'; COMMENT ON COLUMN public.gh_realtime_data.merged IS '是否接受合并'; COMMENT ON COLUMN public.gh_realtime_data.merged_at IS '代碼合并時間'; COMMENT ON COLUMN public.gh_realtime_data.additions IS '代碼增加行數'; COMMENT ON COLUMN public.gh_realtime_data.deletions IS '代碼減少行數'; COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'pull request 改變文件數量'; COMMENT ON COLUMN public.gh_realtime_data.push_size IS '提交數量'; COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS '不同的提交數量'; COMMENT ON COLUMN public.gh_realtime_data.hr IS '事件發生所在小時,如00點23分,hr=00'; COMMENT ON COLUMN public.gh_realtime_data.month IS '事件發生所在月,如2015年10月,month=2015-10'; COMMENT ON COLUMN public.gh_realtime_data.year IS '事件發生所在年,如2015年,year=2015'; COMMENT ON COLUMN public.gh_realtime_data.ds IS '事件發生所在日,ds=yyyy-mm-dd'; COMMIT;
通過Flink實時寫入數據。
通過Flink對SLS的數據進一步解析并實時寫入到Hologres中。在Flink中使用如下語句對寫入的數據進行過濾,丟棄事件ID、事件發生時間(
created_at
)為空的臟數據,并且只保留近期發生的事件數據。CREATE TEMPORARY TABLE sls_input ( actor varchar, created_at varchar, id bigint, org varchar, payload varchar, public varchar, repo varchar, type varchar ) WITH ( 'connector' = 'sls', 'endpoint' = '<endpoint>',--sls私域endpoint 'accessid' = '<accesskey id>',--賬號access id 'accesskey' = '<accesskey secret>',--賬號access key 'project' = '<project name>',--sls的project名 'logstore' = '<logstore name>'--sls的LogStore名稱 'starttime' = '2023-04-06 00:00:00',--sls數據采集開始時間 ); CREATE TEMPORARY TABLE hologres_sink ( id bigint, actor_id bigint, actor_login string, repo_id bigint, repo_name string, org_id bigint, org_login string, type string, created_at timestamp, action string, iss_or_pr_id bigint, number bigint, comment_id bigint, commit_id string, member_id bigint, rev_or_push_or_rel_id bigint, `ref` string, ref_type string, state string, author_association string, `language` string, merged boolean, merged_at timestamp, additions bigint, deletions bigint, changed_files bigint, push_size bigint, push_distinct_size bigint, hr string, `month` string, `year` string, ds string ) WITH ( 'connector' = 'hologres', 'dbname' = '<hologres dbname>', --Hologres的數據庫名稱 'tablename' = '<hologres tablename>', --Hologres用于接收數據的表名稱 'username' = '<accesskey id>', --當前阿里云賬號的AccessKey ID 'password' = '<accesskey secret>', --當前阿里云賬號的AccessKey Secret 'endpoint' = '<endpoint>', --當前Hologres實例VPC網絡的Endpoint 'jdbcretrycount' = '1', --連接故障時的重試次數 'partitionrouter' = 'true', --是否寫入分區表 'createparttable' = 'true', --是否自動創建分區 'mutatetype' = 'insertorignore' --數據寫入模式 ); INSERT INTO hologres_sink SELECT id ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id ,JSON_VALUE(actor, '$.login') AS actor_login ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id ,JSON_VALUE(repo, '$.name') AS repo_name ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id ,JSON_VALUE(org, '$.login') AS org_login ,type ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at ,JSON_VALUE(payload, '$.action') AS action ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint) END AS iss_or_pr_id ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint) WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint) ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint) END AS number ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id ,CASE WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint) WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint) WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint) END AS rev_or_push_or_rel_id ,JSON_VALUE(payload, '$.ref') AS `ref` ,JSON_VALUE(payload, '$.ref_type') AS ref_type ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state') END AS state ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association') WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association') WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association') WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association') END AS author_association ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language` ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year` ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds FROM sls_input WHERE id IS NOT NULL AND created_at IS NOT NULL AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);
參數說明請參見日志服務SLS源表和實時數倉Hologres結果表。
說明由于GitHub原始事件數據采用的時區為UTC、原始數據不帶有時區屬性,Hologres的默認時區為東八區,因此需要在Flink實時寫入Hologres過程中對數據時區進行調整:需要在Flink SQL中對源表數據賦予UTC時區屬性,并在啟動作業時在作業啟動配置頁面的Flink配置區域添加
table.local-time-zone:Asia/Shanghai
語句將Flink系統時區定義為Asia/Shanghai
。查詢數據。
在Hologres中查詢通過Flink寫入Hologres中的SLS數據,后續您可以根據業務需求進行數據開發。
SELECT * FROM public.gh_realtime_data limit 10;
結果示例如下:
使用離線數據修正實時數據
在本文的場景中,實時數據存在遺漏的可能,因此可以使用離線數據對實時數據進行修正。通過如下步驟可以完成對前一日實時數據的修正,您可以根據自身業務需要,調整數據修正的周期。
在Hologres中創建外部表,獲取MaxCompute離線數據。
IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to ( <foreign_table_name> ) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');
參數說明請參見IMPORT FOREIGN SCHEMA。
通過創建臨時表實現離線數據修正前一日實時數據。
說明Hologres從V2.1.17版本起支持Serverless Computing能力,針對大數據量離線導入、大型ETL作業、外表大數據量查詢等場景,使用Serverless Computing執行該類任務可以直接使用額外的Serverless資源,避免使用實例自身資源,無需為實例預留額外的計算資源,顯著提升實例穩定性、減少OOM概率,且僅需為任務單獨付費。Serverless Computing詳情請參見Serverless Computing概述,Serverless Computing使用方法請參見Serverless Computing使用指南。
-- 清理潛在的臨時表 DROP TABLE IF EXISTS gh_realtime_data_tmp; -- 創建臨時表 SET hg_experimental_enable_create_table_like_properties = ON; CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data'); -- (可選)推薦使用Serverless Computing執行大數據量離線導入和ETL作業 SET hg_computing_resource = 'serverless'; -- 向臨時表插入數據并更新統計信息 INSERT INTO gh_realtime_data_tmp SELECT * FROM <foreign_table_name> WHERE ds = current_date - interval '1 day' ON CONFLICT (id, ds) DO NOTHING; ANALYZE gh_realtime_data_tmp; -- 重置配置,保證非必要的SQL不會使用serverless資源。 RESET hg_computing_resource; -- 已有臨時子表替換原子表 BEGIN; DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>"; ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>'); COMMIT;
數據分析
針對已獲取到的海量數據,可以進行豐富的數據分析。您可以結合自身業務需要分析的時間范圍,對數據倉庫進行進一步分層設計,以滿足實時數據分析、離線數據分析、實時離線一體化分析等多方面訴求。
如下示例針對上文獲取到的實時數據進行分析,您也可以針對具體代碼倉庫或開發者進行數據分析。
查詢今日公開事件總數。
SELECT count(*) FROM gh_realtime_data WHERE created_at >= date_trunc('day', now());
返回結果示例如下:
count ------ 1006
查詢過去1天最活躍(事件數最多)的幾個項目。
SELECT repo_name, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' GROUP BY repo_name ORDER BY events DESC LIMIT 5;
返回結果示例如下:
repo_name events ----------------------------------------+------ leo424y/heysiri.ml 29 arm-on/plan 10 Christoffel-T/fiverr-pat-20230331 9 mate-academy/react_dynamic-list-of-goods 9 openvinotoolkit/openvino 7
查詢過去1天最活躍(事件數最多)的幾位開發者。
SELECT actor_login, COUNT(*) AS events FROM gh_realtime_data WHERE created_at >= now() - interval '1 day' AND actor_login NOT LIKE '%[bot]' GROUP BY actor_login ORDER BY events DESC LIMIT 5;
返回結果示例如下:
actor_login events ------------------+------ direwolf-github 13 arm-on 10 sergii-nosachenko 9 Christoffel-T 9 yangwang201911 7
查詢過去1小時最火編程語言排行。
SELECT language, count(*) total FROM gh_realtime_data WHERE created_at > now() - interval '1 hour' AND language IS NOT NULL GROUP BY language ORDER BY total DESC LIMIT 10;
返回結果示例如下:
language total -----------+---- JavaScript 25 C++ 15 Python 14 TypeScript 13 Java 8 PHP 8
查詢過去1天項目加星數排行。
說明本示例并未考慮用戶取消星標等情況。
SELECT repo_id, repo_name, COUNT(actor_login) total FROM gh_realtime_data WHERE type = 'WatchEvent' AND created_at > now() - interval '1 day' GROUP BY repo_id, repo_name ORDER BY total DESC LIMIT 10;
返回結果示例如下:
repo_id repo_name total ---------+----------------------------------+----- 618058471 facebookresearch/segment-anything 4 619959033 nomic-ai/gpt4all 1 97249406 denysdovhan/wtfjs 1 9791525 digininja/DVWA 1 168118422 aylei/interview 1 343520006 joehillen/sysz 1 162279822 agalwood/Motrix 1 577723410 huggingface/swift-coreml-diffusers 1 609539715 e2b-dev/e2b 1 254839429 maniackk/KKCallStack 1
查詢今日用戶和項目日活。
SELECT uniq (actor_id) actor_num, uniq (repo_id) repo_num FROM gh_realtime_data WHERE created_at > date_trunc('day', now());
返回結果示例如下:
actor_num repo_num ---------+-------- 743 816