
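Offline and real-time integrated data warehouse practice based on the GitHub public events dataset

This topic describes how to build an offline data warehouse with MaxCompute and a real-time data warehouse with Flink and Hologres from GitHub event data, and then run offline analysis in MaxCompute and real-time analysis in Hologres, forming an integrated real-time and offline solution.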

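Background information

As businesses digitalize, demand for timely data keeps growing. Besides the traditional offline scenarios designed for processing massive volumes of data, many workloads now need real-time processing, real-time storage, and real-time analysis. The concept of integrating offline and real-time processing was proposed to address this.

Real-time and offline integration means managing and processing real-time data and offline data on the same platform, so that real-time processing and offline analysis connect seamlessly and data analysis becomes more efficient and more accurate. Its advantages are:

  • More efficient data processing: consolidating real-time and offline data on one platform reduces the cost of moving and converting data.

  • More accurate data analysis: real-time and offline data can be analyzed together.

  • Lower system complexity: data management and processing become simpler.

  • Higher data value: data can be put to fuller use to support business decisions.

Alibaba Cloud offers a simplified real-time and offline integrated data warehouse for this purpose. MaxCompute, the big data computing service, covers the offline scenario, and Hologres, the real-time data warehouse, covers the real-time scenario. Together with the real-time processing capability of Flink, they form the core engines of the Alibaba Cloud integrated data warehouse.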

Solution architecture

The following figure shows the complete pipeline for the real-time and offline integration practice on the GitHub public events dataset with MaxCompute and Hologres.

image

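An ECS instance collects real-time and offline GitHub event data and serves as the data source. The data then flows into a real-time pipeline and an offline pipeline, and both pipelines converge in Hologres, which serves the data externally.

  • Real-time pipeline: Flink processes the data in Log Service (SLS) in real time and writes it to Hologres. Flink is a powerful stream computing engine, and Hologres supports real-time writes and updates, with data queryable as soon as it is written. The two are natively integrated and support building a high-throughput, low-latency, modeled, high-quality real-time data warehouse that meets real-time business insight needs such as extracting the latest events and analyzing trending events.

  • Offline pipeline: MaxCompute processes and archives massive offline data. Object Storage Service (OSS) is the Alibaba Cloud storage service and can store all kinds of data. The raw data used in this practice is in JSON format, and OSS provides convenient, secure, low-cost, and reliable storage for it. MaxCompute is an enterprise-grade SaaS (Software as a Service) cloud data warehouse for data analysis. It can read and parse the semi-structured data in OSS directly through external tables and load the high-value data into MaxCompute internal storage. DataWorks is then used for data development to build the offline data warehouse.

  • Hologres and MaxCompute are seamlessly connected at the storage layer, so Hologres can accelerate queries and analysis over massive historical data in MaxCompute, which meets the need for infrequent but high-performance queries on historical data. The offline pipeline can also be used to correct real-time data, which addresses problems such as data missed by the real-time pipeline.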

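The solution has the following advantages:

  • Stable and efficient offline pipeline: data is written and updated hourly, and large-scale data can be processed in batches for complex computation and analysis, which lowers computing costs and improves processing efficiency.

  • Mature real-time pipeline: supports real-time writes, real-time event computation, and real-time analysis, with a simplified pipeline and responses within seconds.

  • Unified storage and serving: Hologres serves all data externally, data is stored centrally, and the interface is consistent (OLAP and key-value access are unified under SQL).

  • Real-time and offline convergence: little data redundancy, little data movement, and correctable data.

With one-stop development, the solution delivers responses within seconds and visibility across the whole pipeline, uses few architectural components and dependencies, and reduces both O&M and labor costs.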

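Business and data overview

Large numbers of developers work on open source projects on GitHub and generate massive numbers of events in the process. GitHub records the type and details of each event, the developer, the repository, and other information, and makes the public events available, such as starring a repository or pushing code. For the event types, see Webhook events and payloads.

  • GitHub publishes its public events through an OpenAPI. The API only exposes events that are at least 5 minutes old. For details, see Events. This API can be used to obtain real-time data.

  • The GH Archive project aggregates GitHub public events by hour and makes them available to developers. For details, see GH Archive. This project can be used to obtain offline data.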

GitHub business overview

GitHub's core business is code management and collaboration. It involves three first-level entities: Developer, Repository, and Organization.

image

In this analysis of GitHub public events, each event is stored and recorded as an entity.

image

Raw public event data

The following is an example of a raw event in JSON format:

{
    "id": "19541192931",
    "type": "WatchEvent",
    "actor":
    {
        "id": 23286640,
        "login": "herekeo",
        "display_login": "herekeo",
        "gravatar_id": "",
        "url": "https://api.github.com/users/herekeo",
        "avatar_url": "https://avatars.githubusercontent.com/u/23286640?"
    },
    "repo":
    {
        "id": 52760178,
        "name": "crazyguitar/pysheeet",
        "url": "https://api.github.com/repos/crazyguitar/pysheeet"
    },
    "payload":
    {
        "action": "started"
    },
    "public": true,
    "created_at": "2022-01-01T00:03:04Z"
}

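This practice covers 15 types of public events (event types that do not appear or are no longer recorded are excluded). For the event types and their descriptions, see GitHub public event types.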
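To make the structure of a raw event concrete, the following is a minimal Python sketch that reads the sample event shown above. The function name `describe_event` is hypothetical and only for illustration. It highlights two details that matter later: the event `id` arrives as a JSON string even though it is numeric, and the actor and repository fields are nested objects.

```python
import json

# The sample event from this topic, shortened to the fields used below.
RAW_EVENT = (
    '{"id": "19541192931", "type": "WatchEvent", '
    '"actor": {"id": 23286640, "login": "herekeo"}, '
    '"repo": {"id": 52760178, "name": "crazyguitar/pysheeet"}, '
    '"payload": {"action": "started"}, '
    '"public": true, "created_at": "2022-01-01T00:03:04Z"}'
)


def describe_event(raw):
    """Return a flat summary of one raw event (hypothetical helper)."""
    event = json.loads(raw)
    return {
        "id": int(event["id"]),              # string in the JSON, numeric in the warehouse
        "type": event["type"],
        "actor_id": event["actor"]["id"],    # nested object
        "actor_login": event["actor"]["login"],
        "repo_name": event["repo"]["name"],
        "action": event["payload"].get("action"),
    }


if __name__ == "__main__":
    print(describe_event(RAW_EVENT))
```

The explicit `int(...)` conversion plays the same role as the `CAST(... AS BIGINT)` calls in the SQL later in this topic.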

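Prerequisites

  • An Elastic Compute Service (ECS) instance is created and associated with an elastic IP address (EIP). The instance is used to pull real-time event data from the GitHub API. For details, see Creation methods and Associate and disassociate EIPs.

  • OSS is activated and the ossutil tool is installed on the ECS instance. OSS stores the JSON data files provided by GH Archive. For details, see Activate OSS and Install ossutil.

  • MaxCompute is activated and a project is created. For details, see Create a MaxCompute project.

  • DataWorks is activated and a workspace is created for offline scheduling tasks. For details, see Create a workspace.

  • Log Service (SLS) is activated, and a project and a Logstore are created to collect the data pulled by ECS as logs. For details, see Getting started.

  • A Realtime Compute for Apache Flink instance is activated to write the log data collected by SLS to Hologres in real time. For details, see Activate fully managed Flink.

  • Hologres is activated. For details, see Purchase a Hologres instance.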

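Build the offline data warehouse (hourly updates)

Download raw data files on ECS and upload them to OSS

The ECS instance downloads the JSON data files provided by GH Archive. Historical data can be downloaded with wget. For example, wget https://data.gharchive.org/{2012..2022}-{01..12}-{01..31}-{0..23}.json.gz downloads the data for every hour from 2012 to 2022. For the new data generated every hour from now on, set up an hourly scheduled task as follows.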

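  1. Run the following command to create a file named download_code.sh:

    vim download_code.sh
  2. Press i to enter insert mode and add the following sample script:

    Note

    Make sure ossutil is installed on the ECS instance. For details, see Install ossutil. The OSS bucket in this example is named githubevents.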

    # d uses an unpadded hour to match GH Archive file names; h uses a zero-padded hour for the OSS partition
    d=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%-H')
    h=$(TZ=UTC date --date='1 hour ago' '+%Y-%m-%d-%H')
    url=https://data.gharchive.org/${d}.json.gz
    echo ${url}
    
    wget ${url} -P ./gh_data/
    cd gh_data
    gzip -d ${d}.json
    
    echo ${d}.json
    # Upload the data to OSS with ossutil
    cd /root
    ./ossutil64 mkdir oss://githubevents/hr=${h}
    ./ossutil64 cp -r /hourlydata/gh_data oss://githubevents/hr=${h} -u
    echo oss uploaded successfully!
    
    rm -rf /hourlydata/gh_data/${d}.json
    echo ecs deleted!
  3. Press Esc, type :wq, and press Enter to save and close the file.

  4. Run the following commands to execute download_code.sh at the 10th minute of every hour.

    crontab -e
    10 * * * * cd /hourlydata && sh download_code.sh > download.log

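    After this is set, the JSON file for the previous hour is downloaded at the 10th minute of every hour, decompressed on ECS, and uploaded to OSS (under oss://githubevents). So that each later load reads only the previous hour's file, every file is uploaded into its own directory named hr=%Y-%m-%d-%H, which acts as a partition. Each load then reads only the files in the latest partition.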
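The naming logic in the steps above can be sketched in Python as follows. This is an illustration only, not part of the scheduled task. The function name `previous_hour_targets` is hypothetical. It shows why the script keeps two date strings: the GH Archive file name uses an unpadded hour, while the OSS partition directory uses a zero-padded hour.

```python
from datetime import datetime, timedelta, timezone


def previous_hour_targets(now_utc):
    """Return (download URL, OSS partition directory) for the hour before now_utc."""
    prev = now_utc - timedelta(hours=1)
    day = prev.strftime("%Y-%m-%d")
    file_stem = f"{day}-{prev.hour}"            # unpadded hour, as in date '+%-H'
    partition = f"hr={day}-{prev.hour:02d}"     # zero-padded hour, as in date '+%H'
    url = f"https://data.gharchive.org/{file_stem}.json.gz"
    return url, partition


if __name__ == "__main__":
    print(previous_hour_targets(datetime.now(timezone.utc)))
```

For a run at 06:10 UTC on 2022-01-01, the file is 2022-01-01-5.json.gz and the partition is hr=2022-01-01-05.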

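Import OSS data into MaxCompute through an external table

Run the following commands on the MaxCompute client or in an ODPS SQL node in DataWorks. For details, see Connect by using the client (odpscmd) and Create an ODPS SQL node.

  1. Create the external table githubevents, which is used to read the JSON files stored in OSS.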

    CREATE EXTERNAL TABLE IF NOT EXISTS githubevents
    (
        col  STRING
    )
    PARTITIONED BY 
    (
        hr   STRING
    )
    STORED AS textfile
    LOCATION 'oss://oss-cn-hangzhou-internal.aliyuncs.com/githubevents/'
    ;

    For details about creating OSS external tables in MaxCompute, see Create an OSS external table.

  2. Create the fact table dwd_github_events_odps to store the data. The DDL is as follows:

    CREATE TABLE IF NOT EXISTS dwd_github_events_odps
    (
        id                     BIGINT COMMENT 'Event ID'
        ,actor_id              BIGINT COMMENT 'ID of the actor who triggered the event'
        ,actor_login           STRING COMMENT 'Login name of the actor'
        ,repo_id               BIGINT COMMENT 'Repository ID'
        ,repo_name             STRING COMMENT 'Full repository name: owner/Repository_name'
        ,org_id                BIGINT COMMENT 'ID of the organization that owns the repository'
        ,org_login             STRING COMMENT 'Name of the organization that owns the repository'
        ,`type`                STRING COMMENT 'Event type'
        ,created_at            DATETIME COMMENT 'Time the event occurred'
        ,action                STRING COMMENT 'Event action'
        ,iss_or_pr_id          BIGINT COMMENT 'issue/pull_request ID'
        ,number                BIGINT COMMENT 'issue/pull_request number'
        ,comment_id            BIGINT COMMENT 'Comment ID'
        ,commit_id             STRING COMMENT 'Commit ID'
        ,member_id             BIGINT COMMENT 'Member ID'
        ,rev_or_push_or_rel_id BIGINT COMMENT 'review/push/release ID'
        ,ref                   STRING COMMENT 'Name of the created or deleted resource'
        ,ref_type              STRING COMMENT 'Type of the created or deleted resource'
        ,state                 STRING COMMENT 'State of the issue/pull_request/pull_request_review'
        ,author_association    STRING COMMENT 'Relationship between the actor and the repository'
        ,language              STRING COMMENT 'Language of the code in the pull request'
        ,merged                BOOLEAN COMMENT 'Whether the merge was accepted'
        ,merged_at             DATETIME COMMENT 'Time the code was merged'
        ,additions             BIGINT COMMENT 'Number of lines added'
        ,deletions             BIGINT COMMENT 'Number of lines deleted'
        ,changed_files         BIGINT COMMENT 'Number of files changed by the pull request'
        ,push_size             BIGINT COMMENT 'Number of commits'
        ,push_distinct_size    BIGINT COMMENT 'Number of distinct commits'
        ,hr                    STRING COMMENT 'Hour in which the event occurred, for example hr=00 for 00:23'
        ,`month`               STRING COMMENT 'Month in which the event occurred, for example month=2015-10 for October 2015'
        ,`year`                STRING COMMENT 'Year in which the event occurred, for example year=2015'
    )
    PARTITIONED BY 
    (
        ds                     STRING COMMENT 'Day on which the event occurred, ds=yyyy-mm-dd'
    )
    )
    ;
  3. Parse the JSON data and write it to the fact table.

    Run the following commands to add the partitions and write the parsed JSON into dwd_github_events_odps:

    msck repair table githubevents add partitions;
    
    set odps.sql.hive.compatible = true;
    set odps.sql.split.hive.bridge = true;
    INSERT into TABLE dwd_github_events_odps PARTITION(ds)
    SELECT  CAST(GET_JSON_OBJECT(col,'$.id')  AS BIGINT ) AS id
            ,CAST(GET_JSON_OBJECT(col,'$.actor.id')AS BIGINT) AS actor_id
            ,GET_JSON_OBJECT(col,'$.actor.login') AS actor_login
            ,CAST(GET_JSON_OBJECT(col,'$.repo.id')AS BIGINT) AS repo_id
            ,GET_JSON_OBJECT(col,'$.repo.name') AS repo_name
            ,CAST(GET_JSON_OBJECT(col,'$.org.id')AS BIGINT) AS org_id
            ,GET_JSON_OBJECT(col,'$.org.login') AS org_login
            ,GET_JSON_OBJECT(col,'$.type') as type
            ,to_date(GET_JSON_OBJECT(col,'$.created_at'), 'yyyy-mm-ddThh:mi:ssZ') AS created_at
            ,GET_JSON_OBJECT(col,'$.payload.action') AS action
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.id')AS BIGINT) 
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.id')AS BIGINT) 
             END AS iss_or_pr_id
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.number')AS BIGINT) 
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.issue.number')AS BIGINT) 
                     ELSE CAST(GET_JSON_OBJECT(col,'$.payload.number')AS BIGINT)
             END AS number
            ,CAST(GET_JSON_OBJECT(col,'$.payload.comment.id')AS BIGINT) AS comment_id
            ,GET_JSON_OBJECT(col,'$.payload.comment.commit_id') AS commit_id
            ,CAST(GET_JSON_OBJECT(col,'$.payload.member.id')AS BIGINT) AS member_id
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.review.id')AS BIGINT)
                     WHEN GET_JSON_OBJECT(col,'$.type')="PushEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.push_id')AS BIGINT)
                     WHEN GET_JSON_OBJECT(col,'$.type')="ReleaseEvent" THEN CAST(GET_JSON_OBJECT(col,'$.payload.release.id')AS BIGINT)
             END AS rev_or_push_or_rel_id
            ,GET_JSON_OBJECT(col,'$.payload.ref') AS ref
            ,GET_JSON_OBJECT(col,'$.payload.ref_type') AS ref_type
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.state')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.state')
                     WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.state') 
             END AS state
            ,case    WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestEvent" THEN GET_JSON_OBJECT(col,'$.payload.pull_request.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssuesEvent" THEN GET_JSON_OBJECT(col,'$.payload.issue.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="IssueCommentEvent" THEN GET_JSON_OBJECT(col,'$.payload.comment.author_association')
                     WHEN GET_JSON_OBJECT(col,'$.type')="PullRequestReviewEvent" THEN GET_JSON_OBJECT(col,'$.payload.review.author_association') 
             END AS author_association
            ,GET_JSON_OBJECT(col,'$.payload.pull_request.base.repo.language') AS language
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.merged') AS BOOLEAN) AS merged
            ,to_date(GET_JSON_OBJECT(col,'$.payload.pull_request.merged_at'), 'yyyy-mm-ddThh:mi:ssZ') AS merged_at
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.additions')AS BIGINT) AS additions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.deletions')AS BIGINT)  AS deletions
            ,CAST(GET_JSON_OBJECT(col,'$.payload.pull_request.changed_files')AS BIGINT) AS changed_files
            ,CAST(GET_JSON_OBJECT(col,'$.payload.size')AS BIGINT)  AS push_size
            ,CAST(GET_JSON_OBJECT(col,'$.payload.distinct_size')AS BIGINT)   AS push_distinct_size
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),12,2) as hr
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,7),'/','-') as month
            ,SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,4) as year
            ,REPLACE(SUBSTR(GET_JSON_OBJECT(col,'$.created_at'),1,10),'/','-') as ds
    from githubevents 
    where hr = cast(to_char(dateadd(getdate(),-9,'hh'), 'yyyy-mm-dd-hh') as string);
  4. Query the data.

    Run the following commands to query dwd_github_events_odps:

    set odps.sql.allow.fullscan=true;
    select * from dwd_github_events_odps limit 10;

    The following is a sample result:

    image
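The CASE expressions in the parsing statement above pick a different JSON path depending on the event type. The following Python sketch mirrors that logic for three of the columns (iss_or_pr_id, number, and state) so the mapping is easier to follow. It is an illustration under stated assumptions, not a replacement for the SQL: the function name `type_dependent_fields` is hypothetical, and the input is an already-parsed event dictionary.

```python
def type_dependent_fields(event):
    """Mirror the CASE logic for iss_or_pr_id, number, and state."""
    etype = event.get("type")
    payload = event.get("payload") or {}
    pull_request = payload.get("pull_request") or {}
    issue = payload.get("issue") or {}
    review = payload.get("review") or {}

    if etype == "PullRequestEvent":
        iss_or_pr_id = pull_request.get("id")
        number = pull_request.get("number")
        state = pull_request.get("state")
    elif etype == "IssuesEvent":
        iss_or_pr_id = issue.get("id")
        number = issue.get("number")
        state = issue.get("state")
    else:
        iss_or_pr_id = None                    # CASE without ELSE yields NULL
        number = payload.get("number")         # ELSE branch of the number column
        state = review.get("state") if etype == "PullRequestReviewEvent" else None

    return {"iss_or_pr_id": iss_or_pr_id, "number": number, "state": state}


if __name__ == "__main__":
    sample = {
        "type": "PullRequestEvent",
        "payload": {"number": 7, "pull_request": {"id": 1, "number": 7, "state": "open"}},
    }
    print(type_dependent_fields(sample))
```

Missing keys resolve to None, which corresponds to the NULL that GET_JSON_OBJECT returns when a path does not exist.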

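Build the real-time data warehouse

Collect real-time data on ECS

The ECS instance pulls real-time event data from the GitHub API. The following script is only an example of one way to collect real-time data through the GitHub API.

Note
  • Each run of the script lasts 1 minute. It collects the real-time event data that the API returns during that time and stores each event in JSON format.

  • The script does not guarantee that all real-time events are collected.

  • Collecting data continuously from the GitHub API requires the Accept and Authorization headers. Accept is a fixed value, and Authorization must contain an access token requested from GitHub. For how to create an access token, see here.

  1. Run the following command to create a file named download_realtime_data.py.

    vim download_realtime_data.py
  2. Press i to enter insert mode and add the following sample content.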

    #!python
    
    import requests
    import json
    import sys
    import time
    
    # Get the URL of the next page from the Link response header
    def get_next_link(resp):
        resp_link = resp.headers.get('link', '')
        for l in resp_link.split(', '):
            parts = l.split('; ')
            if len(parts) == 2 and parts[1] == 'rel="next"':
                return parts[0][1:-1]
        return None
    
    # Collect one page of data from the API
    def download(link, fname):
        # Accept and Authorization headers for the GitHub API
        headers = {"Accept": "application/vnd.github+json", "Authorization": "Bearer <github_api_token>"}
        resp = requests.get(link, headers=headers)
    
        if int(resp.status_code) != 200:
            return None
    
        with open(fname, 'a') as f:
            for j in resp.json():
                f.write(json.dumps(j))
                f.write('\n')
    
        print('downloaded {} events to {}'.format(len(resp.json()), fname))
        return resp
    
    # Collect all pages of data from the API
    def download_all_data(fname):
        link = 'https://api.github.com/events?per_page=100&page=1'
        while True:
            resp = download(link, fname)
            if resp is None:
                break
            link = get_next_link(resp)
            if link is None:
                break
    
    # Return the current time in milliseconds
    def get_current_ms():
        return round(time.time()*1000)
    
    # Keep collecting for 1 minute per run
    def main(fname):
        current_ms = get_current_ms()
        while get_current_ms() - current_ms < 60*1000:
            download_all_data(fname)
            time.sleep(0.1)
    
    # Run the script
    if __name__ == '__main__':
        if len(sys.argv) < 2:
            print('usage: python {} <log_file>'.format(sys.argv[0]))
            sys.exit(1)
        main(sys.argv[1])
  3. Press Esc, type :wq, and press Enter to save and close the file.

  4. Create a file named run_py.sh that runs download_realtime_data.py and stores the data collected by each run in a separate file. The content is as follows.

    python /root/download_realtime_data.py /root/gh_realtime_data/$(date '+%Y-%m-%d-%H:%M:%S').json
  5. Create a file named delete_log.sh that deletes historical data. The content is as follows.

    d=$(TZ=UTC date --date='2 day ago' '+%Y-%m-%d')
    rm -f /root/gh_realtime_data/*${d}*.json
  6. Run the following commands to collect GitHub data every minute and delete historical data once a day.

    crontab -e
    * * * * * bash /root/run_py.sh
    1 1 * * * bash /root/delete_log.sh
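The collection script pages through the API by following the Link response header. As a worked illustration of that step, the sketch below parses a header value into a mapping from relation name to URL. The header string is a hypothetical example in the standard Link header layout, and `parse_link_header` is a hypothetical helper, not part of the script.

```python
def parse_link_header(value):
    """Return {rel: url} for a Link header such as '<url>; rel="next", <url>; rel="last"'."""
    links = {}
    for part in value.split(", "):
        pieces = part.split("; ")
        if len(pieces) != 2:
            continue                            # skip malformed entries
        url = pieces[0][1:-1]                   # strip the surrounding < >
        rel = pieces[1]
        if rel.startswith('rel="') and rel.endswith('"'):
            links[rel[5:-1]] = url
    return links


# Hypothetical header value, for illustration only.
EXAMPLE_HEADER = (
    '<https://api.github.com/events?per_page=100&page=2>; rel="next", '
    '<https://api.github.com/events?per_page=100&page=3>; rel="last"'
)

if __name__ == "__main__":
    print(parse_link_header(EXAMPLE_HEADER).get("next"))
```

When the header has no "next" entry, the lookup returns None, which is the condition that ends the paging loop.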

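Collect ECS data with SLS

SLS collects the real-time event data pulled on ECS as logs.

SLS can collect logs on ECS through Logtail. Because the data in this topic is in JSON format, Logtail's JSON mode can be used to quickly collect the incremental JSON logs on ECS. For the procedure, see Collect logs in JSON mode. In this topic, SLS is configured to parse the top-level key-value pairs of the raw data.

Note

In this example, the log path parameter of the Logtail configuration is set to /root/gh_realtime_data/**/*.json

After the configuration is complete, SLS continuously collects the incremental event data on ECS. The following figure shows an example of the collected data.

image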
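Parsing only the top-level key-value pairs means that nested objects such as actor, repo, and payload arrive downstream as JSON text rather than as separate fields. The Python sketch below imitates that shape for one log line. It is an illustration of the idea, not a description of how Logtail is implemented, and the function name `parse_top_level` is hypothetical.

```python
import json


def parse_top_level(line):
    """Split one JSON log line into top-level fields, keeping every value as text."""
    record = json.loads(line)
    return {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in record.items()
    }


if __name__ == "__main__":
    line = (
        '{"id": "19541192931", "type": "WatchEvent", '
        '"actor": {"id": 23286640, "login": "herekeo"}, "public": true}'
    )
    fields = parse_top_level(line)
    print(fields["actor"])                         # still JSON text
    print(json.loads(fields["actor"])["login"])    # a second parse reaches nested keys
```

This is why the Flink source table later in this topic declares actor, repo, org, and payload as varchar columns and reads nested keys with JSON_VALUE.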

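Write SLS data to Hologres in real time with Flink

Flink writes the log data collected by SLS to Hologres in real time. Using an SLS source table and a Hologres result table in Flink is enough to write data from SLS to Hologres in real time. For details, see Synchronize Log Service data to Hologres.

  1. Create a Hologres internal table.

    The internal table created here keeps only some of the keys in the raw JSON data. The event id and the date ds form the primary key, id is the distribution key, ds is the partition key, and the event time created_at is the event_time_column. You can create indexes on other fields based on your query needs to improve query efficiency. For an introduction to indexes, see Overview of table creation. The DDL for this example is as follows.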

    DROP TABLE IF EXISTS gh_realtime_data;
    
    BEGIN;
    
    CREATE TABLE gh_realtime_data (
        id bigint,
        actor_id bigint,
        actor_login text,
        repo_id bigint,
        repo_name text,
        org_id bigint,
        org_login text,
        type text,
        created_at timestamp with time zone NOT NULL,
        action text,
        iss_or_pr_id bigint,
        number bigint,
        comment_id bigint,
        commit_id text,
        member_id bigint,
        rev_or_push_or_rel_id bigint,
        ref text,
        ref_type text,
        state text,
        author_association text,
        language text,
        merged boolean,
        merged_at timestamp with time zone,
        additions bigint,
        deletions bigint,
        changed_files bigint,
        push_size bigint,
        push_distinct_size bigint,
        hr text,
        month text,
        year text,
        ds text,
        PRIMARY KEY (id,ds)
    )
    PARTITION BY LIST (ds);
    CALL set_table_property('public.gh_realtime_data', 'distribution_key', 'id');
    CALL set_table_property('public.gh_realtime_data', 'event_time_column', 'created_at');
    CALL set_table_property('public.gh_realtime_data', 'clustering_key', 'created_at');
    
    COMMENT ON COLUMN public.gh_realtime_data.id IS 'Event ID';
    COMMENT ON COLUMN public.gh_realtime_data.actor_id IS 'ID of the actor who triggered the event';
    COMMENT ON COLUMN public.gh_realtime_data.actor_login IS 'Login name of the actor';
    COMMENT ON COLUMN public.gh_realtime_data.repo_id IS 'Repository ID';
    COMMENT ON COLUMN public.gh_realtime_data.repo_name IS 'Repository name';
    COMMENT ON COLUMN public.gh_realtime_data.org_id IS 'ID of the organization that owns the repository';
    COMMENT ON COLUMN public.gh_realtime_data.org_login IS 'Name of the organization that owns the repository';
    COMMENT ON COLUMN public.gh_realtime_data.type IS 'Event type';
    COMMENT ON COLUMN public.gh_realtime_data.created_at IS 'Time the event occurred';
    COMMENT ON COLUMN public.gh_realtime_data.action IS 'Event action';
    COMMENT ON COLUMN public.gh_realtime_data.iss_or_pr_id IS 'issue/pull_request ID';
    COMMENT ON COLUMN public.gh_realtime_data.number IS 'issue/pull_request number';
    COMMENT ON COLUMN public.gh_realtime_data.comment_id IS 'Comment ID';
    COMMENT ON COLUMN public.gh_realtime_data.commit_id IS 'Commit ID';
    COMMENT ON COLUMN public.gh_realtime_data.member_id IS 'Member ID';
    COMMENT ON COLUMN public.gh_realtime_data.rev_or_push_or_rel_id IS 'review/push/release ID';
    COMMENT ON COLUMN public.gh_realtime_data.ref IS 'Name of the created or deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.ref_type IS 'Type of the created or deleted resource';
    COMMENT ON COLUMN public.gh_realtime_data.state IS 'State of the issue/pull_request/pull_request_review';
    COMMENT ON COLUMN public.gh_realtime_data.author_association IS 'Relationship between the actor and the repository';
    COMMENT ON COLUMN public.gh_realtime_data.language IS 'Programming language';
    COMMENT ON COLUMN public.gh_realtime_data.merged IS 'Whether the merge was accepted';
    COMMENT ON COLUMN public.gh_realtime_data.merged_at IS 'Time the code was merged';
    COMMENT ON COLUMN public.gh_realtime_data.additions IS 'Number of lines added';
    COMMENT ON COLUMN public.gh_realtime_data.deletions IS 'Number of lines deleted';
    COMMENT ON COLUMN public.gh_realtime_data.changed_files IS 'Number of files changed by the pull request';
    COMMENT ON COLUMN public.gh_realtime_data.push_size IS 'Number of commits';
    COMMENT ON COLUMN public.gh_realtime_data.push_distinct_size IS 'Number of distinct commits';
    COMMENT ON COLUMN public.gh_realtime_data.hr IS 'Hour in which the event occurred, for example hr=00 for 00:23';
    COMMENT ON COLUMN public.gh_realtime_data.month IS 'Month in which the event occurred, for example month=2015-10 for October 2015';
    COMMENT ON COLUMN public.gh_realtime_data.year IS 'Year in which the event occurred, for example year=2015';
    COMMENT ON COLUMN public.gh_realtime_data.ds IS 'Day on which the event occurred, ds=yyyy-mm-dd';
    
    COMMIT;
  2. Write data in real time with Flink.

    Flink further parses the SLS data and writes it to Hologres in real time. The following statements filter the data being written in Flink: dirty records whose event ID or event time (created_at) is empty are dropped, and only recent events are kept.

    CREATE TEMPORARY TABLE sls_input (
      actor varchar,
      created_at varchar,
      id bigint,
      org varchar,
      payload varchar,
      public varchar,
      repo varchar,
      type varchar
      )
    WITH (
        'connector' = 'sls',
        'endpoint' = '<endpoint>',--SLS private network endpoint
        'accessid' = '<accesskey id>',--AccessKey ID of the account
        'accesskey' = '<accesskey secret>',--AccessKey secret of the account
        'project' = '<project name>',--SLS project name
        'logstore' = '<logstore name>'--SLS Logstore name
    );
    
    CREATE TEMPORARY TABLE hologres_sink (
        id bigint,
        actor_id bigint,
        actor_login string,
        repo_id bigint,
        repo_name string,
        org_id bigint,
        org_login string,
        type string,
        created_at timestamp,
        action string,
        iss_or_pr_id bigint,
        number bigint,
        comment_id bigint,
        commit_id string,
        member_id bigint,
        rev_or_push_or_rel_id bigint,
        `ref` string,
        ref_type string,
        state string,
        author_association string,
        `language` string,
        merged boolean,
        merged_at timestamp,
        additions bigint,
        deletions bigint,
        changed_files bigint,
        push_size bigint,
        push_distinct_size bigint,
        hr string,
        `month` string,
        `year` string,
        ds string
        )
    with (
        'connector' = 'hologres',
        'dbname' = '<hologres dbname>', --Hologres database name
        'tablename' = '<hologres tablename>', --Hologres table that receives the data
        'username' = '<accesskey id>', --AccessKey ID of the current Alibaba Cloud account
        'password' = '<accesskey secret>', --AccessKey secret of the current Alibaba Cloud account
        'endpoint' = '<endpoint>', --VPC endpoint of the current Hologres instance
        'jdbcretrycount' = '1', --Number of retries when a connection fails
        'partitionrouter' = 'true', --Whether to write to a partitioned table
        'createparttable' = 'true', --Whether to create partitions automatically
        'mutatetype' = 'insertorignore' --Data write mode
    );
    
    INSERT INTO hologres_sink
    SELECT id
            ,CAST(JSON_VALUE(actor, '$.id') AS bigint) AS actor_id
            ,JSON_VALUE(actor, '$.login') AS actor_login
            ,CAST(JSON_VALUE(repo, '$.id') AS bigint) AS repo_id
            ,JSON_VALUE(repo, '$.name') AS repo_name
            ,CAST(JSON_VALUE(org, '$.id') AS bigint) AS org_id
            ,JSON_VALUE(org, '$.login') AS org_login
            ,type
            ,TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS created_at
            ,JSON_VALUE(payload, '$.action') AS action
            ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.id') AS bigint)
                     WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.id') AS bigint)
             END AS iss_or_pr_id
            ,CASE    WHEN type='PullRequestEvent' THEN CAST(JSON_VALUE(payload, '$.pull_request.number') AS bigint)
                     WHEN type='IssuesEvent' THEN CAST(JSON_VALUE(payload, '$.issue.number') AS bigint)
                     ELSE CAST(JSON_VALUE(payload, '$.number') AS bigint)
             END AS number
            ,CAST(JSON_VALUE(payload, '$.comment.id') AS bigint) AS comment_id
            ,JSON_VALUE(payload, '$.comment.commit_id') AS commit_id
            ,CAST(JSON_VALUE(payload, '$.member.id') AS bigint) AS member_id
            ,CASE    WHEN type='PullRequestReviewEvent' THEN CAST(JSON_VALUE(payload, '$.review.id') AS bigint)
                     WHEN type='PushEvent' THEN CAST(JSON_VALUE(payload, '$.push_id') AS bigint)
                     WHEN type='ReleaseEvent' THEN CAST(JSON_VALUE(payload, '$.release.id') AS bigint)
             END AS rev_or_push_or_rel_id
            ,JSON_VALUE(payload, '$.ref') AS `ref`
            ,JSON_VALUE(payload, '$.ref_type') AS ref_type
            ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.state')
                     WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.state')
                     WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.state')
             END AS state
            ,CASE    WHEN type='PullRequestEvent' THEN JSON_VALUE(payload, '$.pull_request.author_association')
                     WHEN type='IssuesEvent' THEN JSON_VALUE(payload, '$.issue.author_association')
                     WHEN type='IssueCommentEvent' THEN JSON_VALUE(payload, '$.comment.author_association')
                     WHEN type='PullRequestReviewEvent' THEN JSON_VALUE(payload, '$.review.author_association')
             END AS author_association
            ,JSON_VALUE(payload, '$.pull_request.base.repo.language') AS `language`
            ,CAST(JSON_VALUE(payload, '$.pull_request.merged') AS boolean) AS merged
            ,TO_TIMESTAMP_TZ(replace(JSON_VALUE(payload, '$.pull_request.merged_at'),'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC') AS merged_at
            ,CAST(JSON_VALUE(payload, '$.pull_request.additions') AS bigint) AS additions
            ,CAST(JSON_VALUE(payload, '$.pull_request.deletions') AS bigint) AS deletions
            ,CAST(JSON_VALUE(payload, '$.pull_request.changed_files') AS bigint) AS changed_files
            ,CAST(JSON_VALUE(payload, '$.size') AS bigint) AS push_size
            ,CAST(JSON_VALUE(payload, '$.distinct_size') AS bigint) AS push_distinct_size
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),12,2) as hr
            ,REPLACE(SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,7),'/','-') as `month`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,4) as `year`
            ,SUBSTRING(TO_TIMESTAMP_TZ(replace(created_at,'T',' '), 'yyyy-MM-dd HH:mm:ss', 'UTC'),1,10) as ds
    FROM
            sls_input
    WHERE
            id IS NOT NULL
            AND created_at IS NOT NULL
            AND to_date(replace(created_at,'T',' ')) >= date_add(CURRENT_DATE, -1)
    ; 

    For parameter descriptions, see Log Service (SLS) source table and Hologres result table.

    Note

    GitHub raw event data uses UTC but carries no time zone attribute, and the default time zone of Hologres is UTC+8. The time zone therefore has to be adjusted while Flink writes to Hologres: assign the UTC time zone to the source data in Flink SQL, and when you start the job, add table.local-time-zone:Asia/Shanghai in the Flink configuration section of the job startup configuration page to set the Flink system time zone to Asia/Shanghai.

  3. Query the data.

    Query the SLS data written to Hologres by Flink. You can then continue data development based on your business needs.

    SELECT * FROM public.gh_realtime_data limit 10;

    The following is a sample result:

    image
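The time zone adjustment described in the note of step 2 can be checked with a small Python sketch. It treats a created_at value as UTC and converts it to UTC+8. This is a sketch under stated assumptions: a fixed UTC+8 offset stands in for Asia/Shanghai, and the function name `to_shanghai` is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset, used here as a stand-in for the Asia/Shanghai time zone.
SHANGHAI = timezone(timedelta(hours=8))


def to_shanghai(created_at):
    """Interpret a GitHub created_at string as UTC and convert it to UTC+8."""
    utc_time = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return utc_time.astimezone(SHANGHAI)


if __name__ == "__main__":
    print(to_shanghai("2022-01-01T00:03:04Z").isoformat())
    print(to_shanghai("2021-12-31T20:00:00Z").date())
```

An event from the evening of December 31 in UTC lands on January 1 in UTC+8, so the time zone setting also determines which daily partition a row belongs to.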

Correct real-time data with offline data

In this scenario the real-time data may have gaps, so offline data can be used to correct it. The following steps correct the previous day's real-time data. You can adjust the correction interval to your business needs.

  1. Create a foreign table in Hologres to access the MaxCompute offline data.

    IMPORT FOREIGN SCHEMA <maxcompute_project_name> LIMIT to
    (
        <foreign_table_name>
    ) 
    FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'update',if_unsupported_type 'error');

    For parameter descriptions, see IMPORT FOREIGN SCHEMA.

  2. Create a temporary table and use the offline data to correct the previous day's real-time data.

    -- Drop any leftover temporary table
    DROP TABLE IF EXISTS gh_realtime_data_tmp;
    
    -- Create the temporary table
    SET hg_experimental_enable_create_table_like_properties = ON;
    CALL HG_CREATE_TABLE_LIKE ('gh_realtime_data_tmp', 'select * from gh_realtime_data');
    
    -- Insert data into the temporary table and update its statistics
    INSERT INTO gh_realtime_data_tmp
    SELECT
        *
    FROM
        <foreign_table_name>
    WHERE
        ds = current_date - interval '1 day'
    ON CONFLICT (id, ds)
        DO NOTHING;
    ANALYZE gh_realtime_data_tmp;
    
    -- Replace the existing child partition table with the temporary table
    BEGIN;
    DROP TABLE IF EXISTS "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_<yesterday_date>";
    ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_<yesterday_date>" FOR VALUES IN ('<yesterday_date>');
    COMMIT;
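If the correction runs on a schedule, the <yesterday_date> placeholder has to be filled in for each run. The Python sketch below shows one possible way to generate the partition swap statements. It assumes that the placeholder takes the same yyyy-mm-dd form as the ds column, which this topic does not state explicitly, and the names `SWAP_TEMPLATE` and `swap_statements` are hypothetical.

```python
from datetime import date, timedelta

# Template for the partition swap; {day} replaces the <yesterday_date> placeholder.
SWAP_TEMPLATE = """BEGIN;
DROP TABLE IF EXISTS "gh_realtime_data_{day}";
ALTER TABLE gh_realtime_data_tmp RENAME TO "gh_realtime_data_{day}";
ALTER TABLE gh_realtime_data ATTACH PARTITION "gh_realtime_data_{day}" FOR VALUES IN ('{day}');
COMMIT;"""


def swap_statements(today):
    """Return the swap SQL for the day before `today` (assumed yyyy-mm-dd format)."""
    yesterday = (today - timedelta(days=1)).isoformat()
    return SWAP_TEMPLATE.format(day=yesterday)


if __name__ == "__main__":
    print(swap_statements(date.today()))
```

The generated text can then be passed to whichever SQL client or scheduler runs the correction job.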

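Data analysis

The large volume of data collected supports a wide range of analyses. Based on the time ranges your business needs to analyze, you can further layer the data warehouse to serve real-time analysis, offline analysis, and integrated real-time and offline analysis.

The following examples analyze the real-time data collected above. You can also analyze specific repositories or developers.

  • Query the total number of public events today.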

    SELECT
        count(*)
    FROM
        gh_realtime_data
    WHERE
        created_at >= date_trunc('day', now());

    The following is a sample result:

    count
    ------
    1006
  • Query the most active projects (by number of events) in the past day.

    SELECT
        repo_name,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
    GROUP BY
        repo_name
    ORDER BY
        events DESC
    LIMIT 5;

    The following is a sample result:

    repo_name	                               events
    ----------------------------------------+------
    leo424y/heysiri.ml	                      29
    arm-on/plan	                              10
    Christoffel-T/fiverr-pat-20230331	        9
    mate-academy/react_dynamic-list-of-goods	9
    openvinotoolkit/openvino	                7
  • Query the most active developers (by number of events) in the past day.

    SELECT
        actor_login,
        COUNT(*) AS events
    FROM
        gh_realtime_data
    WHERE
        created_at >= now() - interval '1 day'
        AND actor_login NOT LIKE '%[bot]'
    GROUP BY
        actor_login
    ORDER BY
        events DESC
    LIMIT 5;

    The following is a sample result:

    actor_login	       events
    ------------------+------
    direwolf-github	    13
    arm-on	            10
    sergii-nosachenko	  9
    Christoffel-T	      9
    yangwang201911	    7
  • Query the most popular programming languages in the past hour.

    SELECT
        language,
        count(*) total
    FROM
        gh_realtime_data
    WHERE
        created_at > now() - interval '1 hour'
        AND language IS NOT NULL
    GROUP BY
        language
    ORDER BY
        total DESC
    LIMIT 10;

    The following is a sample result:

    language	  total
    -----------+----
    JavaScript	25
    C++	        15
    Python	    14
    TypeScript	13
    Java	      8
    PHP	        8
  • Query the projects that gained the most stars in the past day.

    Note

    This example does not account for cases such as users removing their stars.

    SELECT
        repo_id,
        repo_name,
        COUNT(actor_login) total
    FROM
        gh_realtime_data
    WHERE
        type = 'WatchEvent'
        AND created_at > now() - interval '1 day'
    GROUP BY
        repo_id,
        repo_name
    ORDER BY
        total DESC
    LIMIT 10;

    The following is a sample result:

    repo_id	   repo_name	                       total
    ---------+----------------------------------+-----
    618058471	facebookresearch/segment-anything	 4
    619959033	nomic-ai/gpt4all	                 1
    97249406	denysdovhan/wtfjs	                 1
    9791525	  digininja/DVWA	                   1
    168118422	aylei/interview	                   1
    343520006	joehillen/sysz	                   1
    162279822	agalwood/Motrix	                   1
    577723410	huggingface/swift-coreml-diffusers 1
    609539715	e2b-dev/e2b	                       1
    254839429	maniackk/KKCallStack	             1
    
  • Query the number of daily active users and projects today.

    SELECT
        uniq (actor_id) actor_num,
        uniq (repo_id) repo_num
    FROM
        gh_realtime_data
    WHERE
        created_at > date_trunc('day', now());

    The following is a sample result:

    actor_num	repo_num
    ---------+--------
    743	      816
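To clarify what the aggregation queries above compute, the following Python sketch applies the same grouping to a small in-memory list. The rows are hypothetical and exist only to make the example self-contained, and the function names are illustrative. The sketch covers the "most active projects" ranking and the distinct counts used for daily active users and projects.

```python
from collections import Counter

# Hypothetical rows with the same column names as gh_realtime_data.
EVENTS = [
    {"actor_id": 1, "repo_id": 10, "repo_name": "org/a"},
    {"actor_id": 1, "repo_id": 10, "repo_name": "org/a"},
    {"actor_id": 2, "repo_id": 10, "repo_name": "org/a"},
    {"actor_id": 2, "repo_id": 20, "repo_name": "org/b"},
    {"actor_id": 3, "repo_id": 20, "repo_name": "org/b"},
    {"actor_id": 3, "repo_id": 30, "repo_name": "org/c"},
]


def top_repos(events, limit=5):
    """Equivalent of GROUP BY repo_name ORDER BY COUNT(*) DESC LIMIT n."""
    return Counter(e["repo_name"] for e in events).most_common(limit)


def daily_actives(events):
    """Return (distinct actor count, distinct repository count)."""
    actors = {e["actor_id"] for e in events}
    repos = {e["repo_id"] for e in events}
    return len(actors), len(repos)


if __name__ == "__main__":
    print(top_repos(EVENTS, limit=2))
    print(daily_actives(EVENTS))
```

In the warehouse the time filter on created_at runs before the aggregation; here the list is assumed to be filtered already.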