EasyRec Processor(推薦打分服務(wù))
EAS內(nèi)置的EasyRec Processor支持將EasyRec或TensorFlow訓(xùn)練的推薦模型部署為打分服務(wù),并具備集成特征工程的能力。通過聯(lián)合優(yōu)化特征工程和TensorFlow模型,EasyRec Processor能夠?qū)崿F(xiàn)高性能的打分服務(wù)。本文為您介紹如何部署及調(diào)用EasyRec模型服務(wù)。
背景信息
基于EasyRec Processor的推薦引擎的架構(gòu)圖如下所示:
其中EasyRec Processor主要包含以下模塊:
Item Feature Cache:將FeatureStore里面的特征緩存到內(nèi)存中,可以減少請求FeatureStore帶來的網(wǎng)絡(luò)開銷和壓力。此外,Item特征緩存支持增量更新,例如實時特征的更新。
Feature Generator:特征工程模塊(FG)采用相同的實現(xiàn)保證了離線和在線特征處理的一致性。 特征工程的實現(xiàn)借鑒于淘寶沉淀的特征工程方案。
TFModel:TensorFlow模型加載EasyRec導(dǎo)出的Saved_Model,并結(jié)合Blade做模型在CPU和GPU上的推理優(yōu)化。
特征埋點和模型增量更新模塊:通常應(yīng)用于實時訓(xùn)練場景,詳情請參見實時訓(xùn)練。
使用限制
僅支持使用通用型實例規(guī)格族g6、g7或g8機型(僅支持Intel系列的CPU),支持T4、A10、3090或4090等GPU型號,詳情請參見通用型(g系列)。
版本列表
EasyRec Processor仍然在迭代中,建議使用最新的版本部署推理服務(wù),新的版本將提供更多的功能和更高的推理性能,已經(jīng)發(fā)布的版本:
Processor name | 發(fā)布日期 | Tensorflow版本 | 新增功能 |
easyrec | 20230608 | 2.10 |
|
easyrec-1.2 | 20230721 | 2.10 |
|
easyrec-1.3 | 20230802 | 2.10 |
|
easyrec-1.6 | 20231006 | 2.10 |
|
easyrec-1.7 | 20231013 | 2.10 |
|
easyrec-1.8 | 20231101 | 2.10 |
|
easyrec-kv-1.8 | 20231220 | DeepRec (deeprec2310) |
|
easyrec-1.9 | 20231222 | 2.10 |
|
easyrec-2.4 | 20240826 | 2.10 |
|
步驟一:部署服務(wù)
使用eascmd客戶端部署EasyRec模型服務(wù)時,您需要指定Processor種類為easyrec-{version},關(guān)于如何使用客戶端工具部署服務(wù),詳情請參見服務(wù)部署:EASCMD或DSW。服務(wù)配置文件示例如下:
使用FG的示例
bizdate=$1
cat << EOF > echo.json
{
"name":"ali_rec_rnk_with_fg",
"metadata": {
"instance": 2,
"rpc": {
"enable_jemalloc": 1,
"max_queue_size": 100
}
},
"cloud": {
"computing": {
"instance_type": "ecs.g7.large"",
"instances": null
}
},
"model_config": {
"remote_type": "hologres",
"url": "postgresql://<AccessKeyID>:<AccessKeySecret>@<域名>:<port>/<database>",
"tables": [{"name":"<schema>.<table_name>","key":"<index_column_name>","value": "<column_name>"}],
"period": 2880,
"fg_mode": "tf",
"outputs":"probs_ctr,probs_cvr",
},
"model_path": "",
"processor": "easyrec-1.9",
"storage": [
{
"mount_path": "/home/admin/docker_ml/workspace/model/",
"oss": {
"path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final_with_fg"
}
}
]
}
EOF
# 執(zhí)行部署命令。
eascmd create echo.json
# eascmd -i <AccessKeyID> -k <AccessKeySecret> -e <endpoint> create echo.json
# 執(zhí)行更新命令
eascmd update ali_rec_rnk_with_fg -s echo.json
不使用FG的示例
bizdate=$1
cat << EOF > echo.json
{
"name":"ali_rec_rnk_no_fg",
"metadata": {
"instance": 2,
"rpc": {
"enable_jemalloc": 1,
"max_queue_size": 100
}
},
"cloud": {
"computing": {
"instance_type": "ecs.g7.large"",
"instances": null
}
},
"model_config": {
"fg_mode": "bypass"
},
"processor": "easyrec-1.9",
"processor_envs": [
{
"name": "INPUT_TILE",
"value": "2"
}
],
"storage": [
{
"mount_path": "/home/admin/docker_ml/workspace/model/",
"oss": {
"path": "oss://easyrec/ali_rec_sln_acc_rnk/20221122/export/final/"
}
}
],
"warm_up_data_path": "oss://easyrec/ali_rec_sln_acc_rnk/rnk_warm_up.bin"
}
EOF
# 執(zhí)行部署命令。
eascmd create echo.json
# eascmd -i <AccessKeyID> -k <AccessKeySecret> -e <endpoint> create echo.json
# 執(zhí)行更新命令
eascmd update ali_rec_rnk_no_fg -s echo.json
其中關(guān)鍵參數(shù)說明如下,其他參數(shù)說明,請參見服務(wù)模型所有相關(guān)參數(shù)說明。
參數(shù) | 是否必選 | 描述 | 示例 |
processor | 是 | EasyRec Processor。 |
|
fg_mode | 是 | 用于指定特征工程模式,取值如下:
|
|
outputs | 是 | tf模型預(yù)測的輸出變量名稱,如probs_ctr。如果是多個則用逗號分隔。如果不清楚輸出變量名稱,請執(zhí)行tf的命令saved_model_cli來查看。 | "outputs":"probs_ctr,probs_cvr" |
save_req | 否 | 是否將請求獲得的數(shù)據(jù)文件保存到模型目錄下,保存的文件可以用來做warmup和性能測試。取值如下:
| "save_req": "false" |
Item Feature Cache相關(guān)參數(shù) | |||
period | 是 | Item feature cache特征周期性更新的間隔,單位是分鐘。如果Item特征是天級更新的話, 一般設(shè)置的值大于一天即可(例如2880,1天1440分鐘,2880即表示兩天),一天之內(nèi)就不需要更新特征了,因為每天例行更新服務(wù)的時候同時也會更新特征。 |
|
remote_type | 是 | Item特征數(shù)據(jù)源, 目前支持:
|
|
tables | 否 | Item特征表,當(dāng)remote_type為hologres時需要配置,包含以下參數(shù):
支持從多個表中讀取輸入Item數(shù)據(jù),配置格式為:
如果多張表有重復(fù)的列,后面的表將覆蓋前面的表。 |
|
url | 否 | Hologres的訪問地址。 |
|
Processor訪問FeatureStore相關(guān)參數(shù) | |||
fs_project | 否 | FeatureStore 項目名稱,使用 FeatureStore 時需指定該字段。 FeatureStore文檔請參考:配置FeatureStore項目。 | "fs_project": "fs_demo" |
fs_model | 否 | FeatureStore模型特征名稱。 | "fs_model": "fs_rank_v1" |
fs_entity | 否 | FeatureStore實體名稱。 | "fs_entity": "item" |
region | 否 | FeatureStore 產(chǎn)品所在的地區(qū)。 | "region": "cn-beijing" |
access_key_id | 否 | FeatureStore 產(chǎn)品的 access_key_id。 | "access_key_id": "xxxxx" |
access_key_secret | 否 | FeatureStore 產(chǎn)品的 access_key_secret。 | "access_key_secret": "xxxxx" |
load_feature_from_offlinestore | 否 | 離線特征是否直接從FeatureStore OfflineStore中獲取數(shù)據(jù),取值如下:
| "load_feature_from_offlinestore": True |
input_tile: 特征自動擴展相關(guān)參數(shù) | |||
INPUT_TILE | 否 | 支持item feature自動broadcast,對于一次請求里面值都相同的feature(例如user_id),可以只傳一個值。
說明
| "processor_envs": [ { "name": "INPUT_TILE", "value": "2" } ] |
EasyRecProcessor的推理優(yōu)化參數(shù)
參數(shù) | 是否必選 | 描述 | 示例 |
TF_XLA_FLAGS | 否 | 在使用GPU前提下,使用 XLA 對模型進行編譯優(yōu)化和自動算子融合 | "processor_envs": [ { "name": "TF_XLA_FLAGS", "value": "--tf_xla_auto_jit=2" }, { "name": "XLA_FLAGS", "value": "--xla_gpu_cuda_data_dir=/usr/local/cuda/" }, { "name": "XLA_ALIGN_SIZE", "value": "64" } ] |
TF調(diào)度參數(shù) | 否 | inter_op_parallelism_threads: 控制執(zhí)行不同操作的線程數(shù) intra_op_parallelism_threads: 控制單個操作內(nèi)部使用的線程數(shù). 一般32核CPU時,使用設(shè)置為16性能較高 | "model_config": { "inter_op_parallelism_threads": 16, "intra_op_parallelism_threads": 16, } |
步驟二:調(diào)用服務(wù)
EasyRec模型服務(wù)部署完成后,在模型在線服務(wù)(EAS)頁面,單擊待調(diào)用服務(wù)服務(wù)方式列下的調(diào)用信息,查看服務(wù)的訪問地址和Token信息。
EasyRec模型服務(wù)的輸入輸出格式為Protobuf格式,根據(jù)是否包含FG,分為以下兩種調(diào)用方法:
包含FG:fg_mode=tf
使用EAS Java SDK
Maven環(huán)境配置請參考Java SDK使用說明,請求服務(wù)ali_rec_rnk_with_fg的示例代碼如下:
import com.aliyun.openservices.eas.predict.http.*;
import com.aliyun.openservices.eas.predict.request.EasyRecRequest;
PredictClient client = new PredictClient(new HttpConfig());
// 通過普通網(wǎng)關(guān)訪問時,需要使用以用戶UID開頭的Endpoint,在EAS控制臺服務(wù)的調(diào)用信息中可以獲得該信息。
client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
client.setModelName("ali_rec_rnk_with_fg");
// 替換為服務(wù)Token信息。
client.setToken("******");
EasyRecRequest easyrecRequest = new EasyRecRequest(separator);
// userFeatures: 用戶特征, 特征之間用\u0002(CTRL_B)分隔, 特征名和特征值之間用:分隔。
// user_fea0:user_fea0_val\u0002user_fea1:user_fea1_val
// 特征值的格式請參考: https://easyrec.readthedocs.io/en/latest/feature/rtp_fg.html
easyrecRequest.appendUserFeatureString(userFeatures);
// 也可以每次添加一個user特征:
// easyrecRequest.addUserFeature(String userFeaName, T userFeaValue)。
// 特征值的類型T: String, float, long, int。
// contextFeatures: context特征, 特征之間用\u0002(CTRL_B)分隔, 特征名和特征值之間用:分割, 特征值和特征值之間用:分隔。
// ctxt_fea0:ctxt_fea0_ival0:ctxt_fea0_ival1:ctxt_fea0_ival2\u0002ctxt_fea1:ctxt_fea1_ival0:ctxt_fea1_ival1:ctxt_fea1_ival2
easyrecRequest.appendContextFeatureString(contextFeatures);
// 也可以每次添加一個context特征:
// easyrecRequest.addContextFeature(String ctxtFeaName, List<Object> ctxtFeaValue)。
// ctxtFeaValue的類型: String, Float, Long, Integer。
// itemIdStr: 要預(yù)測的itemId的列表,以半角逗號(,)分割。
easyrecRequest.appendItemStr(itemIdStr, ",");
// 也可以每次添加一個itemId:
// easyrecRequest.appendItemId(String itemId)
PredictProtos.PBResponse response = client.predict(easyrecRequest);
for (Map.Entry<String, PredictProtos.Results> entry : response.getResultsMap().entrySet()) {
String key = entry.getKey();
PredictProtos.Results value = entry.getValue();
System.out.print("key: " + key);
for (int i = 0; i < value.getScoresCount(); i++) {
System.out.format("value: %.6g\n", value.getScores(i));
}
}
// 獲取FG之后的特征,以便和離線的特征對比一致性。
// 將DebugLevel設(shè)置成1,即可返回生成的特征。
easyrecRequest.setDebugLevel(1);
PredictProtos.PBResponse response = client.predict(easyrecRequest);
Map<String, String> genFeas = response.getGenerateFeaturesMap();
for(String itemId: genFeas.keySet()) {
System.out.println(itemId);
System.out.println(genFeas.get(itemId));
}
使用EAS Python SDK
環(huán)境配置請參見Python SDK使用說明。在實際應(yīng)用中建議使用Java客戶端。示例代碼:
from eas_prediction import PredictClient
from eas_prediction.easyrec_request import EasyRecRequest
from eas_prediction.easyrec_predict_pb2 import PBFeature
from eas_prediction.easyrec_predict_pb2 import PBRequest
if __name__ == '__main__':
endpoint = 'http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com'
service_name = 'ali_rec_rnk_with_fg'
token = '******'
client = PredictClient(endpoint, service_name)
client.set_token(token)
client.init()
req = PBRequest()
uid = PBFeature()
uid.string_feature = 'u0001'
req.user_features['user_id'] = uid
age = PBFeature()
age.int_feature = 12
req.user_features['age'] = age
weight = PBFeature()
weight.float_feature = 129.8
req.user_features['weight'] = weight
req.item_ids.extend(['item_0001', 'item_0002', 'item_0003'])
easyrec_req = EasyRecRequest()
easyrec_req.add_feed(req, debug_level=0)
res = client.predict(easyrec_req)
print(res)
其中:
endpoint:需要配置為以用戶UID開頭的Endpoint。在PAI EAS模型在線服務(wù)頁面,單擊待調(diào)用服務(wù)服務(wù)方式列下的調(diào)用信息,可以獲得該信息。
service_name: 服務(wù)名稱,在PAI EAS模型在線服務(wù)頁面獲取。
token:需要配置為服務(wù)Token信息。在調(diào)用信息對話框,可以獲得該信息。
不包含FG:fg_mode=bypass
使用Java SDK
Maven環(huán)境配置請參考Java SDK使用說明,請求服務(wù)ali_rec_rnk_no_fg的示例代碼如下:
import java.util.List;
import com.aliyun.openservices.eas.predict.http.PredictClient;
import com.aliyun.openservices.eas.predict.http.HttpConfig;
import com.aliyun.openservices.eas.predict.request.TFDataType;
import com.aliyun.openservices.eas.predict.request.TFRequest;
import com.aliyun.openservices.eas.predict.response.TFResponse;
public class TestEasyRec {
public static TFRequest buildPredictRequest() {
TFRequest request = new TFRequest();
request.addFeed("user_id", TFDataType.DT_STRING,
new long[]{5}, new String []{ "u0001", "u0001", "u0001"});
request.addFeed("age", TFDataType.DT_FLOAT,
new long[]{5}, new float []{ 18.0f, 18.0f, 18.0f});
// 注意: 如果設(shè)置了INPUT_TILE=2,那么上述值都相同的feature可以只傳一次:
// request.addFeed("user_id", TFDataType.DT_STRING,
// new long[]{1}, new String []{ "u0001" });
// request.addFeed("age", TFDataType.DT_FLOAT,
// new long[]{1}, new float []{ 18.0f});
request.addFeed("item_id", TFDataType.DT_STRING,
new long[]{5}, new String []{ "i0001", "i0002", "i0003"});
request.addFetch("probs");
return request;
}
public static void main(String[] args) throws Exception {
PredictClient client = new PredictClient(new HttpConfig());
// 如果要使用網(wǎng)絡(luò)直連功能,需使用setDirectEndpoint方法, 如:
// client.setDirectEndpoint("pai-eas-vpc.cn-shanghai.aliyuncs.com");
// 網(wǎng)絡(luò)直連需打通在EAS控制臺開通,提供用于訪問EAS服務(wù)的源vswitch,
// 網(wǎng)絡(luò)直連具有更好的穩(wěn)定性和性能。
client.setEndpoint("xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com");
client.setModelName("ali_rec_rnk_no_fg");
client.setToken("");
long startTime = System.currentTimeMillis();
for (int i = 0; i < 100; i++) {
try {
TFResponse response = client.predict(buildPredictRequest());
// probs為模型的輸出的字段名, 可以使用curl命令查看模型的輸入輸出:
// curl xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com -H "Authorization:{token}"
List<Float> result = response.getFloatVals("probs");
System.out.print("Predict Result: [");
for (int j = 0; j < result.size(); j++) {
System.out.print(result.get(j).floatValue());
if (j != result.size() - 1) {
System.out.print(", ");
}
}
System.out.print("]\n");
} catch (Exception e) {
e.printStackTrace();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Spend Time: " + (endTime - startTime) + "ms");
client.shutdown();
}
}
使用Python SDK
請參考Python SDK使用說明。由于python性能比較差,建議僅在調(diào)試服務(wù)時使用,在生產(chǎn)環(huán)境中應(yīng)使用Java SDK。請求服務(wù)ali_rec_rnk_no_fg的示例代碼如下:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import StringRequest
from eas_prediction import TFRequest
if __name__ == '__main__':
client = PredictClient('http://xxxxxxx.vpc.cn-hangzhou.pai-eas.aliyuncs.com', 'ali_rec_rnk_no_fg')
client.set_token('')
client.init()
req = TFRequest('server_default') # 注意請將 server_dafault 替換為真實模型的 signature_name,詳細見上文的使用說明文檔
req.add_feed('user_id', [3], TFRequest.DT_STRING, ['u0001'] * 3)
req.add_feed('age', [3], TFRequest.DT_FLOAT, [18.0] * 3)
# 注意: 開啟INPUT_TILE=2的優(yōu)化之后, 上述特征可以只傳一個值
# req.add_feed('user_id', [1], TFRequest.DT_STRING, ['u0001'])
# req.add_feed('age', [1], TFRequest.DT_FLOAT, [18.0])
req.add_feed('item_id', [3], TFRequest.DT_STRING,
['i0001', 'i0002', 'i0003'])
for x in range(0, 100):
resp = client.predict(req)
print(resp)
您也可以自行構(gòu)建服務(wù)請求,詳情請參見請求格式。
請求格式
除Python外,使用其他語言客戶端調(diào)用服務(wù)都需要根據(jù).proto文件手動生成預(yù)測的請求代碼文件。如果您希望自行構(gòu)建服務(wù)請求,則可以參考如下protobuf的定義來生成相關(guān)的代碼:
tf_predict.proto: tensorflow模型的請求定義
syntax = "proto3"; option cc_enable_arenas = true; option go_package = ".;tf"; option java_package = "com.aliyun.openservices.eas.predict.proto"; option java_outer_classname = "PredictProtos"; enum ArrayDataType { // Not a legal value for DataType. Used to indicate a DataType field // has not been set. DT_INVALID = 0; // Data types that all computation devices are expected to be // capable to support. DT_FLOAT = 1; DT_DOUBLE = 2; DT_INT32 = 3; DT_UINT8 = 4; DT_INT16 = 5; DT_INT8 = 6; DT_STRING = 7; DT_COMPLEX64 = 8; // Single-precision complex DT_INT64 = 9; DT_BOOL = 10; DT_QINT8 = 11; // Quantized int8 DT_QUINT8 = 12; // Quantized uint8 DT_QINT32 = 13; // Quantized int32 DT_BFLOAT16 = 14; // Float32 truncated to 16 bits. Only for cast ops. DT_QINT16 = 15; // Quantized int16 DT_QUINT16 = 16; // Quantized uint16 DT_UINT16 = 17; DT_COMPLEX128 = 18; // Double-precision complex DT_HALF = 19; DT_RESOURCE = 20; DT_VARIANT = 21; // Arbitrary C++ data types } // Dimensions of an array message ArrayShape { repeated int64 dim = 1 [packed = true]; } // Protocol buffer representing an array message ArrayProto { // Data Type. ArrayDataType dtype = 1; // Shape of the array. ArrayShape array_shape = 2; // DT_FLOAT. repeated float float_val = 3 [packed = true]; // DT_DOUBLE. repeated double double_val = 4 [packed = true]; // DT_INT32, DT_INT16, DT_INT8, DT_UINT8. repeated int32 int_val = 5 [packed = true]; // DT_STRING. repeated bytes string_val = 6; // DT_INT64. repeated int64 int64_val = 7 [packed = true]; // DT_BOOL. repeated bool bool_val = 8 [packed = true]; } // PredictRequest specifies which TensorFlow model to run, as well as // how inputs are mapped to tensors and how outputs are filtered before // returning to user. message PredictRequest { // A named signature to evaluate. If unspecified, the default signature // will be used string signature_name = 1; // Input tensors. // Names of input tensor are alias names. The mapping from aliases to real // input tensor names is expected to be stored as named generic signature // under the key "inputs" in the model export. // Each alias listed in a generic signature named "inputs" should be provided // exactly once in order to run the prediction. map<string, ArrayProto> inputs = 2; // Output filter. // Names specified are alias names. The mapping from aliases to real output // tensor names is expected to be stored as named generic signature under // the key "outputs" in the model export. // Only tensors specified here will be run/fetched and returned, with the // exception that when none is specified, all tensors specified in the // named signature will be run/fetched and returned. repeated string output_filter = 3; // Debug flags // 0: just return prediction results, no debug information // 100: return prediction results, and save request to model_dir // 101: save timeline to model_dir int32 debug_level = 100; } // Response for PredictRequest on successful run. message PredictResponse { // Output tensors. map<string, ArrayProto> outputs = 1; }
easyrec_predict.proto: Tensorflow模型+FG的請求定義
syntax = "proto3"; option cc_enable_arenas = true; option go_package = ".;easyrec"; option java_package = "com.aliyun.openservices.eas.predict.proto"; option java_outer_classname = "EasyRecPredictProtos"; import "tf_predict.proto"; // context features message ContextFeatures { repeated PBFeature features = 1; } message PBFeature { oneof value { int32 int_feature = 1; int64 long_feature = 2; string string_feature = 3; float float_feature = 4; } } // PBRequest specifies the request for aggregator message PBRequest { // Debug flags // 0: just return prediction results, no debug information // 3: return features generated by FG module, string format, feature values are separated by \u0002, // could be used for checking feature consistency check and generating online deep learning samples // 100: return prediction results, and save request to model_dir // 101: save timeline to model_dir // 102: for recall models such as DSSM and MIND, only only return Faiss retrieved results // but also return user embedding vectors. int32 debug_level = 1; // user features map<string, PBFeature> user_features = 2; // item ids, static(daily updated) item features // are fetched from the feature cache resides in // each processor node by item_ids repeated string item_ids = 3; // context features for each item, realtime item features // could be passed as context features. map<string, ContextFeatures> context_features = 4; // embedding retrieval neighbor number. int32 faiss_neigh_num = 5; } // return results message Results { repeated double scores = 1 [packed = true]; } enum StatusCode { OK = 0; INPUT_EMPTY = 1; EXCEPTION = 2; } // PBResponse specifies the response for aggregator message PBResponse { // results map<string, Results> results = 1; // item features map<string, string> item_features = 2; // fg generate features map<string, string> generate_features = 3; // context features map<string, ContextFeatures> context_features = 4; string error_msg = 5; StatusCode status_code = 6; // item ids repeated string item_ids = 7; repeated string outputs = 8; // all fg input features map<string, string> raw_features = 9; // output tensors map<string, ArrayProto> tf_outputs = 10; }