日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

設置Flink Connector

當內置的連接器無法滿足需求時,您可以使用自定義連接器。本文為您介紹如何上傳FeatureStore自定義連接器。

前提條件

開通實時計算Flink并購買實例。具體操作,請參見開通實時計算Flink

操作步驟

  1. 下載FeatureStore ConnectorJAR包。

  2. 進入自定義連接器注冊入口。

    1. 登錄實時計算控制臺

    2. 單擊目標工作空間操作列下的控制臺

    3. 在左側導航欄,單擊連接器

  3. 注冊自定義連接器。

    1. 連接器頁面,單擊創建自定義連接器

    2. 選擇上傳文件,單擊選擇文件,上傳FeatureStore Connector JAR包。

  4. 單擊下一步

    說明

    系統會對您上傳的自定義連接器內容進行解析。如果解析成功,您可以繼續下一步;如果解析失敗,請確認您上傳的自定義連接器代碼是否符合Flink社區標準。

  5. 單擊完成

    說明

    創建完成的自定義連接器可在連接器列表中查看。

  6. (可選)上傳成功后,如果希望其作為維表,請打開Lookup開關。

    其中,Properties詳情請參見附錄:Properties說明

    image

  7. 單擊完成

Flink SQL Demo

  1. FeatureStore創建特征視圖。具體操作,請參見配置FeatureStore項目

    假設在FeatureStore定義了如下實時特征FeatureView。

    image

  2. Flink進行數據寫入。具體操作,請參見SQL作業開發

    示例如下:

    -- 定義數據源表
    CREATE TEMPORARY TABLE server_logs ( 
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',   
      'rows-per-second' = '10'
    
    );
    -- 定義結果表
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'tablestore_p2',
      'feature_view' = 'user_fea3'
    );
         
    -- 寫入數據到結果表
    INSERT INTO featurestore_sink
    	SELECT user_id, string_field, int32_field, float_field, double_field,boolean_field
    FROM server_logs
    

    如果使用維表,使用FeatureStore的表,必須定義主鍵。示例如下:

    -- 定義數據源表
    CREATE TEMPORARY TABLE server_logs ( 
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN
    ) WITH (
      'connector' = 'datagen',   
      'rows-per-second' = '10',
      'fields.user_id.min'='1',
      'fields.user_id.max'='100',
      'fields.user_id.kind'='random'
    
    );
    -- 定義結果表
    CREATE TEMPORARY TABLE featurestore_sink (
      user_id BIGINT,
      string_field STRING, 
      int32_field INT, 
      int64_field BIGINT,
      float_field FLOAT,
      double_field DOUBLE,
      boolean_field BOOLEAN,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'featurestore',
      'username' = 'xxx',
      'password' = 'xxx',
      'region_id' = 'cn-beijing',
      'aliyun_access_id' = 'xxxx',
      'aliyun_access_key' = 'xxxx',
      'project' = 'fs_demo_featuredb',
      'feature_view' = 'user_test_1'
    );
         
    -- 寫入數據到結果表
    INSERT INTO featurestore_sink
    	SELECT S.user_id, 
        COALESCE(S.string_field, F.string_field), 
        COALESCE(S.int32_field, F.int32_field),  
        COALESCE(S.int64_field, F.int64_field),
        COALESCE(S.float_field, F.float_field),
        COALESCE(S.double_field,F.double_field),
        COALESCE(S.boolean_field, F.boolean_field) 
    FROM server_logs S LEFT JOIN featurestore_sink FOR SYSTEM_TIME AS OF PROCTIME() AS F ON  S.user_id = F.user_id

附錄:Propoerties說明

名稱

類型

是否必須

描述

region_id

string

地域標識。例如:

  • 北京:cn-beijing

  • 上海:cn-shanghai

  • 杭州:cn-hangzhou

  • 深圳:cn-shenzhen

  • 新加坡:ap-southeast-1

aliyun_access_id

string

阿里云Access Secret Id。

aliyun_access_key

string

阿里云Access Secret Key。

username

string

FeatureStoreDB數據源的用戶名。

password

string

FeatureStoreDB數據源密碼。

project

string

FeatureStore的項目名稱。

feature_view

string

FeatureStore的特征視圖名稱。

host

string

如果通過公網測試,設置的FeatureStore的公網地址,例如:

  • 北京:paifeaturestore.cn-beijing.aliyuncs.com

  • 杭州:paifeaturestore.cn-hangzhou.aliyuncs.com

  • 上海:paifeaturestore.cn-shanghai.aliyuncs.com

  • 深圳:paifeaturestore.cn-shenzhen.aliyuncs.com

  • 新加坡:paifeaturestore.ap-southeast-1.aliyuncs.com

use_public_address

boolean

否,默認為false

使用FeatureStoreDB的公網地址進行數據寫入,主要用于測試。

insert_mode

string

否,默認值為 full_row_write

數據插入更新的方式,默認是整行的數據替換。如果是部分字段更新,指定 partial_field_write