Integrated real-time and offline processing practice based on the GitHub public events dataset
This topic describes how to build an offline data warehouse with MaxCompute and a real-time data warehouse with Flink and Hologres on top of GitHub's real-time event data, and then run real-time analysis in Hologres and offline analysis in MaxCompute, forming an integrated real-time and offline solution.
Background information
As society becomes increasingly digital, enterprises demand ever fresher data. Beyond the traditional offline scenarios designed for processing data at massive scale, many businesses also need to handle real-time scenarios: real-time processing, real-time storage, and real-time analytics. The concept of integrated real-time and offline processing was proposed to address both at once.
Real-time and offline integration refers to managing and processing real-time data and offline data on the same platform. It connects real-time data processing and offline data analysis seamlessly, improving both the efficiency and the accuracy of data analysis. Its advantages are:
Higher data processing efficiency: consolidating real-time and offline data on one platform greatly improves processing efficiency and reduces the cost of transferring and converting data.
Higher analysis accuracy: real-time and offline data can be analyzed together, improving the precision and accuracy of the results.
Lower system complexity: data management and processing become simpler and more efficient.
Greater data value: the value of the data is exploited more fully, giving the enterprise better decision support.
Alibaba Cloud has designed a number of solutions in this direction and offers a streamlined integrated real-time and offline data warehouse: the big data computing service MaxCompute covers the offline scenario, the real-time data warehouse Hologres covers the real-time scenario, and Flink provides the real-time processing capability. Together they form the core engine components of Alibaba Cloud's integrated data warehouse.
Solution architecture
The following figure shows the complete pipeline for this integrated real-time and offline practice on the GitHub public events dataset, using MaxCompute and Hologres.
An ECS instance collects and consolidates GitHub's real-time and offline event data and serves as the data source. The data flows into a real-time link and an offline link, and the two links finally converge in Hologres, which serves all external queries.
Real-time link: Flink processes the data in Log Service (SLS) in real time and writes it to Hologres. Flink is a powerful stream computing engine; Hologres supports real-time writes and updates that are queryable on arrival. The two are natively integrated and support high-throughput, low-latency, well-modeled, high-quality real-time data warehouse development, ultimately meeting the business's need for real-time insight in scenarios such as latest-event extraction and hot-event analysis.
Offline link: MaxCompute processes and archives the massive offline data. Alibaba Cloud OSS (Object Storage Service) is Alibaba Cloud's object storage service and can store all kinds of data; the raw data in this practice is JSON, for which OSS provides convenient, secure, low-cost, and reliable storage. MaxCompute is an enterprise-class SaaS (Software as a Service) cloud data warehouse built for analytics; it can read and parse the semi-structured data in OSS directly through external tables, load the high-value usable data into MaxCompute's internal storage, and then, combined with data development in DataWorks, produce the offline data warehouse.
Hologres is seamlessly integrated with MaxCompute at the storage layer, so Hologres can run accelerated queries over the massive historical data in MaxCompute, meeting the business's need for low-frequency but high-performance queries on historical data. The offline link can also easily correct the real-time data, fixing problems such as records missed by the real-time link.
The advantages of this solution are as follows:
Stable and efficient offline link: data can be written and updated at hourly granularity, and large-scale data can be batch processed for complex computation and analysis, lowering compute costs and raising processing efficiency.
Mature real-time link: real-time writes, real-time event computation, and real-time analysis are all supported; the real-time link is simplified and data is served within seconds.
Unified storage and serving: Hologres serves all queries; data is stored centrally and the external interface is uniform (OLAP and key-value access are unified behind SQL).
Real-time and offline fusion: little data redundancy, little data movement, and the data can be corrected.
With one-stop development, the result is second-level data freshness and end-to-end observability in an architecture with few components and few dependencies, effectively cutting both operations and labor costs.
Understanding the business and the data
A large number of developers work on open source projects on GitHub, producing a huge stream of events in the process. GitHub records the type and details of every event, the developer, the repository, and more, and publishes the public events, which include starring a repository, pushing code, and so on. For the specific event types, see Webhook events and payloads.
GitHub publishes its public events through an OpenAPI endpoint. The API only exposes real-time events with a five-minute delay; see Events for details. This API is used to obtain the real-time data.
The GH Archive project aggregates GitHub public events by hour and makes them available to developers; for details about the project, see GH Archive. This project is used to obtain the offline data.
Understanding GitHub's business
GitHub's core business is code management and collaboration, which involves three top-level entity objects: developers (Developer), repositories (Repository), and organizations (Organization). In this analysis of GitHub public events, the event itself is also stored and recorded as an entity object.
Understanding the raw public event data
A sample raw event, JSON-encoded:
{
"id": "19541192931",
"type": "WatchEvent",
"actor":
{
"id": 23286640,
"login": "herekeo",
"display_login": "herekeo",
"gravatar_id": "",
"url": "https://api.github.com/users/herekeo",
"avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
},
"repo":
{
"id": 52760178,
"name": "crazyguitar/pysheeet",
"url": "https://api.github.com/repos/crazyguitar/pysheeet"
},
"payload":
{
"action": "started"
},
"public": true,
"created_at": "2022-01-01T00:03:04Z"
}
This practice covers 15 types of public events (excluding types that never appear in the data or are no longer recorded). For the detailed event types and descriptions, see GitHub public event types.
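To make the JSON-to-column mapping concrete before building the warehouse, the following is a minimal MaxCompute SQL sketch that applies GET_JSON_OBJECT (the same function used in the offline link below) to a trimmed copy of the sample record above; the inline JSON literal is only an illustration:

SELECT  GET_JSON_OBJECT(col, '$.type')         AS type
        ,GET_JSON_OBJECT(col, '$.actor.login') AS actor_login
        ,GET_JSON_OBJECT(col, '$.repo.name')   AS repo_name
FROM (
    -- A trimmed copy of the sample event above, inlined as a string constant
    SELECT '{"id":"19541192931","type":"WatchEvent","actor":{"login":"herekeo"},"repo":{"name":"crazyguitar/pysheeet"}}' AS col
) t;
-- Expected output: WatchEvent, herekeo, crazyguitar/pysheeet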
Prerequisites
An ECS instance has been created and bound to an elastic IP address, used to pull real-time event data from the GitHub API. For details, see Creation method overview and Bind or unbind an elastic IP address.
OSS has been activated and the ossutil tool installed on the ECS instance, used to store the JSON data files provided by GH Archive. For details, see Activate OSS and Install ossutil.
MaxCompute has been activated and a project created. For details, see Create a MaxCompute project.
DataWorks has been activated and a workspace created, used for offline scheduled tasks. For details, see Create a workspace.
Log Service (SLS) has been activated and a project and a Logstore created, used to collect the data extracted on ECS as logs. For details, see Getting Started.
A Realtime Compute for Apache Flink instance has been activated, used to write the log data collected by SLS into Hologres in real time. For details, see Activate fully managed Flink.
Hologres has been activated. For details, see Purchase a Hologres instance.
Building the offline data warehouse (hourly updates)
Download the raw data files through ECS and upload them to OSS
The ECS instance downloads the JSON data files provided by GH Archive. Historical data can be fetched with the wget command; for example, wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz downloads the hourly files from 2012 through 2022. For the new file produced every hour going forward, set up an hourly scheduled download task as follows.
Create a file named download_code.sh with the following command:
vim download_code.sh
Type i to enter edit mode and add the sample script below.
Note: Make sure the ossutil tool is installed on the ECS instance; for details, see Install ossutil. The OSS bucket in this example is named githubevents.
d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
url=https://data.gharchive.org/${d}.json.gz
echo ${url}
wget ${url} -P ./gh_data/
cd gh_data
gzip -d ${d}.json.gz
echo ${d}.json
# Upload the data to OSS with the ossutil tool
cd /root
./ossutil64 mkdir oss://githubevents/hr=${h}
./ossutil64 cp -r /hourlydata/gh_data oss://githubevents/hr=${h} -u
echo oss uploaded successfully!
rm -rf /hourlydata/gh_data/${d}.json
echo ecs deleted!
Press Esc, type :wq, and press Enter to save and close the file. Then use the following command to schedule download_code.sh to run at the 10th minute of every hour:
crontab -e
10 * * * * cd /hourlydata && sh download_code.sh > download.log
From then on, at the 10th minute of every hour the JSON file for the previous hour is downloaded, decompressed on ECS, and uploaded to OSS (under oss://githubevents). So that each load only has to read the previous hour's file, every upload goes into a directory named hr=%Y-%m-%d-%H that acts as a partition, and each subsequent write reads only the files in the newest partition.
Import the OSS data into MaxCompute through an external table
Run the following commands in the MaxCompute client or in an ODPS SQL node in DataWorks. For details, see Connect with the client (odpscmd) or Create an ODPS SQL node.
Create an external table named githubevents for parsing the JSON files stored in OSS:
CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
(
    col STRING
)
PARTITIONED BY
(
    hr STRING
)
STORED AS textfile
LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/';
For details about creating OSS external tables in MaxCompute, see Create an OSS external table.
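If you want to mount a single hourly directory instead of scanning the whole bucket with msck repair (used in a later step), you can, as a sketch, add the partition explicitly; the hr value below is a hypothetical example and must match an hr=... directory that actually exists in the bucket:

ALTER TABLE githubevents ADD IF NOT EXISTS PARTITION (hr='2023-01-01-00');
-- Spot-check that the raw JSON lines are readable through the external table
SELECT col FROM githubevents WHERE hr='2023-01-01-00' LIMIT 3;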
Create the fact table dwd_github_events_odps for storing the parsed data. Its DDL is as follows:
CREATE TABLE IF NOT EXISTS dwd_github_events_odps
(
    id                     BIGINT COMMENT 'Event ID'
    ,actor_id              BIGINT COMMENT 'ID of the actor who triggered the event'
    ,actor_login           STRING COMMENT 'Login name of the actor'
    ,repo_id               BIGINT COMMENT 'Repo ID'
    ,repo_name             STRING COMMENT 'Full repo name: owner/Repository_name'
    ,org_id                BIGINT COMMENT 'ID of the organization the repo belongs to'
    ,org_login             STRING COMMENT 'Name of the organization the repo belongs to'
    ,`type`                STRING COMMENT 'Event type'
    ,created_at            DATETIME COMMENT 'Time the event occurred'
    ,action                STRING COMMENT 'Event action'
    ,iss_or_pr_id          BIGINT COMMENT 'issue/pull_request ID'
    ,number                BIGINT COMMENT 'issue/pull_request number'
    ,comment_id            BIGINT COMMENT 'Comment ID'
    ,commit_id             STRING COMMENT 'Commit ID'
    ,member_id             BIGINT COMMENT 'Member ID'
    ,rev_or_push_or_rel_id BIGINT COMMENT 'review/push/release ID'
    ,ref                   STRING COMMENT 'Name of the created/deleted resource'
    ,ref_type              STRING COMMENT 'Type of the created/deleted resource'
    ,state                 STRING COMMENT 'State of the issue/pull_request/pull_request_review'
    ,author_association    STRING COMMENT 'Relationship between the actor and the repo'
    ,language              STRING COMMENT 'Language of the code in the merge request'
    ,merged                BOOLEAN COMMENT 'Whether the merge was accepted'
    ,merged_at             DATETIME COMMENT 'Time the code was merged'
    ,additions             BIGINT COMMENT 'Lines of code added'
    ,deletions             BIGINT COMMENT 'Lines of code deleted'
    ,changed_files         BIGINT COMMENT 'Number of files changed by the pull request'
    ,push_size             BIGINT COMMENT 'Number of commits in the push'
    ,push_distinct_size    BIGINT COMMENT 'Number of distinct commits in the push'
    ,hr                    STRING COMMENT 'Hour the event occurred, e.g. for 00:23, hr=00'
    ,`month`               STRING COMMENT 'Month the event occurred, e.g. October 2015 is month=2015-10'
    ,`year`                STRING COMMENT 'Year the event occurred, e.g. year=2015'
)
PARTITIONED BY
(
    ds STRING COMMENT 'Day the event occurred, ds=yyyy-mm-dd'
);
Parse the JSON data and write it into the fact table.
Run the following commands to mount the partitions, parse the JSON, and write the result into the dwd_github_events_odps table:
msck repair table githubevents add partitions;
set odps.sql.hive.compatible = true;
set odps.sql.split.hive.bridge = true;
INSERT INTO TABLE dwd_github_events_odps PARTITION(ds)
SELECT  CAST(GET_JSON_OBJECT(col,'$.id') AS BIGINT) AS id
        ,CAST(GET_JSON_OBJECT(col,'$.actor.id') AS BIGINT) AS actor_id
        ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
        ,CAST(GET_JSON_OBJECT(col,'$.repo.id') AS BIGINT) AS repo_id
        ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
        ,CAST(GET_JSON_OBJECT(col,'$.org.id') AS BIGINT) AS org_id
        ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
        ,GET_JSON_OBJECT(col,'$.type') AS type
        ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
        ,GET_JSON_OBJECT(col,'$.payload.action') AS action
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id') AS BIGINT)
         END AS iss_or_pr_id
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number') AS BIGINT)
              ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number') AS BIGINT)
         END AS number
        ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id') AS BIGINT) AS comment_id
        ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
        ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id') AS BIGINT) AS member_id
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id') AS BIGINT)
              WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id') AS BIGINT)
         END AS rev_or_push_or_rel_id
        ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
        ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
              WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state')
         END AS state
        ,CASE WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
              WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
              WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
              WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association')
         END AS author_association
        ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
        ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions') AS BIGINT) AS additions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions') AS BIGINT) AS deletions
        ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files') AS BIGINT) AS changed_files
        ,CAST(GET_JSON_OBJECT(col,'$.payload.size') AS BIGINT) AS push_size
        ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size') AS BIGINT) AS push_distinct_size
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) AS hr
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') AS month
        ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) AS year
        ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') AS ds
FROM    githubevents
WHERE   hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
Query the data.
Run the following commands to query the dwd_github_events_odps table:
set odps.sql.allow.fullscan=true;
select * from dwd_github_events_odps limit 10;
An example of the returned results is shown in the following figure.
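Beyond spot-checking rows, a quick distribution query can confirm that the event types were parsed as expected. A minimal sketch, where the ds value is a hypothetical example:

SELECT  type
        ,COUNT(*) AS events
FROM    dwd_github_events_odps
WHERE   ds = '2023-01-01'
GROUP BY type
ORDER BY events DESC;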
Building the real-time data warehouse
Obtain real-time data through ECS
The ECS instance pulls real-time event data from the GitHub API. The script below is only an example showing one way to collect real-time data through the GitHub API.
Each run of the script lasts 1 minute, during which it collects the real-time events exposed by the API and stores each event as a JSON record.
The script does not guarantee that all real-time events are captured.
Continuously pulling data from the GitHub API requires the Accept and Authorization headers. Accept is a fixed value; Authorization must carry an access token requested from GitHub. For how to create an access token, see here.
Create a file named download_realtime_data.py with the following command:
vim download_realtime_data.py
Type i to enter edit mode and add the following sample content.
#!python
import requests
import json
import sys
import time

# Get the URL of the next API page from the Link response header
# (returns None when there is no next page)
def get_next_link(resp):
    resp_link = resp.headers.get('link', '')
    for l in resp_link.split(', '):
        if '; ' not in l:
            continue
        link = l.split('; ')[0][1:-1]
        rel = l.split('; ')[1]
        if rel == 'rel="next"':
            return link
    return None

# Collect one page of events from the API
def download(link, fname):
    # Define the Accept and Authorization headers for the GitHub API.
    # Replace <github_api_token> with the access token you created on GitHub.
    headers = {"Accept": "application/vnd.github+json", "Authorization": "Bearer <github_api_token>"}
    resp = requests.get(link, headers=headers)
    if int(resp.status_code) != 200:
        return None
    with open(fname, 'a') as f:
        for j in resp.json():
            f.write(json.dumps(j))
            f.write('\n')
    print('downloaded {} events to {}'.format(len(resp.json()), fname))
    return resp

# Collect multiple pages of events from the API
def download_all_data(fname):
    link = 'https://api.github.com/events?per_page=100&page=1'
    while True:
        resp = download(link, fname)
        if resp is None:
            break
        link = get_next_link(resp)
        if link is None:
            break

# Current time in milliseconds
def get_current_ms():
    return round(time.time()*1000)

# Limit each run of the script to 1 minute
def main(fname):
    current_ms = get_current_ms()
    while get_current_ms() - current_ms < 60*1000:
        download_all_data(fname)
        time.sleep(0.1)

# Run the script
if __name__ == '__main__':
    if len(sys.argv) < 2:
        print('usage: python {} <log_file>'.format(sys.argv[0]))
        exit(0)
    main(sys.argv[1])
Press Esc, type :wq, and press Enter to save and close the file. Create a file named run_py.sh that runs download_realtime_data.py and stores the data from each run in its own file:
python /root/download_realtime_data.py /root/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
Create a file named delete_log.sh for deleting historical data:
d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
rm -f /root/gh_realtime_data/*${d}*.json
Use the following crontab entries to collect GitHub data every minute and delete historical data once a day:
crontab -e
* * * * * bash /root/run_py.sh
1 1 * * * bash /root/delete_log.sh
Collect the ECS data through SLS
SLS collects the real-time event data extracted on ECS as logs.
SLS supports collecting logs from ECS through Logtail. Because the data in this topic is in JSON format, Logtail's JSON mode can quickly collect the incremental JSON logs on ECS; for the collection method, see Collect logs in JSON mode. In this topic, SLS parses the top-level key-value pairs of the raw data.
In this example, the log path parameter of the Logtail configuration is set to /root/gh_realtime_data/**/*.json.
Once configured, SLS continuously collects the incremental event data from ECS. An example of the collected data is shown in the following figure.
Write the SLS data to Hologres in real time through Flink
Flink writes the log data collected by SLS into Hologres in real time. By using an SLS source table and a Hologres result table in Flink, the data flows from SLS into Hologres in real time; for details, see Synchronize data from Log Service to Hologres.
Create a Hologres internal table.
The internal table created here keeps only some of the keys from the raw JSON data. The event id and the date ds form the primary key, the event id is the distribution key, the date ds is the partition key, and the event time created_at is the event_time_column. You can create indexes on other columns according to your actual query needs to improve query efficiency; for an introduction to indexes, see Table creation overview. The DDL for this example is as follows.
DROP TABLE IF EXISTS gh_realtime_data;
BEGIN;
CREATE TABLE gh_realtime_data (
    id bigint,
    actor_id bigint,
    actor_login text,
    repo_id bigint,
    repo_name text,
    org_id bigint,
    org_login text,
    type text,
    created_at timestamp with time zone NOT NULL,
    action text,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id text,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    ref text,
    ref_type text,
    state text,
    author_association text,
    language text,
    merged boolean,
    merged_at timestamp with time zone,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr text,
    month text,
    year text,
    ds text,
    PRIMARY KEY (id,ds)
)
PARTITION BY LIST (ds);
CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');
COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID';
COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'ID of the actor who triggered the event';
COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Login name of the actor';
COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'Repo ID';
COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'Repo name';
COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'ID of the organization the repo belongs to';
COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Name of the organization the repo belongs to';
COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type';
COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Time the event occurred';
COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action';
COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID';
COMMENT ON COLUMN public.gh_realtime_data.number IS 'issue/pull_request number';
COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'Comment ID';
COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID';
COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID';
COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID';
COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of the created/deleted resource';
COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of the created/deleted resource';
COMMENT ON COLUMN public.gh_realtime_data.state IS 'State of the issue/pull_request/pull_request_review';
COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between the actor and the repo';
COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language';
COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Whether the merge was accepted';
COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Time the code was merged';
COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Lines of code added';
COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Lines of code deleted';
COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed by the pull request';
COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits in the push';
COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits in the push';
COMMENT ON COLUMN public.gh_realtime_data.hr IS 'Hour the event occurred, e.g. for 00:23, hr=00';
COMMENT ON COLUMN public.gh_realtime_data.month IS 'Month the event occurred, e.g. October 2015 is month=2015-10';
COMMENT ON COLUMN public.gh_realtime_data.year IS 'Year the event occurred, e.g. year=2015';
COMMENT ON COLUMN public.gh_realtime_data.ds IS 'Day the event occurred, ds=yyyy-mm-dd';
COMMIT;
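The Flink result table in the next step sets createparttable to true, so daily child partitions are created automatically on write. If you need to pre-create a child partition manually (for example, for a backfill test), standard PostgreSQL partition DDL should work in Hologres; a sketch with a hypothetical date, following the gh_realtime_data_<date> naming used in the correction step later:

CREATE TABLE IF NOT EXISTS "gh_realtime_data_2023-04-01"
PARTITION OF gh_realtime_data FOR VALUES IN ('2023-04-01');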
Write the data in real time through Flink.
Flink further parses the SLS data and writes it into Hologres in real time. The following statements filter the incoming data, discarding dirty records whose event ID or event time (created_at) is empty, and keeping only recently occurred events.
CREATE TEMPORARY TABLE sls_input (
    actor varchar,
    created_at varchar,
    id bigint,
    org varchar,
    payload varchar,
    public varchar,
    repo varchar,
    type varchar
) WITH (
    'connector' = 'sls',
    'endpoint' = '<endpoint>',          -- internal endpoint of SLS
    'accessid' = '<accesskey id>',      -- AccessKey ID of the account
    'accesskey' = '<accesskey secret>', -- AccessKey secret of the account
    'project' = '<project name>',       -- name of the SLS project
    'logstore' = '<logstore name>'      -- name of the SLS Logstore
);
CREATE TEMPORARY TABLE hologres_sink (
    id bigint,
    actor_id bigint,
    actor_login string,
    repo_id bigint,
    repo_name string,
    org_id bigint,
    org_login string,
    type string,
    created_at timestamp,
    action string,
    iss_or_pr_id bigint,
    number bigint,
    comment_id bigint,
    commit_id string,
    member_id bigint,
    rev_or_push_or_rel_id bigint,
    `ref` string,
    ref_type string,
    state string,
    author_association string,
    `language` string,
    merged boolean,
    merged_at timestamp,
    additions bigint,
    deletions bigint,
    changed_files bigint,
    push_size bigint,
    push_distinct_size bigint,
    hr string,
    `month` string,
    `year` string,
    ds string
) with (
    'connector' = 'hologres',
    'dbname' = '<hologres dbname>',       -- Hologres database name
    'tablename' = '<hologres tablename>', -- Hologres table that receives the data
    'username' = '<accesskey id>',        -- AccessKey ID of the current Alibaba Cloud account
    'password' = '<accesskey secret>',    -- AccessKey secret of the current Alibaba Cloud account
    'endpoint' = '<endpoint>',            -- VPC endpoint of the Hologres instance
    'jdbcretrycount' = '1',               -- number of retries on connection failure
    'partitionrouter' = 'true',           -- whether to route writes to partitions
    'createparttable' = 'true',           -- whether to create partitions automatically
    'mutatetype' = 'insertorignore'       -- write mode
);
INSERT INTO hologres_sink
SELECT  id
        ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
        ,JSON_VALUE(actor, '$.login') AS actor_login
        ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
        ,JSON_VALUE(repo, '$.name') AS repo_name
        ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
        ,JSON_VALUE(org, '$.login') AS org_login
        ,type
        ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
        ,JSON_VALUE(payload, '$.action') AS action
        ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
              WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
         END AS iss_or_pr_id
        ,CASE WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
              WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
              ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
         END AS number
        ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
        ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
        ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
        ,CASE WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
              WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
              WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
         END AS rev_or_push_or_rel_id
        ,JSON_VALUE(payload, '$.ref') AS `ref`
        ,JSON_VALUE(payload, '$.ref_type') AS ref_type
        ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
              WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
              WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
         END AS state
        ,CASE WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
              WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
              WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
              WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
         END AS author_association
        ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
        ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
        ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
        ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
        ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
        ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
        ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
        ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) AS hr
        ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') AS `month`
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) AS `year`
        ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) AS ds
FROM    sls_input
WHERE   id IS NOT NULL
AND     created_at IS NOT NULL
AND     to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1);
For parameter details, see Log Service (SLS) source table and Hologres result table.
Note: The raw GitHub event data uses UTC and carries no time zone attribute, while Hologres defaults to UTC+8. The time zone therefore has to be adjusted while Flink writes to Hologres in real time: assign the UTC time zone attribute to the source data in the Flink SQL, and when starting the job add table.local-time-zone: Asia/Shanghai in the Flink configuration area of the job startup configuration page to set Flink's system time zone to Asia/Shanghai.
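As an assumption, if your Flink SQL environment accepts SET statements, the same time zone could also be declared at the top of the SQL script instead of on the configuration page:

SET 'table.local-time-zone' = 'Asia/Shanghai';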
Query the data.
In Hologres, query the SLS data that Flink wrote into Hologres; you can then develop the data further according to your business needs.
SELECT * FROM public.gh_realtime_data limit 10;
An example of the results is shown below:
Use offline data to correct real-time data
In this scenario, the real-time data may miss records, so the offline data can be used to correct it. The following steps correct the previous day's real-time data; you can adjust the correction cycle to your business needs.
Create a foreign table in Hologres to access the MaxCompute offline data.
IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to
(
    <foreign_table_name>
)
FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update', if_unsupported_type 'error');
For parameter details, see IMPORT FOREIGN SCHEMA.
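Before running the correction, it can help to confirm that yesterday's offline partition is visible through the foreign table. A minimal sketch, reusing the <foreign_table_name> placeholder from the statement above:

SELECT  ds
        ,COUNT(*) AS offline_rows
FROM    public.<foreign_table_name>
WHERE   ds = to_char(current_date - interval '1 day', 'YYYY-MM-DD')
GROUP BY ds;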
Correct the previous day's real-time data with the offline data by creating a temporary table.
-- Drop any leftover temporary table
DROP TABLE IF EXISTS gh_realtime_data_tmp;
-- Create the temporary table
SET hg_experimental_enable_create_table_like_properties = ON;
CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');
-- Insert data into the temporary table and update its statistics
INSERT INTO gh_realtime_data_tmp
SELECT  *
FROM    <foreign_table_name>
WHERE   ds = current_date - interval '1 day'
ON CONFLICT (id, ds) DO NOTHING;
ANALYZE gh_realtime_data_tmp;
-- Replace the original child table with the temporary table
BEGIN;
DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
COMMIT;
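After the partition swap, a simple count comparison between the corrected child table and the offline source gives a quick sanity check; a sketch using the same placeholders as above:

SELECT  (SELECT COUNT(*) FROM "gh_realtime_data_<yesterday_date>") AS corrected_rows
        ,(SELECT COUNT(*) FROM public.<foreign_table_name> WHERE ds = '<yesterday_date>') AS offline_rows;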
Data analysis
With this wealth of data in hand, you can run rich analyses. You can further layer the data warehouse according to the time ranges your business needs to analyze, to serve real-time analysis, offline analysis, and combined real-time and offline analysis.
The following examples analyze the real-time data obtained above; you can also run analyses against a specific repository or developer.
Query the total number of public events today.
SELECT count(*) FROM gh_realtime_data WHERE created_at >= date_trunc('day', now());
An example of the returned results:
count
------
1006
Query the most active repositories (most events) over the past day.
SELECT
    repo_name,
    COUNT(*) AS events
FROM gh_realtime_data
WHERE created_at >= now() - interval '1 day'
GROUP BY repo_name
ORDER BY events DESC
LIMIT 5;
An example of the returned results:
repo_name                                  events
-------------------------------------------+------
leo424y/heysiri.ml                          29
arm-on/plan                                 10
Christoffel-T/fiverr-pat-20230331           9
mate-academy/react_dynamic-list-of-goods    9
openvinotoolkit/openvino                    7
Query the most active developers (most events) over the past day.
SELECT
    actor_login,
    COUNT(*) AS events
FROM gh_realtime_data
WHERE created_at >= now() - interval '1 day'
AND actor_login NOT LIKE '%[bot]'
GROUP BY actor_login
ORDER BY events DESC
LIMIT 5;
An example of the returned results:
actor_login        events
-------------------+------
direwolf-github     13
arm-on              10
sergii-nosachenko   9
Christoffel-T       9
yangwang201911      7
Query the ranking of the hottest programming languages over the past hour.
SELECT
    language,
    count(*) total
FROM gh_realtime_data
WHERE created_at > now() - interval '1 hour'
AND language IS NOT NULL
GROUP BY language
ORDER BY total DESC
LIMIT 10;
An example of the returned results:
language    total
-----------+------
JavaScript  25
C++         15
Python      14
TypeScript  13
Java        8
PHP         8
Query the repositories with the most new stars over the past day.
Note: This example does not account for users unstarring repositories.
SELECT
    repo_id,
    repo_name,
    COUNT(actor_login) total
FROM gh_realtime_data
WHERE type = 'WatchEvent'
AND created_at > now() - interval '1 day'
GROUP BY repo_id, repo_name
ORDER BY total DESC
LIMIT 10;
An example of the returned results:
repo_id    repo_name                            total
----------+-------------------------------------+-----
618058471  facebookresearch/segment-anything    4
619959033  nomic-ai/gpt4all                      1
97249406   denysdovhan/wtfjs                     1
9791525    digininja/DVWA                        1
168118422  aylei/interview                       1
343520006  joehillen/sysz                        1
162279822  agalwood/Motrix                       1
577723410  huggingface/swift-coreml-diffusers    1
609539715  e2b-dev/e2b                           1
254839429  maniackk/KKCallStack                  1
Query today's daily active users and daily active repositories.
SELECT
    uniq(actor_id) actor_num,
    uniq(repo_id) repo_num
FROM gh_realtime_data
WHERE created_at > date_trunc('day', now());
An example of the returned results:
actor_num  repo_num
----------+---------
743        816
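As a final example, the hr column written by the real-time link makes intraday trends straightforward; this sketch counts today's events per hour:

SELECT
    hr,
    COUNT(*) AS events
FROM gh_realtime_data
WHERE created_at >= date_trunc('day', now())
GROUP BY hr
ORDER BY hr;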