日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過實時計算Flink集成向量數據

更新時間:

云原生數據倉庫AnalyticDB PostgreSQL版支持通過flink-adbpg-connector集成向量化數據。本文以將Kafka數據導入至AnalyticDB PostgreSQL版為例,介紹如何將向量數據導入AnalyticDB PostgreSQL版。

前提條件

  • 已創建AnalyticDB PostgreSQL版實例。具體操作,請參見創建實例。

  • 已創建Flink全托管工作空間,且與AnalyticDB PostgreSQL版實例位于同一VPC下。具體操作,請參見開通Flink全托管。

  • AnalyticDB PostgreSQL版數據庫已安裝向量檢索插件FastANN。

    您可以在psql客戶端通過\dx fastann命令查看是否安裝。

    • 如果返回FastANN插件的相關信息,表示已安裝。

    • 如果沒有返回任何信息,請提交工單聯系技術支持進行安裝。

  • 已購買并部署Kafka實例,且Kafka實例與AnalyticDB PostgreSQL版實例位于同一VPC下。具體操作,請參見購買和部署實例

  • 已將Flink工作空間和Kafka實例所屬的網段加入AnalyticDB PostgreSQL版的白名單。具體操作,請參見設置白名單。

測試數據

為方便測試,AnalyticDB PostgreSQL版提供了測試數據。下載鏈接,請參見vector_sample_data.csv。

測試數據的表結構如下。

字段

類型

說明

id

bigint

編號。

market_time

timestamp

汽車上市時間。

color

varchar(10)

汽車的顏色。

price

int

汽車的價格。

feature

float4[]

汽車照片的特征向量。

操作流程

  1. 創建結構化索引和向量化索引。

  2. 將向量化測試數據寫入Kafka Topic

  3. 創建映射表并導入數據。

創建結構化索引和向量化索引

  1. 連接AnalyticDB PostgreSQL版數據庫。本文以通過psql客戶端連接數據庫為例,詳情請參見psql連接數據庫。

  2. 執行以下命令,創建測試庫并切換至測試庫。

    CREATE DATABASE adbpg_test;
    \c adbpg_test
  3. 執行以下命令,創建目標表。

    CREATE SCHEMA IF NOT EXISTS vector_test;
    CREATE TABLE IF NOT EXISTS vector_test.car_info
    (
      id bigint NOT NULL,
      market_time timestamp,
      color varchar(10),
      price int,
      feature float4[],
      PRIMARY KEY(id)
    ) DISTRIBUTED BY(id);
  4. 執行以下命令,創建結構化索引和向量化索引。

    -- 修改向量列的存儲格式為PLAIN。
    ALTER TABLE vector_test.car_info ALTER COLUMN feature SET STORAGE PLAIN;
    
    -- 創建結構化索引。
    CREATE INDEX ON vector_test.car_info(market_time);
    CREATE INDEX ON vector_test.car_info(color);
    CREATE INDEX ON vector_test.car_info(price);
    
    -- 創建向量索引。
    CREATE INDEX ON vector_test.car_info USING ann(feature) 
    WITH (dim='10', pq_enable='0');

將向量化測試數據寫入Kafka Topic

  1. 執行以下命令,創建Kafka Topic。

    bin/kafka-topics.sh --create --topic vector_ingest --partitions 1\ 
    --bootstrap-server <your_broker_list>
  2. 執行以下命令,將向量測試數據寫入Kafka Topic。

    bin/kafka-console-producer.sh\
    --bootstrap-server <your_broker_list>\
    --topic vector_ingest < ../vector_sample_data.csv

<your_broker_list>:接入點信息。您可在云消息隊列Kafka版控制臺實例詳情頁面的接入點信息區域獲取。

創建映射表并導入數據

  1. 創建Flink作業。

    1. 登錄實時計算控制臺,在Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

    2. 在左側導航欄,單擊SQL開發,單擊新建,選擇空白的流作業草稿,單擊下一步

    3. 新建作業草稿對話框,填寫作業配置信息。

      作業參數

      說明

      示例

      文件名稱

      作業的名稱。

      說明

      作業名稱在當前項目中必須保持唯一。

      adbpg-test

      存儲位置

      指定該作業的代碼文件所屬的文件夾。

      您還可以在現有文件夾右側,單擊新建文件夾圖標,新建子文件夾。

      作業草稿

      引擎版本

      當前作業使用的Flink的引擎版本。引擎版本號含義、版本對應關系和生命周期重要時間點詳情請參見引擎版本介紹。

      vvr-6.0.6-flink-1.15

  2. 執行以下命令,創建AnalyticDB PostgreSQL版映射表。

    CREATE TABLE vector_ingest (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature VARCHAR
    )WITH (
       'connector' = 'adbpg-nightly-1.13',
       'url' = 'jdbc:postgresql://<your_instance_url>:5432/adbpg_test',
       'tablename' = 'car_info',
       'username' = '<your_username>',
       'password' = '<your_password>',
       'targetschema' = 'vector_test',
       'maxretrytimes' = '2',
       'batchsize' = '3000',
       'batchwritetimeoutms' = '10000',
       'connectionmaxactive' = '20',
       'conflictmode' = 'ignore',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    參數說明,請參見寫入數據到AnalyticDB PostgreSQL版。

  3. 執行以下命令,創建Kafka映射表。

    CREATE TABLE vector_kafka (
      id INT,
      market_time TIMESTAMP,
      color VARCHAR(10),
      price int,
      feature string
    ) 
    WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = '<your_broker_list>',
        'topic' = 'vector_ingest',
        'format' = 'csv',
        'csv.field-delimiter' = '\t',
        'scan.startup.mode' = 'earliest-offset'
    );

    參數說明如下。

    參數

    是否必填

    說明

    connector

    連接器名。固定值為Kafka。

    properties.bootstrap.servers

    接入點信息。您可在云消息隊列Kafka版控制臺的實例詳情頁面的接入點信息區域獲取。

    topic

    Kafka消息所在的Topic名稱。

    format

    寫入Kafka消息Value部分時使用的格式。支持的格式:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    csv.field-delimiter

    CSV字段分隔符。

    scan.startup.mode

    Kafka讀取數據的啟動位點。取值如下:

    • earliest-offset:從Kafka最早分區開始讀取。

    • latest-offset:從Kafka最新位點開始讀取。

  4. 執行以下命令,創建導入任務。

    INSERT INTO vector_ingest SELECT * FROM vector_kafka;