Flink/Blink實時消費Hologres Binlog
本文將會為您介紹如何通過Flink和Blink實時消費Hologres Binlog。
注意事項
消費Hologres Binlog需要注意如下事項:
僅Hologres V0.9及以上版本支持消費Hologres Binlog;僅Hologres V1.3.21及以上版本支持配置引擎白名單,HologresV1.3.21以下版本當前暫不支持配置引擎白名單,開啟白名單后,會造成Binlog消費失敗。如果您的實例版本低于所要求實例版本,請您加入Hologres釘釘群進行反饋,詳情可參見在線支持。
Hologres支持單表級別的Binlog功能,支持行存表和列存表,以及從Hologres V1.1版本開始支持行列共存表。開啟Binlog后,理論上列存表的開銷要大于行存表的開銷。因此對于數(shù)據(jù)更新頻繁的場景,建議為使用行存存儲格式的表開啟Binlog。
Hologres Binlog的支持情況以及開啟、配置Hologres Binlog,請參見訂閱Hologres Binlog。
僅阿里云Flink支持消費Hologres Binlog。Holohub模式下Flink消費Hologres Binlog只支持簡單數(shù)據(jù)類型,從Flink 6.0.3版本開始,支持通過JDBC模式消費Hologres Binlog,相比Holohub,JDBC支持更多的數(shù)據(jù)類型,詳情請參見Blink/Flink與Hologres的數(shù)據(jù)類型映射。同時增加了部分權(quán)限限制,詳情請參見權(quán)限說明。
目前不支持消費分區(qū)父表的Binlog。
Hologres V2.0版本起有限支持Holohub模式;V2.1版本起下線Holohub模式,全面轉(zhuǎn)為JDBC模式。在您升級Hologres版本前,請參考Holohub模式切換到JDBC模式,查看您當前正在使用Holohub模式的Flink任務(wù),并按步驟升級Flink VVR作業(yè)版本,然后升級Hologres實例。
權(quán)限說明
Flink通過JDBC模式消費Hologres Binlog支持使用Hologres自定義賬號,通過Holohub模式不支持使用Hologres自定義賬號。
Flink通過Holohub模式消費Hologres Binlog需要表的讀寫權(quán)限。
Flink通過JDBC模式消費Hologres Binlog需要如下前提條件,詳情請參見通過JDBC消費Hologres Binlog。
已創(chuàng)建
hg_binlog
Extension(Hologres V2.0版本起默認創(chuàng)建)。用戶為實例的Superuser或用戶同時擁有目標表的Owner權(quán)限和實例的Replication Role權(quán)限。
Flink實時消費Binlog
VVP-2.4及以上版本支持Hologres Connector實時消費Binlog,使用方法如下。
源表DDL(非CDC模式)
該模式下Source消費的Binlog數(shù)據(jù)是作為普通的Flink數(shù)據(jù)傳遞給下游節(jié)點的,即所有數(shù)據(jù)都是作為Insert類型的數(shù)據(jù),可以根據(jù)業(yè)務(wù)情況選擇如何處理特定hg_binlog_event_type
類型的數(shù)據(jù)。Hologres表開啟Binlog后,在Flink中源表(非CDC模式)使用如下DDL可以實時消費Binlog。
create table test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
三個
binlogxxx
參數(shù)表示Binlog系統(tǒng)字段,命名和類型是固定的不能修改。其他字段是跟用戶字段一一對應(yīng),必須是全小寫。
源表DDL(CDC模式)
該模式下Source消費的Binlog數(shù)據(jù),將根據(jù)hg_binlog_event_type
自動為每行數(shù)據(jù)設(shè)置準確的Flink RowKind類型(INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER),這樣就能完成表的數(shù)據(jù)的鏡像同步,類似MySQL和Postgres的CDC功能。
Hologres Binlog源表(CDC模式)暫不支持定義Watermark。如果您需要進行窗口聚合,您可以采用非窗口聚合的方式,詳情請參見MySQL/Hologres CDC源表不支持窗口函數(shù),如何實現(xiàn)類似每分鐘聚合統(tǒng)計的需求?。
Hologres表開啟Binlog后,在Flink中源表(CDC模式)使用如下DDL可以實時消費Binlog。
create table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',//Hologres的DB名
'tablename'='<yourTablename>',//Hologres的表名
'username'='<yourAccessID>',//當前賬號的access id
'password'='<yourAccessSecret>',//當前賬號的access key
'endpoint'='<yourEndpoint>',//Hologres的vpc網(wǎng)絡(luò)地址
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
全增量一體源表
VVR引擎1.13-vvr-4.0.13版本,Hologres實例0.10及以上版本開始,Hologres Binlog CDC源表支持全增量一體的消費,這種方式會先讀取數(shù)據(jù)庫的歷史全量數(shù)據(jù),并平滑切換到Binlog讀取增量數(shù)據(jù),詳情請參見實時數(shù)倉Hologres。
JDBC模式Binlog源表
從Flink 6.0.3版本開始,支持通過JDBC模式消費Hologres Binlog,JDBC模式相比Holohub支持更多的數(shù)據(jù)類型和支持自定義賬號,詳情使用請參見實時數(shù)倉Hologres。
Holohub模式切換到JDBC模式
Hologres從V2.0版本起逐步下線Holohub模式。如果您需要升級Hologres版本,需要將Holohub模式的作業(yè)切換到JDBC模式。請參考如下方式進行。
Hologres實例升級為V2.1版本
您在升級Hologres實例版本到V2.1前,請選擇如下兩個方案之一,檢查Flink任務(wù)與Hologres實例,以保障Flink任務(wù)正常運行。
(方案一)(推薦)將Flink VVR版本升級到8.0.7及以上版本,F(xiàn)link會自動將Holohub模式切換為JDBC模式。
(方案二)將Flink VVR升級到6.0.7~8.0.5版本,在源表中添加參數(shù)
'sdkMode'='jdbc'
之后重新啟動作業(yè),同時需要授予用戶如下權(quán)限選項中的其中之一,確認作業(yè)正常運行之后再對Hologres實例進行升級。(選項一)實例的Superuser權(quán)限。
(選項二)目標表的Owner權(quán)限,CREATE DATABASE權(quán)限及實例的Replication Role權(quán)限。
(方案三)(不推薦)將Flink VVR版本升級至8.0.6,F(xiàn)link會自動將Holohub模式切換為JDBC模式。但VVR 8.0.6版本存在已知缺陷,當維表字段過多時可能導(dǎo)致VVR上線超時,詳情請參見Hologres Connector Release Note。
(可選)如果您的Flink VVR作業(yè)數(shù)量較多,獲取需要升級版本的作業(yè)和表信息請參見如下內(nèi)容。
Hologres實例升級為V2.0版本
(方案一)(推薦)將Flink VVR版本升級到8.0.6及以上版本,F(xiàn)link會自動將Holohub模式切換為JDBC模式,其中VVR 8.0.6版本存在已知缺陷,當維表字段過多時可能導(dǎo)致VVR作業(yè)上線超時,詳情請參見Hologres Connector Release Note。建議選擇VVR 8.0.7版本。
(方案二)將Flink VVR版本升級到8.0.4或8.0.5版本,并重啟Flink作業(yè),同時授予用戶如下權(quán)限選項中的其中之一,確認作業(yè)正常運行之后再對Hologres實例進行升級。
(選項一)實例的Superuser權(quán)限。
(選項二)目標表的Owner權(quán)限,CREATE DATABASE權(quán)限,及實例的Replication Role權(quán)限。
(方案三)將Flink VVR版本升級到6.0.7到8.0.3版本,F(xiàn)link會繼續(xù)使用Holohub模式消費Binlog。
如果您的Flink VVR消費Hologres Binlog的作業(yè)過多,可以使用如下方式獲取需要升級版本的作業(yè)和表信息。
該工具僅支持獲取如下作業(yè)信息:
通過DDL方式進行表定義的SQL作業(yè)。
通過Hints方式指定參數(shù)的Catalog作業(yè)。
不支持獲取JAR作業(yè)信息,不支持獲取沒有Hints參數(shù)的Catalog表信息。
下載開源工具find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar,詳情請參見find-incompatible-flink-jobs。
使用本地命令行進入開源工具目錄,然后運行如下命令,即可查看全部需要升級版本的作業(yè)和表信息。
說明運行如下命令需要安裝Java環(huán)境,使用JDK 8及以上版本。
java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc> # 使用示例 java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs 北京 https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog
參數(shù)說明如下:
參數(shù)
說明
region
目標實時計算Flink版項目空間所在地域的中文簡稱,取值請參見region取值對應(yīng)表。
url
目標實時計算Flink版項目任意一個作業(yè)的連接地址。
AccessKeyID
能訪問實時計算Flink版項目空間的賬號AccessKey ID。
AccessKeySecret
能訪問實時計算Flink版項目空間的賬號AccessKey Secret。
binlog/rpc
需要檢查的作業(yè)內(nèi)容,取值如下:
binlog
:表示檢查整個項目中所有作業(yè)的Hologres Binlog源表。rpc
:表示檢查整個項目中所有作業(yè)使用了rpc
模式的維表或結(jié)果表。
地域
取值
華北2(北京)
北京
華東2(上海)
上海
華東1(杭州)
杭州
華南1(深圳)
深圳
華北3(張家口)
張家口
中國(香港)
香港
新加坡
新加坡
德國(法蘭克福)
德國
印度尼西亞(雅加達)
印度尼西亞
馬來西亞(吉隆坡)
馬來西亞
美國(硅谷)
美國
上海金融云
上海金融云
示例返回結(jié)果如下。