通過實時計算Flink集成向量數據
云原生數據倉庫AnalyticDB PostgreSQL版支持通過flink-adbpg-connector集成向量化數據。本文以將Kafka數據導入至AnalyticDB PostgreSQL版為例,介紹如何將向量數據導入AnalyticDB PostgreSQL版。
前提條件
已創建AnalyticDB PostgreSQL版實例。具體操作,請參見創建實例。
已創建Flink全托管工作空間,且與AnalyticDB PostgreSQL版實例位于同一VPC下。具體操作,請參見開通Flink全托管。
若實時計算Flink端使用開源自建版,請確保flink-adbpg-connector已安裝在
$FLINK_HOME/lib
目錄下。若使用公有云托管版,則無任何操作。
AnalyticDB PostgreSQL版數據庫已安裝向量檢索插件FastANN。
您可以在psql客戶端通過
\dx fastann
命令查看是否安裝。如果返回FastANN插件的相關信息,表示已安裝。
如果沒有返回任何信息,請提交工單聯系技術支持進行安裝。
已購買并部署Kafka實例,且Kafka實例與AnalyticDB PostgreSQL版實例位于同一VPC下。具體操作,請參見購買和部署實例。
已將Flink工作空間和Kafka實例所屬的網段加入AnalyticDB PostgreSQL版的白名單。具體操作,請參見設置白名單。
測試數據
為方便測試,AnalyticDB PostgreSQL版提供了測試數據。下載鏈接,請參見vector_sample_data.csv。
測試數據的表結構如下。
字段 | 類型 | 說明 |
id | bigint | 編號。 |
market_time | timestamp | 汽車上市時間。 |
color | varchar(10) | 汽車的顏色。 |
price | int | 汽車的價格。 |
feature | float4[] | 汽車照片的特征向量。 |
操作流程
創建結構化索引和向量化索引
連接AnalyticDB PostgreSQL版數據庫。本文以通過psql客戶端連接數據庫為例,詳情請參見psql連接數據庫。
執行以下命令,創建測試庫并切換至測試庫。
CREATE DATABASE adbpg_test; \c adbpg_test
執行以下命令,創建目標表。
CREATE SCHEMA IF NOT EXISTS vector_test; CREATE TABLE IF NOT EXISTS vector_test.car_info ( id bigint NOT NULL, market_time timestamp, color varchar(10), price int, feature float4[], PRIMARY KEY(id) ) DISTRIBUTED BY(id);
執行以下命令,創建結構化索引和向量化索引。
-- 修改向量列的存儲格式為PLAIN。 ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN; -- 創建結構化索引。 CREATE INDEX ON vector_test.car_info(market_time); CREATE INDEX ON vector_test.car_info(color); CREATE INDEX ON vector_test.car_info(price); -- 創建向量索引。 CREATE INDEX ON vector_test.car_info USING ann(feature) WITH (dim='10', pq_enable='0');
將向量化測試數據寫入Kafka Topic
執行以下命令,創建Kafka Topic。
bin/kafka-topics.sh --create --topic vector_ingest --partitions 1\ --bootstrap-server <your_broker_list>
執行以下命令,將向量測試數據寫入Kafka Topic。
bin/kafka-console-producer.sh\ --bootstrap-server <your_broker_list>\ --topic vector_ingest < ../vector_sample_data.csv
<your_broker_list>
:接入點信息。您可在云消息隊列Kafka版控制臺的實例詳情頁面的接入點信息區域獲取。
創建映射表并導入數據
創建Flink作業。
執行以下命令,創建AnalyticDB PostgreSQL版映射表。
CREATE TABLE vector_ingest ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature VARCHAR )WITH ( 'connector' = 'adbpg-nightly-1.13', 'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test', 'tablename' = 'car_info', 'username' = '<your_username>', 'password' = '<your_password>', 'targetschema' = 'vector_test', 'maxretrytimes' = '2', 'batchsize' = '3000', 'batchwritetimeoutms' = '10000', 'connectionmaxactive' = '20', 'conflictmode' = 'ignore', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );
參數說明,請參見寫入數據到AnalyticDB PostgreSQL版。
執行以下命令,創建Kafka映射表。
CREATE TABLE vector_kafka ( id INT, market_time TIMESTAMP, color VARCHAR(10), price int, feature string ) WITH ( 'connector' = 'kafka', 'properties.bootstrap.servers' = '<your_broker_list>', 'topic' = 'vector_ingest', 'format' = 'csv', 'csv.field-delimiter' = '\t', 'scan.startup.mode' = 'earliest-offset' );
參數說明如下。
參數
是否必填
說明
connector
是
連接器名。固定值為Kafka。
properties.bootstrap.servers
是
接入點信息。您可在云消息隊列Kafka版控制臺的實例詳情頁面的接入點信息區域獲取。
topic
是
Kafka消息所在的Topic名稱。
format
是
寫入Kafka消息Value部分時使用的格式。支持的格式:
csv
json
avro
debezium-json
canal-json
maxwell-json
avro-confluent
raw
csv.field-delimiter
是
CSV字段分隔符。
scan.startup.mode
是
Kafka讀取數據的啟動位點。取值如下:
earliest-offset:從Kafka最早分區開始讀取。
latest-offset:從Kafka最新位點開始讀取。
執行以下命令,創建導入任務。
INSERT INTO vector_ingest SELECT * FROM vector_kafka;