本文為您介紹開源Flink 1.11如何實時寫入數據至Hologres。
前提條件
背景信息
從開源Flink1.11版本開始,Hologres代碼已開源,相應版本的Connector已經在中央倉庫發布Release包,可在項目中參照如下pom文件進行配置。詳細內容請參見Hologres GitHub官方庫。
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-1.15</artifactId>
<version>1.4.0</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
開源Flink版本與hologres-connector-flink
最新版本對應關系如下,建議使用1.15及以上版本,功能更豐富:
Flink版本 | Connector版本 |
Flink 1.11 | hologres-connector-flink-1.11:1.0.1 |
Flink 1.12 | hologres-connector-flink-1.12:1.0.1 |
Flink 1.13 | hologres-connector-flink-1.13:1.3.2 |
Flink 1.14 | hologres-connector-flink-1.14:1.3.2 |
Flink 1.15 | hologres-connector-flink-1.15:1.4.1 |
Flink 1.17 | hologres-connector-flink-1.17:1.4.1 |
Flink SQL寫入數據至Hologres代碼示例
您可以參照如下代碼示例,通過將Flink SQL將數據寫入Hologres。其中,更多詳細的代碼示例請參見Hologres GitHub官方庫。
String createHologresTable =
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, tableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);
createScanTable(tEnv);
tEnv.executeSql("insert into sink select * from source");
更多詳盡的代碼示例請參見hologres-connector-flink-examples,包括如下示例。
FlinkSQLToHoloExample:一個使用純Flink SQL接口實現的應用,將數據寫入至Hologres。
FlinkDSAndSQLToHoloExample:一個混合Flink DataStream以及SQL 接口實現的應用,寫入Hologres前,將DataStream轉換成Table,之后再用Flink SQL寫入Hologres。
FlinkDataStreamToHoloExample:一個使用純Flink DataStream接口實現的應用,將數據寫入至Hologres。
FlinkRoaringBitmapAggJob:一個使用Flink及RoaringBitmap,結合Hologres維表,實現實時去重統計UV的應用,并將統計結果寫入Hologres。
通過Flink DataStream接口的實時數據寫入方法,可以對數據進行基于Hologres Shard的Repartition操作,有效減少寫入Hologres實例時的小文件數量,從而提升寫入性能并降低系統負載。該方法適用于需要批量導入具有主鍵的空表,實現類似Insert Overwrite的場景。
Hologres Flink Connector參數說明
您可以將Flink數據寫入Hologres,Hologres Flink Connector相關參數具體內容如下:
參數 | 是否必填 | 說明 |
connector | 是 | 結果表類型,固定值為hologres。 |
dbname | 是 | Hologres的數據庫名稱。 |
tablename | 是 | Hologres接收數據的表名稱。 |
username | 是 | 當前阿里云賬號的AccessKey ID。 您可以單擊AccessKey 管理,獲取AccessKey ID。 |
password | 是 | 當前阿里云賬號的AccessKey Secret。 您可以單擊AccessKey 管理,獲取AccessKey Secret。 |
endpoint | 是 | Hologres的VPC網絡地址。進入Hologres管理控制臺的實例詳情頁,獲取Endpoint。 說明 endpoint需包含端口號,格式為 |
連接參數
參數
是否必填
說明
connectionSize
否
單個Flink Hologres Task所創建的JDBC連接池大小。
默認值:3,和吞吐成正比。
connectionPoolName
否
連接池名稱,同一個TaskManager中,表配置同名的連接池名稱可以共享連接池。
無默認值,每個表默認使用自己的連接池。如果設置連接池名稱,則所有表的connectionSize需要相同
fixedConnectionMode
否
寫入和點查不占用連接數(beta功能,需要connector版本>=1.2.0,hologres引擎版本>=1.3)
默認值:false
jdbcRetryCount
否
當連接故障時,寫入和查詢的重試次數。
默認值:10。
jdbcRetrySleepInitMs
否
每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。
默認值:1000ms。
jdbcRetrySleepStepMs
否
每次重試的等待時間=retrySleepInitMs+retry*retrySleepStepMs。
默認值:5000ms。
jdbcConnectionMaxIdleMs
否
寫入線程和點查線程數據庫連接的最大Idle時間,超過連接將被釋放。
默認值:60000ms。
jdbcMetaCacheTTL
否
TableSchema信息的本地緩存時間。
默認值:60000ms。
jdbcMetaAutoRefreshFactor
否
當TableSchema cache剩余存活時間短于
metaCacheTTL/metaAutoRefreshFactor
將自動刷新cache。默認值:-1,表示不自動刷新。
connection.ssl.mode
否
是否啟用數據傳輸加密。取值說明如下:
disable(默認值):不啟用傳輸加密。
require:啟用SSL,只對數據鏈路加密。
verify-ca:啟用SSL,加密數據鏈路,同時使用CA證書驗證Hologres服務端的真實性。
verify-full:啟用SSL,加密數據鏈路,使用CA證書驗證Hologres服務端的真實性,同時比對證書內的CN或DNS與連接時配置的Hologres連接地址是否一致。
connection.ssl.root-cert.location
否
CA證書的路徑,且確保已經將CA證書上傳至Flink集群環境。
說明當connection.ssl.mode參數配置為verify-ca或verify-full時,需要配置此參數。
jdbcDirectConnect
否
是否開啟直連。取值說明如下:
false(默認值):不開啟。
true:開啟。
Flink批量寫入的瓶頸是VIP Endpoint的網絡吞吐。開啟此參數會測試當前環境能否直連Hologres FE,若支持默認使用直連。
寫入參數
參數
是否必填
說明
mutatetype
否
數據寫入模式,詳情請參見流式語義。
默認值:insertorignore。
ignoredelete
否
是否忽略撤回消息。通常Flink的Group By會產生回撤消息,回撤消息到Hologres connector會產生Delete請求。
默認值:true,僅在使用流式語義時生效。
createparttable
否
當寫入分區表時,是否自動根據分區值自動創建分區表。
false(默認值):不會自動創建。
true:自動創建。
建議慎用該功能,確保分區值不會出現臟數據導致創建錯誤的分區表。
ignoreNullWhenUpdate
否
當
mutatetype='insertOrUpdate'
時,是否忽略更新寫入數據中的Null值。默認值:false。
jdbcWriteBatchSize
否
Hologres Sink節點數據攢批的最大批大小。
默認值:256
jdbcWriteBatchByteSize
否
Hologres Sink節點單個線程數據攢批的最大字節大小。
默認值:2097152(2 * 1024 * 1024),2MB。
jdbcWriteBatchTotalByteSize
否
Hologres Sink節點所有數據攢批的最大字節大小。
默認值:20971520(20 * 1024 * 1024),20MB。
jdbcWriteFlushInterval
否
Hologres Sink節點數據攢批的最長Flush等待時間。
默認值:10000,即10秒。
jdbcUseLegacyPutHandler
否
寫入SQL格式,取值說明如下:
true:寫入SQL格式為
insert into xxx(c0,c1,...) values (?,?,...),... on conflict;
。false(默認值):寫入SQL格式為
insert into xxx(c0,c1,...) select unnest(?),unnest(?),... on conflict
。
jdbcEnableDefaultForNotNullColumn
否
設置為true時,not null且未在表上設置default的字段傳入null時,將以默認值寫入。String類型默認為"",Number類型默認為0,Date、timestamp、timestamptz 類型默認為1970-01-01 00:00:00。
默認值:true。
remove-u0000-in-text.enabled
否
是否自動替換TEXT數據類型中的非UTF-8的u0000字符。取值說明如下:
true:替換。
false(默認值):不替換。
deduplication.enabled
否
若一批寫入的數據中有主鍵相同的數據,是否進行去重。取值說明如下:
true(默認值):進行去重,只保留后到達的這條數據。
false:不進行去重。
首先將批處理數據寫入,待寫入完成后再繼續寫入后到達的數據。
說明在不進行去重時,極端情況下(例如所有數據的主鍵都相同),寫入操作會退化為非批量寫入,對性能會產生一定影響。
aggressive.enabled
否
是否啟用激進提交模式。取值說明如下:
true:啟用。
啟用后,即使批量處理沒有達到預期的條數,只要檢測到連接空閑,系統就會強制提交數據。在流量較小時,這種方法可以有效減少數據傳輸的延時。
false(默認值):不啟用。
jdbcCopyWriteMode
否
數據寫入方式。取值說明如下:
true:使用Copy方式寫入,Copy寫入方式分為流式Copy(Fixed Copy)和批量Copy,當前默認使用流式Copy(Fixed Copy)方式寫入。
說明與使用INSERT方式寫入相比,Fixed Copy方式可以實現更高的吞吐(因為采用流模式),更低的數據延時以及更低的客戶端內存消耗(因為不需要攢批數據),但不支持數據回撤功能。
false(默認值):使用INSERT方式寫入。
說明僅Hologres V1.3.1及以上版本支持此參數。
jdbcCopyWriteFormat
否
底層是否采用二進制協議。
binary(默認值):表示使用二進制模式,二進制會更快。
text:為文本模式。
說明僅Hologres V1.3.1及以上版本支持此參數。
bulkLoad
是否使用批量Copy方式寫入。取值說明如下:
true:使用批量Copy方式寫入,僅jdbcCopyWriteMode參數也設置為true時,該參數才會生效,否則使用Fixed Copy方式寫入。
說明批量Copy相較于流式Copy(Fixed Copy),具備更高的效率,能更好地利用Hologres的資源,從而在數據寫入過程中提供更優的性能,您可以根據業務需要,選擇合適的數據寫入方式。
在對主鍵表進行批量Copy寫入時,通常會出現表鎖的情況,您可以通過配置target-shards.enabled參數為true,將寫入鎖粒度降至Shard級別,從而允許并發執行多個批量導入任務,減少了表鎖的發生。相比Fixed Copy模式,批量Copy寫入有主鍵表時,通過這種方式能夠顯著降低Hologres實例的負載,實測顯示,可以減少約66.7%的負載。
批量Copy寫入時,如果目標表包含主鍵,要求在寫入之前目標表為空表,否則寫入過程中進行主鍵去重會影響寫入性能。
false(默認值):不使用。
說明僅Hologres V1.4.0及以上版本支持此參數。
target-shards.enabled
是否啟用Target Shard批量寫入。取值說明如下:
true:啟用Target Shard批量寫入,當源數據已按Shard重新分區時,可以將寫入鎖粒度降至Shard級別。
false(默認值):不啟用。
說明僅Hologres V1.4.1及以上版本支持此參數。
點查參數
參數
是否必填
說明
jdbcReadBatchSize
否
維表點查最大批次大小。
默認值:128。
jdbcReadBatchQueueSize
否
維表點查請求緩沖隊列大小。
默認值:256。
async
否
是否采用異步方式同步數據。
默認值:false。異步模式可以并發地處理多個請求和響應,從而連續的請求之間不需要阻塞等待,提高查詢的吞吐。但在異步模式下,無法保證請求的絕對順序。
cache
否
緩存策略。
默認值:None。Hologres僅支持以下兩種緩存策略:None:無緩存。LRU:緩存維表里的部分數據。源表的每條數據都會觸發系統先在Cache中查找數據,如果未找到,則去物理維表中查找。
cachesize
否
緩存大小。
默認值:10000。選擇LRU緩存策略后,可以設置緩存大小。
cachettlms
否
更新緩存的時間間隔,單位為毫秒。
當選擇LRU緩存策略后,可以設置緩存失效的超時時間,默認不過期。
cacheempty
否
是否緩存join結果為空的數據。
默認值:true,表示緩存join結果為空的數據。false:表示不緩存join結果為空的數據。
數據類型映射
當前Flink全托管與Hologres的數據類型映射請參見Blink/Flink與Hologres的數據類型映射。