實時計算Flink版提供豐富強大的日志數據實時入倉能力。本文為您介紹如何在實時計算管理控制臺上快速構建一個從Kafka到Hologres的數據同步作業。
背景信息
假設消息隊列Kafka實例中有一個名稱為users的Topic,其中有100條JSON數據,代表通過日志文件采集工具或者應用寫入Kafka的日志數據,其數據分布大致如下圖所示。
此時,如果您希望創建一個數據同步的作業,將該Topic中的日志數據都同步到Hologres中,則可以按照以下步驟進行:
本文使用實時計算Flink版提供的CREATE TABLE AS(CTAS)語句,一鍵完成日志數據的同步,以及實時的表結構變更同步。
前提條件
如果您使用RAM用戶或RAM角色等身份訪問,需要確認已具有Flink控制臺相關權限,詳情請參見權限管理。
已創建Flink工作空間,詳情請參見開通實時計算Flink版。
上下游存儲
已創建消息隊列Kafka實例,詳情請參見步驟三:創建資源。
已創建Hologres實例,詳情請參見購買Hologres。
說明消息隊列Kafka和Hologres需要與實時計算Flink版工作空間在相同地域相同VPC下,否則需要打通網絡,詳情請參見如何訪問跨VPC的其他服務?或實時計算Flink版如何訪問公網?。
步驟一:配置IP白名單
為了讓Flink能訪問Kafka和Hologres實例,您需要將Flink工作空間的網段添加到在Kafka和Hologres的白名單中。
步驟二:準備Kafka測試數據
使用實時計算Flink版的模擬數據生成源表作為數據生成器,將數據寫入到Kafka中。請按以下步驟使用實時計算開發控制臺將數據寫入至消息隊列Kafka。
在Kafka控制臺創建一個名稱為users的Topic。
操作詳情請參見步驟一:創建Topic。
創建將數據寫入到Kafka的作業。
登錄實時計算管理控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊 ,單擊新建。
在新建作業草稿對話框,選擇目標模板(例如:選擇空白的流作業草稿),完成后單擊下一步,填寫作業配置信息。
作業參數
示例
說明
文件名稱
kafka-data-input
作業的名稱。
說明作業名稱在當前項目中必須保持唯一。
存儲位置
作業草稿
指定該作業的代碼文件所屬的文件夾。默認存放在作業草稿目錄。
您還可以在現有文件夾右側,單擊圖標,新建子文件夾。
引擎版本
vvr-8.0.5-flink-1.17
在引擎版本下拉列表中選擇目標引擎版本。
單擊創建。
將以下作業代碼拷貝到作業文本編輯區。
CREATE TEMPORARY TABLE source ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, event_time TIMESTAMP ) WITH ( 'connector' = 'faker', 'number-of-rows' = '100', 'rows-per-second' = '10', 'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}', 'fields.first_name.expression' = '#{name.firstName}', 'fields.last_name.expression' = '#{name.lastName}', 'fields.address.country.expression' = '#{Address.country}', 'fields.address.state.expression' = '#{Address.state}', 'fields.address.city.expression' = '#{Address.city}', 'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}' ); CREATE TEMPORARY TABLE sink ( id INT, first_name STRING, last_name STRING, `address` ROW<`country` STRING, `state` STRING, `city` STRING>, `timestamp` TIMESTAMP METADATA ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000', 'topic' = 'users', 'format' = 'json' ); INSERT INTO sink SELECT * FROM source;
請按您的實際配置,修改以下參數配置信息。
參數
示例值
說明
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000
Kafka Broker地址。
格式為host:port,host:port,host:port,以英文逗號(,)分割。
topic
users
Kafka Topic名稱。
啟動作業。
在 頁面,單擊部署。
在部署新版本對話框中,單擊確定。
配置作業資源,資源設置填寫詳情請參見配置作業資源。
在作業啟動。 頁面,單擊目標作業名稱操作列中的啟動。關于作業啟動的配置說明,請參見
您可以在作業運維頁面觀察作業的運行信息和狀態。
由于faker數據源是一個有限流,因此在作業處于運行狀態后,大約1分鐘左右后,作業就會處于完成狀態。當作業結束運行代表作業已經將相關的數據寫入到Kafka的users中。其中,寫入到消息隊列Kafka的JSON數據格式大致如下。
{ "id": 765, "first_name": "Barry", "last_name": "Pollich", "address": { "country": "United Arab Emirates", "state": "Nevada", "city": "Powlowskifurt" } }
步驟三:創建Hologres Catalog
單表同步都需要依賴目標Catalog來創建目標表。因此,您需要通過控制臺創建目標Catalog。本文將以目標Catalog為Hologres Catalog為例,為您進行介紹。
創建名稱為holo的Hologres Catalog,操作步驟詳情請參見創建Hologres Catalog。
您需要在您的目標實例中已創建flink_test_db數據庫,否則創建Catalog會報錯。
步驟四:創建并啟動數據同步作業
登錄實時計算開發控制臺,創建數據同步作業。
登錄實時計算管理控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊 ,單擊新建。
在新建作業草稿對話框,選擇目標模板(例如:選擇空白的流作業草稿),完成后單擊下一步,填寫作業配置信息。
作業參數
示例
說明
文件名稱
flink-quickstart-test
作業的名稱。
說明作業名稱在當前項目中必須保持唯一。
存儲位置
作業草稿
指定該作業的代碼文件所屬的文件夾。默認存放在作業草稿目錄。
您還可以在現有文件夾右側,單擊圖標,新建子文件夾。
引擎版本
vvr-8.0.5-flink-1.17
在引擎版本下拉列表中選擇目標引擎版本。
單擊創建。
將以下作業代碼拷貝到作業文本編輯區。
將消息隊列Kafka中名稱為users的Topic數據同步至Hologres的flink_test_db數據庫的sync_kafka_users表中。您可以通過以下任意一種方式進行:
通過CATS語句同步
該方式無需您手動在Hologres中創建該表,也無需指明對應的列類型為JSON或JSONB。
CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country'), PRIMARY KEY (`partition`, `offset`) NOT ENFORCED ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自動展開嵌套列。 'scan.startup.mode' = 'earliest-offset' ); CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users WITH ( 'connector' = 'hologres' ) AS TABLE kafka_users;
說明為了避免作業Failover后,作業重啟將重復數據寫入到Hologres中,您可以添加相關主鍵從而唯一地標識數據。當數據重發時,Hologres將會保證相同partition和offset的數據只會保留一份。
通過INSERT INTO語句同步
考慮到Hologres中對于JSON和JSONB類型的數據會進行特殊的優化,您也可以通過INSERT INTO語句將嵌套JSON寫入到Hologres中。
該方式需要您手動在Hologres中創建該表并指明需要對應的列類型為JSON或JSONB,然后通過下文的SQL,會將address數據寫入到 Hologres中類型為JSON的列。
CREATE TEMPORARY TABLE kafka_users ( `id` INT NOT NULL, `address` STRING, -- 該列對應的數據為嵌套JSON。 `offset` BIGINT NOT NULL METADATA, `partition` BIGINT NOT NULL METADATA, `timestamp` TIMESTAMP METADATA, `date` AS CAST(`timestamp` AS DATE), `country` AS JSON_VALUE(`address`, '$.country') ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000', 'topic' = 'users', 'format' = 'json', 'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自動展開嵌套列。 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE holo ( `id` INT NOT NULL, `address` STRING, `offset` BIGINT, `partition` BIGINT, `timestamp` TIMESTAMP, `date` DATE, `country` STRING ) WITH ( 'connector' = 'hologres', 'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80', 'username' = 'LTAI5tE572UJ44Xwhx6i****', 'password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****', 'dbname' = 'flink_test_db', 'tablename' = 'sync_kafka_users' ); INSERT INTO holo SELECT * FROM kafka_users;
請按您的實際配置,修改以下參數配置信息。
參數
示例值
說明
properties.bootstrap.servers
alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000
Kafka Broker地址。
格式為host:port,host:port,host:port,以英文逗號(,)分割。
topic
users
Kafka Topic名稱。
endpoint
hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80
Hologres端點。
格式為<ip>:<port>。
username
LTAI5tE572UJ44Xwhx6i****
Hologres用戶名,請填寫阿里云賬號的AccessKey ID。
password
KtyIXK3HIDKA9VzKX4tpct9xTm****
Hologres密碼,請填寫阿里云賬號的AccessKey Secret。
dbname
flink_test_db
Hologres數據庫名稱。
tablename
sync_kafka_users
Hologres表名稱。
說明如果您通過INSERT INTO方式同步數據,則需要提前在目標實例的數據庫中創建sync_kafka_users表和字段。
如果Schema不為Public時,則tablename需要填寫為schema.tableName。
單擊保存。
在 頁面,單擊部署。
在關于作業啟動的配置說明,請參見作業啟動。 頁面,單擊目標作業名稱操作列中的啟動。
單擊啟動。
作業啟動后,您可以在作業運維界面觀察作業的運行信息和狀態。
步驟五:觀察全量同步結果
在實例列表頁面,單擊目標實例名稱。
在頁面右上角,單擊登錄實例。
在元數據管理頁簽,查看users數據庫中同步的sync_kafka_users表結構和數據。
同步后的表結構和數據如下圖所示。
表結構
雙擊sync_kafka_users表名稱,查看表結構。
說明在同步過程中,建議聲明Kafka的Metadata partition和offset作為Hologres表中的主鍵。這樣可以避免由于作業Failover,數據重發導致下游存儲多份相同數據。
表數據
在sync_kafka_users表信息頁面右上角,單擊查詢表后,輸入如下命令,單擊運行。
SELECT * FROM public.sync_kafka_users order by partition, "offset";
表數據結果如下圖所示。
步驟六:觀察自動同步表結構變更
在Kafka控制臺手動發送一條包含新增列的消息。
在實例列表頁面,單擊目標實例名稱。
在Topic管理頁面,單擊目標Topic名稱users。
單擊體驗發送消息。
填寫消息內容。
配置項
示例
發送方式
選中控制臺。
消息Key
填寫為flinktest。
消息內容
將以下JSON內容復制粘貼到消息內容中。
{ "id": 100001, "first_name": "Dennise", "last_name": "Schuppe", "address": { "country": "Isle of Man", "state": "Montana", "city": "East Coleburgh" }, "house-points": { "house": "Pukwudgie", "points": 76 } }
說明該示例中house-points是一個新增的嵌套列。
發送到指定分區
選中是。
分區ID
填寫為0。
單擊確定。
在Hologres控制臺,查看sync_kafka_users表結構和數據的變化。
在實例列表頁面,單擊目標實例名稱。
在頁面右上角,單擊登錄實例。
在元數據管理頁簽,雙擊sync_kafka_users表名稱。
單擊查詢表后,輸入如下命令,單擊運行。
SELECT * FROM public.sync_kafka_users order by partition, "offset";
查看表數據結果。
表數據結果如下圖所示。
可以觀察到id為100001的數據已經成功地寫入到了Hologres中。同時,Hologres中多了house-points.house和house-points.points 兩列。
說明雖然插入到Kafka中的數據僅只有一個嵌套列house-points,但是由于在kafka_users表的WITH參數內聲明要求json.infer-schema.flatten-nested-columns.enable,那么Flink 就會自動展平新增的嵌套列,并用訪問該列的路徑作為展開后的列的名字。
(可選)步驟七:調整作業資源配置
根據數據量的不同,我們往往需要調節不同節點的并發和資源,以達到更優的作業性能。您可以使用資源配置的基礎模式簡單配置作業并發度和CU數,也可以使用資源配置的專家模式細粒度地調整節點的并發和資源。
登錄實時計算開發控制臺,進入作業詳情頁面。
登錄實時計算管理控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊 。
修改資源配置。
在 頁面,單擊目標作業名稱。
在狀態總覽頁面,查看調整效果。
相關文檔
CREATE TABLE AS (CTAS) 語法功能介紹,請參見CREATE TABLE AS(CTAS)語句。
消息隊列Kafka作為表結構變更數據源功能介紹,請參見消息隊列Kafka源表。