當內置的連接器無法滿足需求時,您可以使用自定義連接器。本文為您介紹如何上傳FeatureStore自定義連接器。
前提條件
開通實時計算Flink并購買實例。具體操作,請參見開通實時計算Flink版。
操作步驟
下載FeatureStore ConnectorJAR包。
進入自定義連接器注冊入口。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊連接器。
注冊自定義連接器。
在連接器頁面,單擊創建自定義連接器。
選擇上傳文件,單擊選擇文件,上傳FeatureStore Connector JAR包。
單擊下一步。
說明系統會對您上傳的自定義連接器內容進行解析。如果解析成功,您可以繼續下一步;如果解析失敗,請確認您上傳的自定義連接器代碼是否符合Flink社區標準。
單擊完成。
說明創建完成的自定義連接器可在連接器列表中查看。
(可選)上傳成功后,如果希望其作為維表,請打開Lookup開關。
其中,Properties詳情請參見附錄:Properties說明。
單擊完成。
Flink SQL Demo
在FeatureStore創建特征視圖。具體操作,請參見配置FeatureStore項目。
假設在FeatureStore定義了如下實時特征FeatureView。
在Flink進行數據寫入。具體操作,請參見SQL作業開發。
示例如下:
-- 定義數據源表 CREATE TEMPORARY TABLE server_logs ( user_id BIGINT, string_field STRING, int32_field INT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10' ); -- 定義結果表 CREATE TEMPORARY TABLE featurestore_sink ( user_id BIGINT, string_field STRING, int32_field INT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'featurestore', 'username' = 'xxxx', 'password' = 'xxx', 'region_id' = 'cn-beijing', 'aliyun_access_id' = 'xxxx', 'aliyun_access_key' = 'xxxx', 'project' = 'tablestore_p2', 'feature_view' = 'user_fea3' ); -- 寫入數據到結果表 INSERT INTO featurestore_sink SELECT user_id, string_field, int32_field, float_field, double_field,boolean_field FROM server_logs
如果使用維表,使用FeatureStore的表,必須定義主鍵。示例如下:
-- 定義數據源表 CREATE TEMPORARY TABLE server_logs ( user_id BIGINT, string_field STRING, int32_field INT, int64_field BIGINT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.user_id.min'='1', 'fields.user_id.max'='100', 'fields.user_id.kind'='random' ); -- 定義結果表 CREATE TEMPORARY TABLE featurestore_sink ( user_id BIGINT, string_field STRING, int32_field INT, int64_field BIGINT, float_field FLOAT, double_field DOUBLE, boolean_field BOOLEAN, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'featurestore', 'username' = 'xxx', 'password' = 'xxx', 'region_id' = 'cn-beijing', 'aliyun_access_id' = 'xxxx', 'aliyun_access_key' = 'xxxx', 'project' = 'fs_demo_featuredb', 'feature_view' = 'user_test_1' ); -- 寫入數據到結果表 INSERT INTO featurestore_sink SELECT S.user_id, COALESCE(S.string_field, F.string_field), COALESCE(S.int32_field, F.int32_field), COALESCE(S.int64_field, F.int64_field), COALESCE(S.float_field, F.float_field), COALESCE(S.double_field,F.double_field), COALESCE(S.boolean_field, F.boolean_field) FROM server_logs S LEFT JOIN featurestore_sink FOR SYSTEM_TIME AS OF PROCTIME() AS F ON S.user_id = F.user_id
附錄:Propoerties說明
名稱 | 類型 | 是否必須 | 描述 |
region_id | string | 是 | 地域標識。例如:
|
aliyun_access_id | string | 是 | 阿里云Access Secret Id。 |
aliyun_access_key | string | 是 | 阿里云Access Secret Key。 |
username | string | 是 | FeatureStoreDB數據源的用戶名。 |
password | string | 是 | FeatureStoreDB數據源密碼。 |
project | string | 是 | FeatureStore的項目名稱。 |
feature_view | string | 是 | FeatureStore的特征視圖名稱。 |
host | string | 否 | 如果通過公網測試,設置的FeatureStore的公網地址,例如:
|
use_public_address | boolean | 否,默認為false | 使用FeatureStoreDB的公網地址進行數據寫入,主要用于測試。 |
insert_mode | string | 否,默認值為 full_row_write | 數據插入更新的方式,默認是整行的數據替換。如果是部分字段更新,指定 partial_field_write |