實(shí)時(shí)計(jì)算Flink版實(shí)時(shí)消費(fèi)Hologres
Hologres連接器支持實(shí)時(shí)消費(fèi)Hologres,即實(shí)時(shí)消費(fèi)Hologres的Binlog數(shù)據(jù)。本文為您介紹實(shí)時(shí)計(jì)算Flink版消費(fèi)Hologres的詳情。
使用限制
Hologres 0.10及以下版本,已存在的表無(wú)法修改表屬性開(kāi)啟Binlog,需要重新建表。Hologres V1.1及以上版本,可以根據(jù)業(yè)務(wù)需要選擇開(kāi)啟或關(guān)閉Binlog能力,同時(shí)支持配置TTL滿足不同業(yè)務(wù)場(chǎng)景對(duì)Binlog保留時(shí)間的訴求,詳情請(qǐng)參見(jiàn)訂閱Hologres Binlog。
不支持開(kāi)啟分區(qū)表父表的Binlog,請(qǐng)使用非分區(qū)表。
暫不支持實(shí)時(shí)消費(fèi)TIMESTAMP類(lèi)型的數(shù)據(jù),因此創(chuàng)建Hologres表時(shí),請(qǐng)使用TIMESTAMPTZ類(lèi)型。
默認(rèn)的Binlog源表不支持?jǐn)?shù)組類(lèi)型,僅支持INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC(38,8)和TIMESTAMPTZ數(shù)據(jù)類(lèi)型。
說(shuō)明對(duì)不支持的數(shù)據(jù)類(lèi)型(例如SMALLINT),即使不消費(fèi)此字段,仍然可能導(dǎo)致作業(yè)無(wú)法上線。
實(shí)時(shí)計(jì)算引擎VVR 6.0.3及以上版本新增JDBC模式Binlog源表,VVR 6.0.7版本開(kāi)始默認(rèn)通過(guò)JDBC模式消費(fèi)Hologres Binlog。相比原有Holohub模式,支持更完善的數(shù)據(jù)類(lèi)型,如SMALLINT,數(shù)組類(lèi)型等,同時(shí)也支持了自定義用戶(非RAM用戶)。詳見(jiàn)下方JDBC模式Binlog源表。
Hologres 2.0及以上版本下線了Holohub模式,全面轉(zhuǎn)為JDBC模式。如果您的Flink版本小于6.0.7,需要顯式指定sdkMode參數(shù)為jdbc,或升級(jí)您的Flink版本。
Hologres 1.3.41版本開(kāi)始,JDBC模式Binlog源表新支持讀取JSONB類(lèi)型,但需要數(shù)據(jù)庫(kù)級(jí)別開(kāi)啟GUC,開(kāi)啟GUC的命令如下。
--db級(jí)別開(kāi)啟GUC,僅superuser可以執(zhí)行,每個(gè)db只需要設(shè)置一次。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
實(shí)時(shí)計(jì)算引擎VVR 8.0.4起,連接器如果發(fā)現(xiàn)用戶使用的Hologres實(shí)例大于2.0版本,會(huì)強(qiáng)制使用JDBC模式消費(fèi)Binlog。推薦Hologres實(shí)例升級(jí)至2.1版本,可以從Holohub模式無(wú)縫切換。如果Hologres實(shí)例是2.0版本,且用戶不是Superuser,使用JDBC模式消費(fèi)Binlog需要特別進(jìn)行權(quán)限的配置,否則作業(yè)上線時(shí)可能拋出“permission denied for database”的異常。需要的權(quán)限包括Database的CREATE權(quán)限,以及實(shí)例的Replication Role權(quán)限,授權(quán)SQL如下。
-- 專(zhuān)家權(quán)限模型下為用戶授予CREATE權(quán)限,以及賦予用戶實(shí)例的Replication Role權(quán)限 GRANT CREATE ON DATABASE database_name TO <user_name>; alter role <user_name> replication; -- 如果Database開(kāi)啟了簡(jiǎn)單權(quán)限模型(SLMP),無(wú)法執(zhí)行GRANT語(yǔ)句,使用spm_grant為用戶授予DB的Admin權(quán)限,也可以在Holoweb中直接賦權(quán) call spm_grant('{dbname}_admin', '云賬號(hào)id/云郵箱/RAM賬號(hào)'); alter role <user_name> replication;
注意事項(xiàng)
Hologres Binlog以行存的形式記錄了數(shù)據(jù)的變更前后的整行數(shù)據(jù),因此列存表生成Binlog時(shí)的反查開(kāi)銷(xiāo)要大于行存表。對(duì)于數(shù)據(jù)更新頻繁的場(chǎng)景,建議使用行存表來(lái)開(kāi)啟Binlog,否則Binlog生成會(huì)成為表寫(xiě)入時(shí)的性能瓶頸,如果這張表同時(shí)還用于OLAP等分析查詢,建議使用行列共存的存儲(chǔ)格式。
UPDATE操作會(huì)產(chǎn)生兩條Binlog記錄,分別為更新操作前和操作后的數(shù)據(jù)記錄,因此您會(huì)消費(fèi)到兩條數(shù)據(jù)。但是,Hologres Binlog功能會(huì)保證這兩條記錄是連續(xù)的且更新前的Binlog記錄在前,更新后的Binlog記錄在后。
建議Flink作業(yè)并發(fā)數(shù)和Hologres Table的Shard個(gè)數(shù)保持一致。
您可以在Hologres控制臺(tái)上,使用以下語(yǔ)句查看Table的Shard數(shù),其中tablename為您的業(yè)務(wù)表名稱(chēng)。
select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
如果作業(yè)從檢查點(diǎn)恢復(fù)過(guò)程中,發(fā)生
table id parsed from checkpoint is different from the current table id
異常,可以升級(jí)到VVR-8.0.9版本啟動(dòng)作業(yè)。這是由于實(shí)時(shí)計(jì)算引擎VVR 8.0.5~VVR 8.0.8版本,Hologres Binlog源表從checkpoint恢復(fù)時(shí),會(huì)強(qiáng)制檢查hologres表的table id,如果當(dāng)前表的table id和checkpoint中保存的不一致,會(huì)無(wú)法從checkpoint恢復(fù)。此異常表示作業(yè)運(yùn)行期間,用戶對(duì)源表進(jìn)行了TRUNCATE或其他重建表操作。考慮到用戶使用場(chǎng)景的復(fù)雜性,在VVR 8.0.9取消了對(duì)table id的強(qiáng)制檢查,但仍然不推薦對(duì)Binlog源表做重建表操作。重建表時(shí)原有表的歷史Binlog會(huì)全部清除,F(xiàn)link使用舊表的消費(fèi)位點(diǎn)去消費(fèi)新表的數(shù)據(jù),可能導(dǎo)致數(shù)據(jù)不一致等不符合預(yù)期的情況。
開(kāi)啟Binlog
實(shí)時(shí)消費(fèi)功能默認(rèn)關(guān)閉,因此在Hologres控制臺(tái)上創(chuàng)建表的DDL時(shí),需要設(shè)置binlog.level和binlog.ttl參數(shù),示例如下。
begin;
CREATE TABLE test_message_src(
id int primary key,
title text not null,
body text
);
call set_table_property('test_message_src', 'orientation', 'row');
call set_table_property('test_message_src', 'clustering_key', 'id');
call set_table_property('test_message_src', 'binlog.level', 'replica'); --自Hologres 1.1版本起,可以在建表后開(kāi)啟Binlog。
call set_table_property('test_message_src', 'binlog.ttl', '86400');
commit;
其中,binlog.level設(shè)置為replica
即代表開(kāi)啟Binlog,binlog.ttl為Binlog的TTL,單位為秒。
消費(fèi)模式
非CDC模式
該模式下Source消費(fèi)的Binlog數(shù)據(jù)是作為普通的Flink數(shù)據(jù)傳遞給下游節(jié)點(diǎn)的,即所有數(shù)據(jù)都是INSERT類(lèi)型的數(shù)據(jù),您可以根據(jù)業(yè)務(wù)情況選擇如何處理特定hg_binlog_event_type
類(lèi)型的數(shù)據(jù)。源表DDL代碼示例如下。
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
CDC模式
該模式下Source消費(fèi)的Binlog數(shù)據(jù),將根據(jù)hg_binlog_event_type
自動(dòng)為每行數(shù)據(jù)設(shè)置準(zhǔn)確的Flink RowKind類(lèi)型,例如,INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER類(lèi)型,這樣就能完成表的數(shù)據(jù)的鏡像同步,類(lèi)似MySQL或Postgres的CDC功能。源表DDL代碼示例如下。
CREATE TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
全增量一體源表消費(fèi)
在源表Join維表時(shí),由于Binlog的TTL等原因,會(huì)導(dǎo)致無(wú)法使用源表的所有數(shù)據(jù)。原解決方案是為Binlog表設(shè)置一個(gè)很大的TTL,但這樣會(huì)有以下問(wèn)題:
歷史Binlog數(shù)據(jù)會(huì)被長(zhǎng)時(shí)間保存,導(dǎo)致占用較多的存儲(chǔ)資源。
因?yàn)锽inlog包含數(shù)據(jù)更新記錄,使用Binlog進(jìn)行全量消費(fèi)會(huì)消費(fèi)一些不必要的數(shù)據(jù),導(dǎo)致占用較多的計(jì)算資源,且無(wú)法讓用戶只關(guān)注最新的數(shù)據(jù)。
從VVR 4.0.13及以上版本,Hologres 0.10及以上版本,Hologres Binlog CDC源表支持全增量一體的消費(fèi),這種方式會(huì)先讀取數(shù)據(jù)庫(kù)的歷史全量數(shù)據(jù),并平滑切換到Binlog讀取增量數(shù)據(jù)。采用這種方式,可以解決上述問(wèn)題。
適用場(chǎng)景
適用于歷史數(shù)據(jù)不包含Binlog,但又希望消費(fèi)所有數(shù)據(jù)的場(chǎng)景。
僅適用于目標(biāo)表有主鍵的場(chǎng)景,推薦在CDC模式下使用的全增量Hologres源表。
Hologres1.1版本之后,支持按需開(kāi)啟Binlog,可以將已有歷史數(shù)據(jù)的表打開(kāi)Binlog。
代碼示例
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --先讀取歷史全量數(shù)據(jù),再增量消費(fèi)Binlog。
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);
JDBC模式Binlog源表
實(shí)時(shí)計(jì)算引擎VVR 6.0.7版本開(kāi)始,binlog源表新增JDBC模式(不同于CDC等消費(fèi)模式,此處的JDBC模式是指底層獲取binlog的SDK基于JDBC)。相比原有Holohub模式,JDBC模式的Binlog源表:
支持更多的數(shù)據(jù)類(lèi)型。包括:SMALLINT、INTEGER、BIGINT、TEXT、REAL、DOUBLE PRECISION、BOOLEAN、NUMERIC、DATE、TIME、TIMETZ、TIMESTAMP、TIMESTAMPTZ、BYTEA、JSON、int4[]、int8[]、float4[]、float8[]、boolean[]、text[]、JSONB(需要Hologres版本大于1.3.41且開(kāi)啟相應(yīng)GUC,詳見(jiàn)本文使用限制)。
支持Hologres的自定義用戶(非RAM用戶)。
使用方式與普通的binlog源表類(lèi)似,但需要設(shè)置sdkMode為jdbc,示例如下。
create TEMPORARY table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc', --使用jdbc模式的binlog源表
'jdbcBinlogSlotName'='replication_slot_name' --可選,不設(shè)置會(huì)自動(dòng)創(chuàng)建
);
jdbcBinlogSlotName是jdbc模式消費(fèi)binlog的一個(gè)可選參數(shù),如果不設(shè)置,Hologres連接器會(huì)創(chuàng)建默認(rèn)的slot并使用,默認(rèn)創(chuàng)建的publication名稱(chēng)類(lèi)似publication_for_table_<table_name>_used_by_flink
,默認(rèn)創(chuàng)建的slot名稱(chēng)類(lèi)似slot_for_table_<table_name>_used_by_flink
,在使用中如果發(fā)生異常,可以嘗試刪除并重試。默認(rèn)創(chuàng)建slot需要一定的前提條件,要求用戶為實(shí)例的Superuser,或者同時(shí)擁有Database的CREATE權(quán)限和實(shí)例的Replication Role權(quán)限。如果沒(méi)有權(quán)限導(dǎo)致作業(yè)上線失敗,可以嘗試如下操作,或者參考通過(guò)JDBC消費(fèi)Hologres Binlog文檔進(jìn)行處理。Hologres2.1版本起,JDBC模式消費(fèi)Binlog不再需要配置slot,因此Hologres連接器從VVR 8.0.5開(kāi)始,判斷Hologres實(shí)例為2.1及以上版本,也不再自動(dòng)創(chuàng)建默認(rèn)的slot。
-- 專(zhuān)家權(quán)限模型下為用戶授予CREATE權(quán)限,以及賦予用戶實(shí)例的Replication Role權(quán)限
GRANT CREATE ON DATABASE database_name TO <user_name>;
alter role <user_name> replication;
-- 如果Database開(kāi)啟了簡(jiǎn)單權(quán)限模型(SLMP),無(wú)法執(zhí)行GRANT語(yǔ)句,使用spm_grant為用戶授予DB的Admin權(quán)限,也可以在Holoweb中直接賦權(quán)
call spm_grant('{dbname}_admin', '云賬號(hào)id/云郵箱/RAM賬號(hào)');
alter role <user_name> replication;
目前刪除表并重建同名表可能導(dǎo)致作業(yè)出現(xiàn)"no table is defined in publication"或者"The table xxx has no slot named xxx"異常,原因是表被刪除時(shí),和表綁定的publication沒(méi)有被刪除。當(dāng)發(fā)生此異常時(shí),可以在hologres中執(zhí)行select * from pg_publication where pubname not in (select pubname from pg_publication_tables);
語(yǔ)句,查詢刪表時(shí)未一起被清理的publication,并執(zhí)行drop publication xx;
語(yǔ)句刪除殘留的publication,之后重新啟動(dòng)作業(yè)即可。或者選擇VVR 8.0.5版本,連接器會(huì)自動(dòng)執(zhí)行清理操作。
Hologres Binlog實(shí)現(xiàn)原理
一條Binlog的字段由Binlog系統(tǒng)字段和用戶Table字段組成,字段定義如下:
字段名 | 字段類(lèi)型 | 說(shuō)明 |
hg_binlog_lsn | BIGINT | Binlog系統(tǒng)字段,表示Binlog序號(hào),Shard內(nèi)部單調(diào)遞增不保證連續(xù),不同Shard之間不保證唯一和有序。 |
hg_binlog_event_type | BIGINT | Binlog系統(tǒng)字段,表示當(dāng)前記錄所表示的修改類(lèi)型,參數(shù)取值如下:
|
hg_binlog_timestamp_us | BIGINT | Binlog系統(tǒng)字段,系統(tǒng)時(shí)間戳,單位為微秒。 |
user_table_column_1 | 用戶定義 | 用戶的表字段。 |
... | ... | 用戶的表字段。 |
user_table_column_n | 用戶定義 | 用戶的表字段。 |