Flink全托管產品(Flink Serverless)是基于Apache Flink構建的全托管產品,為您提供全托管的實時計算服務。Hologres與Flink全托管高度兼容,支持Hologres源表、維表和結果表,以及Catalog等功能,滿足一站式實時數倉建設。本文為您介紹如何使用Hologres作為Flink全托管的上下游存儲。
Hologres源表、維表和結果表
在Flink全托管中支持Hologres源表、維表和結果表,詳細的使用說明請參見實時數倉Hologres。
Flink實時消費Binlog
Hologres Connector支持實時消費Binlog,從VVR 6.0.3版本開始支持JDBC模式的流式源表,相比Holohub模式,支持更多的數據類型以及支持自定義賬號,詳情請參見Flink實時消費Binlog。
Hologres Catalog
Flink全托管支持Hologres Catalog,在Flink全托管控制臺直接讀取Hologres元數據,不用再手動注冊Hologres表,可以提高作業開發的效率且能保證表結構的正確性,詳情請參見管理Hologres Catalog。
基于Hologres Catalog,目前全托管的Flink已經支持模型演進(schema evolution)以及整庫同步能力,詳情請參見CREATE TABLE AS(CTAS)語句和CREATE DATABASE AS(CDAS)語句。
Hologres DataStream Connector
如果您通過DataStream的方式讀寫Hologres數據,則需要使用Hologres DataStream Connector連接Flink全托管,詳情請參見Hologres DataStream Connector。
數據類型映射
Flink全托管與Hologres的數據類型映射,請參見數據類型匯總。
不論是將Hologres作為源表、維表還是結果表,在Flink的SQL作業中,都要將數據定義為Flink的數據類型。只有在Hologres中創建內部表時,才需要定義為Hologres的數據類型。
如果期望使用Flink將JSON數據寫入Hologres,則在Flink SQL作業中,需要將源表、結果表中的該列都定義為VARCHAR類型,僅在Hologres內部表中將其定義為JSONB類型,示例如下:
Hologres創建內部表:message定義為JSONB類型。
BEGIN ; DROP TABLE IF EXISTS holo_internal_table; CREATE TABLE IF NOT EXISTS holo_internal_table ( id BIGINT NOT NULL, message JSONB NOT NULL ); CALL set_table_property('holo_internal_table', 'distribution_key', 'id'); COMMIT ;
Flink作業:源表、結果表的message都定義為VARCHAR類型。
CREATE TEMPORARY TABLE randomSource ( id BIGINT, message VARCHAR ) WITH ('connector' = 'datagen'); CREATE TEMPORARY TABLE sink_holo ( id BIGINT, message VARCHAR ) WITH ( 'connector' = 'hologres', 'endpoint' = '', 'username' = '', 'password' = '', 'dbname' = '', 'tablename' = 'holo_internal_table' ); INSERT INTO sink_holo SELECT 1, '{"k":"v"}' FROM randomSource;
Connector Release Note
版本說明請參見Hologres Connector Release Note。
常見問題
Flink全托管使用過程中,常見問題及解決方案請參見Blink和Flink常見問題及診斷。