實時數倉Hologres
本文為您介紹如何使用實時數倉Hologres連接器。
背景信息
實時數倉Hologres是一站式實時數據倉庫引擎,支持海量數據實時寫入、實時更新、實時分析,支持標準SQL(兼容PostgreSQL協議),支持PB級數據多維分析(OLAP)與即席分析(Ad Hoc),支持高并發低延遲的在線數據服務(Serving),與MaxCompute、Flink、DataWorks深度融合,提供離在線一體化全棧數倉解決方案。Hologres連接器支持的信息如下。
類別 | 詳情 |
支持類型 | 源表、維表和結果表 |
運行模式 | 流模式和批模式 |
數據格式 | 暫不支持 |
特有監控指標 |
|
API種類 | Datastream和SQL |
是否支持更新或刪除結果表數據 | 是 |
特色功能
源表
功能 | 詳情 |
實時消費Hologres |
獲取更多信息,詳情請參見Flink/Blink實時消費Hologres Binlog。 |
結果表
功能 | 詳情 |
支持寫入Changelog消息。 | |
只更新修改部分的數據,而非整行更新。 | |
支持實時同步單表、整庫的數據以及相應的表結構變更到Hologres表中。 | |
插入部分列 說明 僅實時計算引擎VVR 6.0.7及以上版本支持。 | 支持將Flink INSERT DML中指定的列名下推給連接器,從而僅更新指定的列。 |
前提條件
已創建Hologres表,詳情請參見創建Hologres表。
使用限制
通用:
僅Flink計算引擎VVR 2.0.0及以上版本支持Hologres連接器。
Hologres連接器不支持訪問Hologres外部表。關于Hologres外部表詳情請參見創建Hologres外部表(映射到MaxCompute)。
連接器目前的已知缺陷以及版本功能發布記錄詳見Hologres Connector Release Note。
源表獨有:
Flink默認以批模式讀取Hologres源表數據,即只掃描一次Hologres全表,掃描結束,消費結束,新到Hologres源表的數據不會被讀取。從VVR 3.0.0版本開始,支持實時消費Hologres數據,詳情請參見實時計算Flink版實時消費Hologres。從VVR 6.0.3版本開始,Hologres源表在批模式讀取時支持filter下推,詳見源表參數
enable_filter_push_down
。實時計算引擎8.0以下版本Hologres CDC模式暫不支持定義Watermark。如果您需要進行窗口聚合,您可以采用非窗口聚合的方式,詳情請參見MySQL/Hologres CDC源表不支持窗口函數,如何實現類似每分鐘聚合統計的需求?。
結果表獨有:無。
維表獨有:
維表建議使用主鍵作為Join條件,對于此類主鍵點查的維表,創建Hologres表時建議選擇行存模式,列存模式對于點查場景性能開銷較大。選擇行存模式創建維表時必須設置主鍵,并且將主鍵設置為Clustering Key才可以工作。詳情請參見CREATE TABLE。
如果業務需要,無法使用主鍵作為Join條件,對于此類非主鍵點查的維表(即一對多的查詢),創建Hologres表時建議選擇列存模式,并合理設置分布鍵Distribution Key以及Event Time Column(Segment Key)以優化查詢性能,詳情請參見表存儲格式:列存、行存、行列共存。
VVR 4.0以下版本僅支持對維表主鍵點查的維表Join,VVR 4.0及以上版本,jdbc模式支持維表的非主鍵點查。
注意事項
使用了rpc模式時,VVR版本升級注意事項:
Hologres 2.0版本下線了rpc(用于維表與結果表)模式,全面轉為jdbc相關模式(目前包括jdbc、jdbc_fixed和jdbc_copy等),rpc模式不會對同一批次中相同主鍵的數據做去重,如果業務場景需要保留完整的數據,切換為jdbc模式后,可以通過設置'jdbcWriteBatchSize'='1'防止去重,或者升級到VVR 8.0.5版本配置deduplication.enabled參數為false。
如果您作業中存在使用了rpc模式讀寫Hologres的情況,此時如果您需要將VVR 4.x升級到VVR 6.x或VVR 8.x,請按照以下情況進行處理:
升級到VVR 6.0.4~6.0.6版本,可能會拋出異常。推薦維表和結果表使用jdbc_fixed或jdbc模式。
升級到VVR 6.0.7及以上版本,無需您做任何處理,Flink系統會自動替換rpc為jdbc相關模式。
使用binlog源表且未指定jdbc模式時,VVR版本升級注意事項:
Hologres 2.0版本開始有限支持holohub(用于Binlog源表)模式,Hologres 2.1版本徹底下線了holohub模式,全面轉為jdbc模式。
如果您作業中存在消費binlog源表的情況,而且binlog源表未通過sdkmode='jdbc'指定jdbc模式,默認會使用holohub模式。此時如果您需要將VVR 4.x升級到VVR 6.x或VVR 8.x,請按照以下情況進行處理:
如果Hologres版本是2.0。
升級到VVR 6.0.7~VVR 8.0.3版本,仍然可以繼續使用holohub模式。
升級到VVR 8.0.4及以上版本,可能拋出權限不足的異常,需要特別配置用戶權限,詳情見實時計算Flink版實時消費Hologres。
如果Hologres版本是2.1。
升級到VVR 6.0.7~VVR 8.0.4版本,可能無法正常消費Binlog,建議升級到VVR 8.0.5。
升級到VVR 8.0.5及以上版本,無需您做任何處理,Flink系統會自動替換holohub模式為jdbc模式。
語法結構
CREATE TABLE hologres_table (
name VARCHAR,
age BIGINT,
birthday BIGINT,
PRIMARY KEY (name) NOT ENFORCED
) WITH (
'connector' = 'hologres',
'dbname' = '<yourDBName>',
'tablename' = '<yourTableName>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint' = '<yourEndpoint>',
'sdkmode' = 'jdbc'
);
WITH參數
僅Flink實時計算引擎VVR 4.0.11及以上版本支持所有jdbc開頭的參數。
通用
參數
說明
數據類型
是否必填
默認值
備注
connector
表類型。
String
是
無
固定值為
hologres
。dbname
數據庫名稱。
String
是
無
Hologres V2.0版本推出了全新的彈性高可用實例形態,將計算資源分解為不同的計算組(Virtual Warehouse),更好的服務于高可用部署,詳情請參見計算組實例快速入門。不同的計算組使用相同的Endpoint,您可以通過在dbname參數后添加特定的后綴來指定連接某個計算組。例如某張維表希望連接特定的計算組read_warehouse,可以通過
'dbname' = 'db_test@read_warehouse'
方式指定,詳情請參見連接計算組。說明僅JDBC相關模式支持使用計算組,詳見源表、維表和結果表WITH參數中的sdkMode參數。
tablename
表名稱。
String
是
無
如果Schema不為Public時,則tablename需要填寫為
schema.tableName
。username
用戶名,請填寫阿里云賬號的AccessKey ID。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey ID取值,詳情請參見變量管理。
password
密碼,請填寫阿里云賬號的AccessKey Secret。
String
是
無
詳情請參見如何查看AccessKey ID和AccessKey Secret信息?
重要為了避免您的AK信息泄露,建議您通過密鑰管理的方式填寫AccessKey Secret取值,詳情請參見變量管理。
endpoint
Hologres服務地址。
String
是
無
詳情請參見訪問域名。
connection.ssl.mode
是否啟用SSL(Secure Sockets Layer)傳輸加密,以及啟用采用何種模式。
String
否
disable
參數取值如下:
disable(默認值):不啟用傳輸加密。
require:啟用SSL,只對數據鏈路加密。
verify-ca:啟用SSL,加密數據鏈路,同時使用CA證書驗證Hologres服務端的真實性。
verify-full:啟用SSL,加密數據鏈路,使用CA證書驗證Hologres服務端的真實性,同時比對證書內的CN或DNS與連接時配置的Hologres連接地址是否一致。
說明VVR 8.0.5及以上版本開始支持此參數。
Hologres自1.1版本起支持SSL傳輸加密的require模式,2.1版本起新增支持verify-ca和verify-full模式。詳見傳輸加密。
當配置為verify-ca或者verify-full時,需要同時配置connection.ssl.root-cert.location參數。
connection.ssl.root-cert.location
當傳輸加密模式需要證書時,配置證書的路徑。
String
否
無
當connection.ssl.mode配置為verify-ca或者verify-full時,需要同時配置CA證書的路徑。證書可以使用實時計算控制臺的資源上傳功能上傳至平臺,上傳后文件存放在/flink/usrlib目錄下。例如,需要使用的CA證書文件名為certificate.crt,則上傳后參數取值應該為
'/flink/usrlib/certificate.crt'
。說明VVR 8.0.5及以上版本開始支持此參數。
CA證書獲取方式見傳輸加密-下載CA證書。
jdbcRetryCount
當連接故障時,寫入和查詢的重試次數。
Integer
否
10
無。
jdbcRetrySleepInitMs
每次重試的固定等待時間。
Long
否
1000
實際重試的等待時間的計算公式為
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。單位為毫秒。jdbcRetrySleepStepMs
每次重試的累加等待時間。
Long
否
5000
實際重試的等待時間的計算公式為
jdbcRetrySleepInitMs+retry*jdbcRetrySleepStepMs
。單位為毫秒。jdbcConnectionMaxIdleMs
JDBC連接的空閑時間。
Long
否
60000
超過這個空閑時間,連接就會斷開釋放掉。單位為毫秒。
jdbcMetaCacheTTL
本地緩存TableSchema信息的過期時間。
Long
否
60000
單位為毫秒。
jdbcMetaAutoRefreshFactor
如果緩存的剩余時間小于觸發時間,則系統會自動刷新緩存。
Integer
否
4
緩存的剩余時間計算方法:緩存的剩余時間=緩存的過期時間 - 緩存已經存活的時間。緩存自動刷新后,則從0開始重新計算緩存的存活時間。
觸發時間計算方法:jdbcMetaCacheTTL/jdbcMetaAutoRefreshFactor兩個參數的比值。
type-mapping.timestamp-converting.legacy
Flink和Hologres之間是否進行時間類型的相互轉換。
Boolean
否
true
參數取值如下:
true:不進行相互轉換。時區轉換將采用運行環境中的JVM時區。
false(推薦):進行相互轉換。時區轉換將使用Flink所配置的時區。
說明僅實時計算引擎VVR 8.0.6及以上版本支持該參數。
Flink和Hologres的時區詳情,請參見Flink與Hologres時區說明。
property-version=0時,默認值為true;property-version=1時,默認值為false。
property-version
Connector參數版本。
Integer
否
0
可填的值為0和1,默認值為0。
說明僅VVR 8.0.6及以上版本支持配置該參數。
在不同參數版本里,可用的參數集合和參數的默認值可能不同。如果存在區別,區別詳情會在參數的說明部分描述。
推薦使用參數版本1。
源表獨有
參數
說明
數據類型
是否必填
默認值
備注
field_delimiter
導出數據時,不同行之間使用的分隔符。
String
否
"\u0002"
無。
binlog
是否消費Binlog數據。
Boolean
否
false
參數取值如下:
true:消費Binlog數據。
false(默認值):不消費Binlog數據。
說明實時計算引擎VVR 8.0.6及以上版本支持多版本默認值。
property-version=0時,默認值為false。
property-version=1時,默認值為true。
sdkMode
SDK模式。
String
否
holohub
參數取值如下:
holohub(默認值):使用holohub模式消費binlog。
jdbc:使用jdbc模式消費binlog。
jdbc_fixed: 使用fixed jdbc模式消費binlog,與jdbc模式的區別在于不受連接數限制。目前此模式暫不支持消費開起數據脫敏功能的database的binlog,詳情請參見數據脫敏。
詳情請參見Binlog Source表。
版本推薦參數如下:
VVR 6.0.3及以下版本:不支持配置該參數。
VVR 6.0.4~6.0.6版本:推薦使用默認配置,即holohub。
VVR 6.0.7及以上版本:推薦配置為jdbc。
VVR 8.0.4~8.0.5版本:系統會自動切換為jdbc。
VVR 8.0.6及以上版本:系統會根據Hologres版本進行自動切換。
2.1.27及以上版本,為jdbc_fixed模式。
2.1.0~2.1.26版本,為jdbc模式
Hologres實例為2.0版本,Flink系統會嘗試使用jdbc模式,如果存在權限不足等異常,會自動選擇holohub模式啟動。
說明VVR 6.0.7及以上版本:
Hologres實例為2.0以下版本,Flink系統采用您配置的參數值。
Hologres實例為2.0及以上版本,由于Hologres 2.0版本下線了holohub服務,此時如果您配置了holohub,Flink系統自動切換為jdbc。但是如果您配置為jdbc,Flink系統采用jdbc。
VVR 8.0.4~8.0.5版本:
Hologres實例為2.0版本,Flink系統自動切換為jdbc。可能存在權限不足等問題,參考實時計算Flink版實時消費Hologres文檔進行處理。
jdbcBinlogSlotName
JDBC模式的binlog源表的Slot名稱。創建方法請參見JDBC模式Binlog源表。
String
否
無
僅在sdkMode配置為jdbc時生效,如果用戶未配置,連接器會默認創建一個Slot來使用。詳見JDBC模式Binlog源表。
說明Hologres實例2.1版本起,不再需要配置此參數,連接器也不會嘗試自動創建。
binlogMaxRetryTimes
讀取Binlog數據出錯后的重試次數。
Integer
否
60
無。
binlogRetryIntervalMs
讀取Binlog數據出錯后的重試時間間隔。
Long
否
2000
單位為毫秒。
binlogBatchReadSize
批量讀取Binlog的數據行數。
Integer
否
100
無。
cdcMode
是否采用CDC模式讀取Binlog數據。
Boolean
否
false
參數取值如下:
true:CDC模式讀取Binlog數據。
false(默認值):非CDC模式讀取Binlog數據。
說明實時計算引擎VVR 8.0.6及以上版本支持多版本默認值。
property-version=0時,默認值為false。
property-version=1時,默認值為true。
upsertSource
源表是否使用upsert類型的Changelog。
Boolean
否
false
僅在CDC模式下生效。參數取值如下:
true:僅支持Upsert類型,包括INSERT、DELETE和UPDATE_AFTER。
false(默認值):支持所有類型,包括INSERT、DELETE、UPDATE_BEFORE和UPDATE_AFTER。
說明如果下游包含回撤算子(例如使用ROW_NUMBER OVER WINDOW去重),則需要設置upsertSource為true,此時源表會以Upsert方式從Hologres中讀取數據。
binlogStartupMode
Binlog數據消費模式。
String
否
earliestOffset
參數取值如下:
initial:先全量消費數據,再讀取Binlog開始增量消費。
earliestOffset(默認值):從最早的Binlog開始消費。
timestamp:從設置的startTime開始消費Binlog。
說明如果設置了startTime或者在啟動界面選擇了啟動時間,則binlogStartupMode強制使用timestamp模式,其他消費模式不生效,即startTime參數優先級更高。
說明僅實時計算引擎VVR 4.0.13及以上版本支持該參數。
實時計算引擎VVR 8.0.6及以上版本支持多版本默認值。
property-version=0時,默認值為false。
property-version=1時,默認值為true。
startTime
啟動位點的時間。
String
否
無
格式為yyyy-MM-dd hh:mm:ss。如果沒有設置該參數,且作業沒有從State恢復,則從最早的Binlog開始消費Hologres數據。
jdbcScanFetchSize
掃描時攢批大小。
Integer
否
256
無。
jdbcScanTimeoutSeconds
掃描操作超時時間。
Integer
否
60
單位為秒。
jdbcScanTransactionSessionTimeoutSeconds
掃描操作所在事務的超時時間。
Integer
否
600秒(0表示不超時)
對應Hologres的GUC參數idle_in_transaction_session_timeout,詳情請參見GUC參數。
說明僅實時計算引擎Flink1.13-vvr-4.0.15及以上版本支持該參數。
enable_filter_push_down
全量讀取階段是否進行filter下推。
Boolean
否
false
參數取值如下:
false(默認值):不進行filter下推。
true:讀取全量數據時,將支持的過濾條件下推到Hologres執行。包括非Binlog Source全量讀取以及Binlog Source使用全增量一體化消費模式時的全量階段。
重要實時計算引擎Flink1.15-vvr-6.0.3到Flink1.15-vvr-6.0.5默認會進行filter下推,但如果作業使用了hologres維表,且寫入的DML中包含有對維表非主鍵字段的過濾條件時,維表的filter會被錯誤的下推,可能導致維表join出現錯誤結果。因此推薦使用實時計算引擎Flink1.15-vvr-6.0.6及以上版本,并在源表增加此參數來開啟過濾條件下推功能。
結果表獨有
參數
說明
數據類型
是否必填
默認值
備注
sdkMode
SDK模式。
String
否
jdbc
參數取值如下:
jdbc:默認值,表示使用jdbc模式進行寫入。
jdbc_copy:是否使用fixed copy方式寫入。
fixed copy是hologres1.3新增的能力,相比通過insert方法(jdbc模式)進行寫入,fixed copy方式可以更高的吞吐(因為是流模式),更低的數據延時,更低的客戶端內存消耗(因為不攢批)。但此模式暫不支持delete數據,也不支持寫入分區父表,不支持ignoreNullWhenUpdate參數。
rpc:表示使用rpc模式進行寫入,與useRpcMode參數一致,與jdbc模式的區別在于不占用連接數,并且不支持寫入Hologres的JSONB、RoarinBitmap類型。
jdbc_fixed(beta功能):表示使用fixed jdbc方式進行寫入,
需要Hologres引擎版本大于等于1.3,與jdbc模式的區別在于不占用連接數,不支持寫入Hologres的Jsonb,RoarinBitmap類型。目前此模式暫不支持寫入開啟數據脫敏功能的database,詳情請參見數據脫敏。
版本推薦參數如下:
VVR 6.0.3以下版本:不支持配置該參數。
VVR 6.0.4及以上版本:推薦配置為jdbc。
說明VVR 6.0.7~VVR 8.0.1版本:
如果Hologres實例為2.0以下版本,Flink系統采用您配置的參數值。
如果Hologres實例為2.0及以上版本,由于Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為rpc,Flink系統將自動切換該參數值為jdbc_fixed,但是如果您配置為其他值,Flink系統將采用您配置的參數值。
rpc模式不會對同一批次中相同主鍵的數據做去重,如果業務場景需要保留完整的數據,切換為jdbc模式后,可以通過設置'jdbcWriteBatchSize'='1'防止去重。
VVR 8.0.3及以上版本
自此版本開始,無論Hologres實例版本,都不再支持rpc模式,如果選擇rpc模式,將自動切換該參數值為jdbc_fixed且設置'jdbcWriteBatchSize'='1'防止去重。
bulkload
是否采用bulkload寫入。
Boolean
否
false
僅在sdkMode設置為jdbc_copy時生效。bulkload寫入目前僅適用于無主鍵表或者主鍵保證不重復的有主鍵表(主鍵重復會拋出異常),相比默認的jdbc_copy,寫入使用更少的Hologres資源。
說明僅實時計算引擎VVR 8.0.5及以上版本且Hologres實例為2.1及以上版本支持該參數。
useRpcMode
是否通過RPC方式使用Hologres連接器。
Boolean
否
false
參數取值如下:
true:使用RPC方式使用Hologres連接器。
與sdkMode參數設置為rpc效果相同,通過RPC方式會降低SQL連接數。
false(默認值):使用JDBC方式使用Hologres連接器。
通過JDBC方式會占用SQL連接,導致JDBC連接數增加。
說明Hologres 2.0版本下線了rpc模式,推薦使用sdkMode參數來選擇jdbc或者jdbc_fixed模式。
實時計算引擎VVR 6.0.7及VVR 8.0.1版本在檢測到Hologres實例是2.0及以上版本時,會自動切換rpc模式為jdbc_fixed模式。
實時計算引擎VVR 8.0.3及以上版本會自動切換rpc模式為jdbc_fixed模式。
rpc模式不會對同一批次中相同主鍵的數據做去重,如果業務場景需要保留完整的數據變化記錄,推薦使用實時計算引擎VVR 8.0.5及以上版本,jdbc模式可以配置deduplication.enabled參數為false不進行去重。
property-version=1時,該參數下線。
mutatetype
數據寫入模式。
String
否
insertorignore
詳情請參見流式語義。
說明實時計算引擎VVR 8.0.6及以上版本支持多版本默認值。
property-version=0時,默認值為insertorignore。
property-version=1時,默認值為insertorupdate。
partitionrouter
是否寫入分區表。
Boolean
否
false
無。
createparttable
當寫入分區表時,是否根據分區值自動創建不存在的分區表。
Boolean
否
false
RPC模式下,如果分區值中存在短劃線(-),暫不支持自動創建分區表。
說明Hologres實例1.3.22及以上版本開始支持使用Date類型做分區鍵。實時計算引擎VVR 8.0.3及以上版本,支持使用Date類型做分區鍵時自動創建分區表。
請確保分區值不會出現臟數據,否則會創建錯誤的分區表導致Failover,建議慎用該參數。
當sdk_mode設置為jdbc_copy時,不支持寫入分區父表。
ignoredelete
是否忽略撤回消息。
Boolean
否
true
說明僅在使用流式語義時生效。
實時計算引擎VVR 8.0.8及以上版本推薦使用
sink.delete-strategy
參數替換該參數。兩個參數同時配置時,只有sink.delete-strategy
參數生效。實時計算引擎VVR 8.0.6及以上版本支持多版本默認值。
property-version=0時,默認值為true。
property-version=1時,默認值為false。
sink.delete-strategy
撤回消息的處理策略。
String
否
無
參數取值如下:
IGNORE_DELETE:忽略Update Before和Delete消息。適用于僅需插入或更新數據,而無需刪除數據的場景。
NON_PK_FIELD_TO_NULL:忽略Update Before消息,并將Delete消息執行為將非主鍵字段更新為null。適用于希望在局部更新操作中執行刪除操作而不影響其他列的場景。
DELETE_ROW_ON_PK:忽略Update Before消息,并將Delete消息執行為根據主鍵刪除整行。適用于在局部更新過程中,希望執行刪除整行操作,從而影響其他列的場景。
CHANGELOG_STANDARD:Flink框架按照 Flink SQL Changelog的工作原理運行,不忽略刪除操作,并通過先刪除數據再插入的方式執行更新操作,以確保數據準確性。適用于不涉及局部更新的場景。
說明僅實時計算引擎VVR 8.0.8及以上版本支持該參數。
啟用NON_PK_FIELD_TO_NULL選項可能會導致記錄中只有主鍵,其他所有列都為null。
connectionSize
單個Flink結果表任務所創建的JDBC連接池大小。
Integer
否
3
如果作業性能不足,建議您增加連接池大小。連接池大小和數據吞吐成正比。
jdbcWriteBatchSize
JDBC模式,Hologres Sink節點數據攢批條數(不是來一條數據處理一條,而是攢一批再處理)的最大值。
Integer
否
256
單位為數據行數。
說明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。
jdbcWriteBatchByteSize
JDBC模式,Hologres Sink節點數據攢批字節數(不是來一條數據處理一條,而是攢一批再處理)的最大值。
Long
否
2097152(2*1024*1024)字節,即2 MB
說明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。
jdbcWriteFlushInterval
JDBC模式,Hologres Sink節點數據攢批寫入Hologres的最長等待時間。
Long
否
10000
單位為毫秒。
說明jdbcWriteBatchSize、jdbcWriteBatchByteSize和jdbcWriteFlushInterval三者之間為或的關系。如果同時設置了這三個參數,則滿足其中一個,就進行寫入結果數據。
ignoreNullWhenUpdate
當mutatetype='insertOrUpdate'時,是否忽略更新寫入數據中的Null值。
Boolean
否
false
取值說明如下:
false(默認值):將Null值寫到Hologres結果表里。
true:忽略更新寫入數據中的Null值。
說明僅Flink計算引擎VVR 4.0及以上版本支持該參數。
當sdk_mode設置為jdbc_copy時,不支持此參數。
connectionPoolName
連接池名稱。同一個TaskManager中,配置相同名稱的連接池的表可以共享連接池。
String
否
無
取值為非
'default'
的任意字符串。如果多個表設置相同的連接池,則這些使用相同連接池的表的connectionSize參數也需要相同。說明VVR 4.0.12以下版本:不支持配置該參數。
VVR 4.0.12~VVR 8.0.3版本:默認不共享,每個表使用自己的連接池。
VVR 8.0.4以上版本:同一個作業中endpoint相同的表會默認共享連接池。作業中表數量較多時連接數可能相對不足影響性能,這種情況下推薦為不同的表設置不同的connectionPoolName。
此參數可以按需配置,比如作業中有維表A,B以及結果表C,D,E五張hologres表,可以A表和B表使用'pool1',C表和D表使用'pool2',E表流量較大,單獨使用'pool3'。
jdbcEnableDefaultForNotNullColumn
如果將Null值寫入Hologres表中Not Null且無默認值的字段,是否允許連接器幫助填充一個默認值。
Boolean
否
true
參數取值如下:
true(默認值):允許連接器填充默認值并寫入,規則如下。
如果字段是String類型,則默認寫為空("")。
如果字段是Number類型,則默認寫為0。
如果是Date、timestamp或timestamptz時間類型字段,則默認寫為1970-01-01 00:00:00。
false:不填充默認值,寫Null到Not Null字段時,會拋出異常。
remove-u0000-in-text.enabled
如果寫入時字符串類型包含\u0000非法字符,是否允許連接器幫助去除。
Boolean
否
false
參數取值如下:
false(默認值):連接器不對數據進行操作,但碰到臟數據時寫入可能拋出如下異常,
ERROR: invalid byte sequence for encoding "UTF8": 0x00
此時需要在源表提前處理臟數據,或者在SQL中定義臟數據處理邏輯。
true:連接器會幫助去除字符串類型中的\u0000,防止寫入拋出異常。
重要實時計算引擎VVR 8.0.1及以上版本,僅
sdkMode='jdbc'
場景下支持該參數。實時計算引擎VVR 8.0.8及以上版本,
僅sdkMode='jdbc_copy'
或sdkMode='jdbc'
場景下支持該參數。
partial-insert.enabled
是否只插入INSERT語句中定義的字段。
Boolean
否
false
參數取值如下:
false(默認值):無論INSERT語句中聲明了哪些字段,都會更新結果表DDL中定義的所有字段,對于未在INSERT語句中聲明的字段,會被更新為null。
true:將INSERT語句中定義的字段下推給連接器,從而可以只對聲明的字段進行更新或插入。
說明僅實時計算引擎VVR 6.0.7及以上版本支持該參數。
此參數僅在mutatetype參數配置為InsertOrUpdate時生效。
deduplication.enabled
jdbc及jdbc_fixed模式寫入攢批過程中,是否進行去重。
Boolean
否
true
參數取值如下:
true(默認值):如果一批數據中有主鍵相同的數據,默認進行去重,只保留最后一條到達的數據。以兩個字段,其中第一個字段為主鍵的數據舉例:
INSERT (1,'a')
和INSERT (1,'b')
兩條記錄先后到達,去重之后只保留后到達的(1,'b')
寫入Hologres結果表中。Hologres結果表中已經存在記錄
(1,'a')
,此時DELETE (1,'a')
和INSERT (1,'b')
兩條記錄先后到達,只保留后到達的(1,'b')
寫入hologres中,表現為直接更新,而不是先刪除再插入。
false:在攢批過程中不進行去重,如果發現新到的數據和目前攢批的數據中存在主鍵相同的情況,先將攢批數據寫入,寫入完成之后再繼續寫入新到的數據。
說明僅實時計算引擎VVR 8.0.5及以上版本支持該參數。
不允許攢批去重時,極端情況下(例如所有數據的主鍵都相同)寫入會退化為不攢批的單條寫入,對性能有一定影響。
維表獨有
參數
說明
數據類型
是否必填
默認值
備注
sdkMode
SDK模式。
String
否
jdbc
參數取值如下:
jdbc(默認值):表示使用jdbc模式進行查詢,支持主鍵點查和非主鍵的查詢,但是非主鍵的查詢對性能影響較大,查詢較慢。
rpc:表示使用rpc模式進行點查,與useRpcMode參數一致,僅支持主鍵點查,即維表的主鍵字段必須與Flink Join On的字段完全匹配,與jdbc模式的區別在于不占用連接數,不支持讀取Hologres的Jsonb,RoarinBitmap類型。
jdbc_fixed:(beta功能,需要hologres引擎版本大于等于1.3)表示使用fixed jdbc方式進行點查,與jdbc模式的區別在于不占用連接數,且不支持讀取Hologres的Jsonb,RoarinBitmap類型。僅支持主鍵點查,即維表的主鍵字段必須與Flink Join On的字段完全匹配。目前此模式暫不支持查詢開起數據脫敏功能的database,詳情請參見數據脫敏。
版本推薦參數如下:
VVR 6.0.3以下版本:不支持配置該參數。
VVR 6.0.4及以上版本:推薦配置為jdbc。
說明在VVR 6.0.6版本,SDK模式選擇為jdbc時,如果維表字符串類型的查詢結果中包含null值,可能拋出空指針異常,此時推薦您使用rpc模式繞過。
VVR 6.0.7~VVR 8.0.1版本:
如果Hologres實例為2.0以下版本,Flink系統將采用您配置的參數值。
如果Hologres實例為2.0及以上版本,由于Hologres 2.0及以上版本下線了rpc服務,此時如果您將該參數配置為了rpc,Flink系統自動將參數值切換為jdbc_fixed。但是如果您配置為其他值,Flink系統將采用您配置的參數值。
VVR 8.0.3及以上版本:
自此版本開始,無論Hologres實例版本,都不再支持rpc模式,如果選擇rpc模式,將自動切換該參數值為jdbc_fixed。
useRpcMode
是否通過RPC方式使用Hologres連接器。
Boolean
否
false
參數取值如下:
true:使用RPC方式使用Hologres連接器。與sdkMode參數設置為rpc效果相同。通過RPC方式會降低SQL連接數。
false(默認值):使用JDBC方式使用Hologres連接器。
通過JDBC方式會占用SQL連接,導致JDBC鏈接數增加。
說明Hologres 2.0版本下線了rpc了服務,推薦使用sdkMode參數來選擇jdbc或者jdbc_fixed模式。
實時計算引擎VVR 6.0.7及VVR 8.0.1版本在檢測到Hologres實例是2.0及以上版本時,會自動切換rpc模式為jdbc_fixed模式。
實時計算引擎VVR 8.0.3及以上版本會自動切換rpc模式為jdbc_fixed模式。
connectionSize
單個Flink維表任務所創建的JDBC連接池大小。
Integer
否
3
如果作業性能不足,建議您增加連接池大小。連接池大小和數據吞吐成正比。
connectionPoolName
連接池名稱。同一個TaskManager中,配置相同名稱的連接池的表可以共享連接池。
String
否
無
取值為非
'default'
的任意字符串。如果多個表設置相同的連接池,則這些使用相同連接池的表的connectionSize參數也需要相同。您可以按需配置此參數,例如作業中有維表A,B以及結果表C,D,E五張hologres表,可以A表和B表使用pool1,C表和D表使用pool2,E表流量較大,單獨使用pool3。
說明VVR 4.0.12以下版本:不支持配置該參數。
VVR 4.0.12~VVR 8.0.3版本:默認不共享,每個表使用自己的連接池。
VVR 8.0.4以上版本:同一個作業中Endpoint相同的表會默認共享連接池。作業中表數量較多時連接數可能相對不足影響性能,這種情況下推薦為不同的表設置不同的connectionPoolName。
jdbcReadBatchSize
點查Hologres維表時,攢批處理的最大條數。
Integer
否
128
無。
jdbcReadBatchQueueSize
維表點查請求緩沖隊列大小。
Integer
否
256
無。
jdbcReadTimeoutMs
維表點查的超時時間。
Long
否
默認值為0,表示不會超時
僅vvr 4.0.15-flink 1.13及以上版本、vvr 6.0.2-flink 1.15及以上版本支持該參數。
jdbcReadRetryCount
維表點查超時時的重試次數。
Interger
否
VVR 8.0.5以下版本:1
VVR 8.0.5及以上版本:10
僅VVR 6.0.3以上版本支持該參數。
本參數與jdbcRetryCount不同,后者是指連接發生異常時的重試次數。
jdbcScanFetchSize
在一對多join(即沒有使用完整主鍵)時使用scan接口,scan攢批處理數據的條數。
Integer
否
256
無。
jdbcScanTimeoutSeconds
scan操作的超時時間。
Integer
否
60
單位為秒。
cache
緩存策略。
String
否
見備注列。
Hologres僅支持None和LRU兩種緩存策略,取值詳情請參見背景信息。
說明Cache默認值和VVR版本有關:
VVR 4.x版本及以上版本,默認值為None。
VVR 4.x版本以下版本,默認值為LRU。
cacheSize
緩存大小。
Integer
否
10000
選擇LRU緩存策略后,可以設置緩存大小。單位為條。
cacheTTLMs
緩存更新時間間隔。
Long
否
見備注列。
單位為毫秒。cacheTTLMs默認值和cache的配置有關:
如果cache配置為LRU,則cacheTTLMs為緩存超時時間。默認不過期。
如果cache配置為None,則cacheTTLMs可以不配置,表示緩存不超時。
cacheEmpty
是否緩存join結果為空的數據。
Boolean
否
true
true(默認值):緩存join結果為空的數據。
false:不緩存join結果為空的數據。
但當join語句中AND前面條件符合而后面條件不符合時,依然會緩存join結果為空的數據。代碼示例如下。
LEFT JOIN latest_emergency FOR SYSTEM_TIME AS OF PROCTIME() AS t2 ON t1.alarm_id = t2.alarm_id -- 如果發現是動態告警,則匹配時加入動態告警id,否則無需考慮動態告警id字段。 AND CASE WHEN alarm_type = 2 THEN t1.dynamic_id = t2.dynamic_alarm_id ELSE true END
async
是否異步返回數據。
Boolean
否
false
true:表示異步返回數據。
false(默認值):表示不進行異步返回數據。
說明異步返回數據是無序的。
類型映射
Flink與Hologres的數據類型映射請參見Blink/Flink與Hologres的數據類型映射。
使用示例
源表示例
非Binlog Source表
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'field_delimiter'='|' --該參數可選。
'sdkmode' = 'jdbc'
);
CREATE TEMPORARY TABLE blackhole_sink(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='blackhole'
);
INSERT INTO blackhole_sink
SELECT name, age, birthday
from hologres_source;
Binlog Source表
Hologres連接器支持實時消費Hologres,即實時消費Hologres的Binlog數據。Flink實時消費Hologres詳情請參見實時計算Flink版實時消費Hologres。
結果表示例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;
維表示例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector' = 'hologres',
...
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;
特色功能詳解
流式語義
流處理,也稱為流數據或流事件處理,即對一系列無界數據或事件連續處理。執行流數據或流事件處理的系統通常允許您指定一種可靠性模式或處理語義,保證整個系統處理數據的準確性,因為網絡或設備故障等可能會導致數據丟失。
根據Hologres Sink的配置和Hologres表的屬性,流式語義分為以下兩種:
Exactly-once(僅一次):即使在發生各種故障的情況下,系統只處理一次數據或事件。
At-least-once(至少一次):如果在系統完全處理之前丟失了數據或事件,則從源頭重新傳輸,因此可以多次處理數據或事件。如果第一次重試成功,則不必進行后續重試。
在Hologres結果表中使用流式語義,需要注意以下幾點:
如果Hologres物理表未設置主鍵,則Hologres Sink使用At-least-once語義。
如果Hologres物理表已設置主鍵,則Hologres Sink通過主鍵確保Exactly-once語義。當同主鍵數據出現多次時,您需要設置mutatetype參數確定更新結果表的方式,mutatetype取值如下:
insertorignore(默認值):保留首次出現的數據,忽略后續所有數據。
insertorreplace:后出現的數據整行替換已有數據。
insertorupdate:更新已有數據的部分列。例如一張表有a、b、c和d四個字段,a是PK(Primary Key),寫入Hologres時只寫入a和b兩個字段,在PK重復的情況下,系統只會更新b字段,c和d保持不變。
說明當mutatetype設置為
insertorreplace
或insertorupdate
時,系統根據主鍵更新數據。Flink定義的結果表中的數據列數不一定要和Hologres物理表的列數一致,您需要保證缺失的列沒有非空約束,即列值可以為Null,否則會報錯。
默認情況下,Hologres Sink只能向一張表導入數據。如果導入數據至分區表的父表,即使導入成功,也會查詢數據失敗。您可以設置參數partitionRouter為true,開啟自動將數據路由到對應分區表的功能。注意事項如下:
tablename參數需要填寫為父表的表名。
如果沒有提前創建分區表,需要在WITH參數中啟用createparttable=true,從而支持自動創建分區表,否則會導入失敗。
寬表Merge和局部更新功能
在把多個流的數據寫到一張Hologres寬表的場景中,會涉及到寬表Merge和數據的局部更新。下面通過示例來介紹實現寬表merge的兩種方式。
本場景的兩種實現方式均具有如下限制:
寬表必須有主鍵。
每個數據流的數據都必須包含完整的主鍵字段。
列存模式的寬表Merge場景在高RPS的情況下,CPU使用率會偏高,建議關閉表中字段的Dictionary Encoding功能。
方式一(推薦)
僅實時計算引擎VVR 6.0.7及以上版本支持使用此方式。
假設有兩個Flink數據流,一個數據流中包含a、b和c字段,另一個數據流中包含a、d和e字段,Hologres寬表WIDE_TABLE包含a、b、c、d和e字段,其中a字段為主鍵。具體操作如下:
使用Flink SQL創建一張Hologres結果表,并聲明a、b、c、d、e五個字段,映射至寬表WIDE_TABLE。
結果表的屬性設置:
mutatetype
設置為insertorupdate
,可以根據主鍵更新數據。ignoredelete
設置為true
,防止回撤消息產生Delete請求。實時計算引擎VVR 8.0.8及以上版本推薦使用sink.delete-strategy
配置局部更新時的各種策略。
將兩個Flink數據流的數據分別INSERT至對應的結果表中。
// 已經定義的source1和source2 CREATE TEMPORARY TABLE hologres_sink ( -- 聲明a,b,c,d,e五個字段 a BIGINT, b STRING, c STRING, d STRING, e STRING, primary key(a) not enforced ) WITH ( 'connector'='hologres', 'dbname'='<yourDbname>', 'tablename'='<yourWideTablename>', -- hologres寬表,包含a,b,c,d,e五個字段 'username' = '${secret_values.ak_id}', 'password' = '${secret_values.ak_secret}', 'endpoint'='<yourEndpoint>', 'mutatetype'='insertorupdate', -- 根據主鍵更新數據 'ignoredelete'='true', -- 忽略回撤消息產生的Delete請求 'partial-insert.enabled'='true' -- 開啟部分列更新參數,支持僅更新INSERT語句中生命的字段 ); BEGIN STATEMENT SET; INSERT INTO hologres_sink(a,b,c) select * from source1; -- 聲明只插入a,b,c三個字段 INSERT INTO hologres_sink(a,d,e) select * from source2; -- 聲明只插入a,d,e三個字段 END;
方式二
僅實時計算引擎VVR 6.0.6及以下版本支持使用此方式。
假設有兩個Flink數據流,一個數據流中包含a、b和c字段,另一個數據流中包含a、d和e字段,Hologres寬表WIDE_TABLE包含a、b、c、d和e字段,其中a字段為主鍵。具體操作如下:
使用Flink SQL創建兩張Hologres結果表,其中一張表只聲明a、b和c字段,另一張表只聲明a、d和e字段。這兩張表都映射至寬表WIDE_TABLE。
兩張結果表的屬性設置:
mutatetype
設置為insertorupdate
,可以根據主鍵更新數據。ignoredelete
設置為true
,防止回撤消息產生Delete請求。實時計算引擎VVR 8.0.8及以上版本推薦使用sink.delete-strategy
配置局部更新時的各種策略。
將兩個Flink數據流的數據分別INSERT至對應的結果表中。
// 已經定義的source1和source2 CREATE TEMPORARY TABLE hologres_sink_1 ( -- 只聲明a,b,c三個字段 a BIGINT, b STRING, c STRING, primary key(a) not enforced ) WITH ( 'connector'='hologres', 'dbname'='<yourDbname>', 'tablename'='<yourWideTablename>', -- hologres寬表,包含a,b,c,d,e五個字段 'username' = '${secret_values.ak_id}', 'password' = '${secret_values.ak_secret}', 'endpoint'='<yourEndpoint>', 'mutatetype' = 'insertorupdate', -- 根據主鍵更新數據 'ignoredelete' = 'true' -- 忽略回撤消息產生的Delete請求 ); CREATE TEMPORARY TABLE hologres_sink_2 ( -- 只聲明a,d,e三個字段 a BIGINT, d STRING, e STRING, primary key(a) not enforced ) WITH ( 'connector'='hologres', 'dbname'='<yourDbname>', 'tablename'='<yourWideTablename>', -- hologres寬表,包含a,b,c,d,e五個字段 'username' = '${secret_values.ak_id}', 'password' = '${secret_values.ak_secret}', 'endpoint'='<yourEndpoint>', 'mutatetype'='insertorupdate', -- 根據主鍵更新數據 'ignoredelete'='true' -- 忽略回撤消息產生的Delete請求 ); BEGIN STATEMENT SET; INSERT INTO hologres_sink_1 select * from source1; INSERT INTO hologres_sink_2 select * from source2; END;
作為CTAS和CDAS的目標端
Hologres支持實時同步單表或整庫級別的數據,在同步過程之中如果上游的表結構發生了變更也會實時同步到Hologres表中。新數據流到Hologres表時,Flink會先觸發Hologres修改相應的表結構,然后再將數據寫入到相應的表中。以上過程全部由Flink自動完成,您無需關心實現細節,詳情請參見CREATE TABLE AS(CTAS)語句和數據庫實時入倉快速入門。
DataStream API
通過DataStream的方式讀寫數據時,需要使用對應的DataStream連接器連接實時計算Flink版,DataStream連接器設置方法請參見DataStream連接器使用方法。Maven中央庫中已經放置了Hologres DataStream連接器。VVR 6.0.7請使用其中的1.15-vvr-6.0.7-1版本的依賴。VVR 8.0.7的依賴請通過ververica-connector-hologres-1.17-vvr-8.0.7.jar下載,在本地調試時,需要使用相應的Uber JAR,詳見本地運行和調試包含連接器的作業,VVR 8.0.7對應的Uber JAR為ververica-connector-hologres-1.17-vvr-8.0.7-uber.jar。
Hologres源表
VVR提供了RichInputFormat的實現類HologresBulkreadInputFormat來讀取Hologres表數據。以下為構建Hologres Source讀取表數據的示例。
VVR 4.0.15
// 初始化讀取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定義部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
String query = JDBCUtils.getSimpleSelectFromStatement(
jdbcOptions.getTable(), schema.getFieldNames());
// 構建HologresBulkreadInputFormat讀取表數據。
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(jdbcOptions, schema, query);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
VVR 6.0.7&VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定義部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(new HologresConnectionParam(config), jdbcOptions, schema, "", -1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo)
.print();
env.execute();
XML
Maven中央庫中已經放置了Hologres DataStream連接器。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>
Hologres Binlog源表
VVR提供了Source的實現類HologresBinlogSource來讀取Hologres Binlog數據。以下為構建Hologres Binlog Source的示例。
VVR 4.0.15
// 初始化讀取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定義部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
jdbcOptions.setHolohubEndpoint(JDBCUtils.getHolohubEndpoint(jdbcOptions));
RowDataRecordConverter recordConverter = buildRecordConverter(schema, config, jdbcOptions);
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource<RowData> source = new HologresBinlogSource<>(
schema,
config,
jdbcOptions,
recordConverter,
startTimeMs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 6.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定義部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 設置或創建默認slotname
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE)
&& config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// 構建Binlog Record Converter。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
VVR 8.0.7
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定義部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// 構建JDBC Options。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 構建Hologres Binlog Source。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet());
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
方法buildRecordConverter不在VVR Connector依賴中,是示例代碼中提供的方法。
Hologres Binlog注意事項和實現原理等詳情,請參見Binlog Source表。
Hologres結果表
VVR提供了OutputFormatSinkFunction的實現類HologresSinkFunction來寫入數據。以下為構建Hologres Sink的示例。
VVR 4.0.15
// 初始化讀取的表的Schema。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setBoolean(HologresConfigs.USE_RPC_MODE, true);
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 構建Hologres Writer,以RowData的方式寫入數據。
AbstractHologresWriter<RowData> hologresWriter =
buildHologresWriter(schema, config, hologresConnectionParam);
// 構建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
VVR 6.0.7&VVR 8.0.7
// set up the Java DataStream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 初始化讀取的表的Schema,需要和Hologres表Schema的字段匹配,可以只定義部分字段。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres的相關參數。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// 構建Hologres Writer,以RowData的方式寫入數據。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam, schema, HologresTableSchema.get(hologresConnectionParam), new Integer[0]);
// 構建Hologres Sink。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
int offset = (int) (System.currentTimeMillis() % Integer.MAX_VALUE);
env.fromElements((RowData)GenericRowData.of(2 + offset, StringData.fromString("2")), GenericRowData.of(3 + offset, StringData.fromString("3"))).returns(typeInfo)
.addSink(sinkFunction);
env.execute();
方法buildHologresWriter不在VVR Connector依賴中,是示例代碼中提供的方法。
Flink與Hologres時區說明
時間類型
產品 | 類型 | 說明 |
Flink | 表示沒有時區信息的日期和時間,描述年、 月、日、小時、分鐘、秒和小數秒對應的時間戳。可以通過一個字符串來指定,例如 | |
用于描述時間線上的絕對時間點,使用long保存從epoch至今的毫秒數,使用int保存毫秒中的納秒數。epoch時間是從Java的標準epoch時間開始計算。在計算和可視化時, 每個TIMESTAMP_LTZ類型的數據都使用Session (會話)中配置的時區。可以用于跨時區的計算,因為它是一個基于epoch的絕對時間點,代表的就是不同時區的同一個絕對時間點。 相同的TIMESTAMP_LTZ值,在不同的時區可能會反映出不同的本地TIMESTAMP,例如:如果一個TIMESTAMP_LTZ值為 | ||
Hologres | TIMESTAMP | 類似于Flink的 |
TIMESTAMP WITH TIME ZONE (TIMESTAMPTZ) | 類似于Flink的 例如北京(UTC+8)時區的時間戳 |
時間類型映射
實時計算引擎VVR 8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=false
時,支持所有時間類型間的相互轉換。Flink
Hologres
詳情
TIMESTAMP
TIMESTAMP
之間相互轉換是直接的,不涉及時區轉換。因此推薦采用該數據映射。
TIMESTAMP LTZ
TIMESTAMPTZ
TIMESTAMP
TIMESTAMPTZ
之間的轉換涉及時區轉換。為了在轉換中保持準確性,需要通過配置項參數
table.local-time-zone
設置Flink時區,配置項參數設置方法請參見如何配置作業運行參數?。例如當設置
'table.local-time-zone': 'Asia/Shanghai'
時,表示Flink時區為上海(+8時區)時,Flink TIMESTAMP類型的數據為2022-01-01 01:01:01.123456,寫入Hologres TIMESTAMP TZ的數值為2022-01-01 01:01:01.123456+8。TIMESTAMP LTZ
TIMESTAMP
實時計算引擎VVR8.0.6及以上版本且
type-mapping.timestamp-converting.legacy=true
時或者VVR 8.0.5及以下版本,除TIMESTAMP間轉化,其他類型相互轉化可能會出現數據偏差問題。Flink
Hologres
備注
TIMESTAMP
TIMESTAMP
之間相互轉換是直接的,不涉及時區轉換。因此推薦采用該數據映射。
TIMESTAMP LTZ
TIMESTAMPTZ
讀寫Hologres數據時都當作無時區時間進行處理,可能會存在數據偏差。
例如,Flink TIMESTAMP_LTZ類型的數值為2024-03-19T04:00:00Z,在上海(+8時區)對應的實際無時區時間為2024-03-19T12:00:00,但是寫入時將2024-03-19T04:00:00當作無時區時間,寫入Hologres TIMESTAMPTZ的數值為2024-03-19T04:00:00+08,數值偏差8小時。
TIMESTAMP
TIMESTAMPTZ
時區轉換默認采用的是運行環境的JVM時區,而不是Flink時區,這與Flink內部計算的時區轉換格式不同。當Flink時區與機器的JVM時區不一致時,會導致數據存在偏差,建議采用Flink時區進行Hologres數據的讀寫。
TIMESTAMP LTZ
TIMESTAMP