本篇文檔將詳細介紹自定義排序模型用到的JSON文件配置以及提供給用戶自行實現的代碼示例。
介紹
本篇文檔將詳細介紹自定義排序模型用到的JSON文件配置以及提供給用戶自行實現的代碼示例。
快速入門
用戶需要實現CustomModel繼承BaseModel:
框架會通過build函數觸發構圖,具體調用步驟如下:
def build(self):
    """Construct the model graph by running every build step in a fixed order.

    The framework calls this method; the individual steps are the hooks a
    model class provides or inherits. The order matters because later steps
    read attributes that earlier steps set (for example, in the sample
    ``CustomModel`` the ``loss_op`` step reads ``self.reg_loss`` and the
    ``training_op`` step reads ``self.loss`` and ``self.update_ops``).
    """
    # Inputs and the network itself.
    self.build_placeholder()
    self.build_model()
    self.setup_global_step()

    # Losses first, then the ops that consume them.
    self.reg_loss()
    self.loss_op()
    self.update_op()
    self.training_op()

    # Prediction outputs, monitoring and debugging hooks.
    self.predictions_op()
    self.mark_output()
    self.metrics_op()
    self.summary_op()
    self.trace_sample_op()
用戶需要實現以下幾個方法:
def build_model(self):
    """Build the network; user-implemented hook.

    The implementation must store the final logits in ``self.logits``: the
    framework applies a sigmoid to them to obtain the prediction score.
    """
    pass
def update_op(self):
    """Collect the update ops that belong to this model; user-implemented hook.

    The sample ``CustomModel`` stores them in ``self.update_ops`` and passes
    that list to the optimizer in ``training_op``.
    """
    pass
def reg_loss(self):
    """Compute the regularization loss; user-implemented hook.

    The result must be stored in ``self.reg_loss`` (the assignment replaces
    this method on the instance with the loss value).
    """
    pass
def training_op(self):
    """Create the training op; user-implemented hook.

    The sample ``CustomModel`` stores the op in ``self.train_op`` and sets it
    to ``None`` when ``self.config.predict`` is true.
    """
    pass
def loss_op(self):
    """Compute the training loss; user-implemented hook.

    The result must be stored in ``self.loss``.
    """
    pass
CustomModel代碼參考:
from collections import OrderedDict
import tensorflow as tf
from tensorflow.contrib import layers
from tensorflow.contrib.framework.python.ops import arg_scope
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope
from model_ops.tflog import tflogger as logging
import model_ops.optimizer_ops as myopt
from model.base_model import BaseModel
from model_ops import ops as base_ops
from model_ops import utils
class CustomModel(BaseModel):
    """Sample CTR ranking model built on the framework's ``BaseModel``.

    Graph layout, as built by ``build_model``:

    1. ``embedding_layer``: turns the configured embedding feature columns
       into one dense input tensor.
    2. ``dnn_layer``: a stack of fully connected layers with batch
       normalization, one per entry in ``self.dnn_hidden_units``.
    3. ``logits_layer``: a linear layer with a single output, stored in
       ``self.logits``.

    The loss is the mean sigmoid cross-entropy between ``self.logits`` and
    ``self.label`` plus the collected regularization losses.
    """

    def __init__(self, config, name="CTR"):
        """Declare feature columns and hyper-parameters.

        Args:
            config: Framework configuration object; this class reads
                ``config.ps_num`` and ``config.predict`` from it.
            name: Model name. It prefixes variable scopes and collection
                names and is used to filter update ops and regularization
                losses that belong to this model.
        """
        super(CustomModel, self).__init__(config, name)

        # Collections that gather the hidden-layer variables and outputs so
        # that summary_op can report on them.
        self.collections_dnn_hidden_layer = "{}_dnn_hidden_layer".format(self.name)
        self.collections_dnn_hidden_output = "{}_dnn_hidden_output".format(self.name)
        self.layer_dict = OrderedDict()

        self.embedding_columns = ['feature1', 'feature2']
        for feature_name in self.embedding_columns:
            self.generate_embedding_feature_column(
                feature_name,
                hash_bucket_size=1000,
                dimension=16,
                initializer=tf.zeros_initializer,
                combiner="sum",
                is_share_embedding=False,
                shared_embedding_name=None)

        # Other column types are declared the same way, for example:
        #
        #   self.real_valued_columns = ['feature3', 'feature4']
        #   for feature_name in self.real_valued_columns:
        #       self.generate_real_valued_feature_column(
        #           feature_name, dtype="Float", value_dimension=1)
        #
        #   self.sparse_id_columns = ['feature5', 'feature6']
        #   for feature_name in self.sparse_id_columns:
        #       self.generate_sparse_id_feature_column(
        #           feature_name, hash_bucket_size=1000, dimension=16,
        #           combiner="sum", is_share_embedding=False,
        #           shared_embedding_name=None)

        # Values handed to base_ops.partitioner for the embedding and DNN
        # variable scopes.
        self.embedding_partition_size = 4 * 1024 * 1024
        # Old misspelled name, kept as an alias so existing references to it
        # keep working.
        self.embedding_partitino_size = self.embedding_partition_size
        self.dnn_partition_size = 64 * 1024

        self.dnn_l2_reg = 1e-6
        self.clip_gradients = 5.0
        self.learning_rate = 0.01
        self.dnn_hidden_units = [1024, 512, 256]

    def build_placeholder(self):
        """Bind ``self.is_training`` to the graph's boolean ``training`` tensor.

        Reuses the tensor named ``training:0`` when the default graph already
        has one; otherwise creates a new boolean placeholder with that name.

        NOTE(review): the guidelines in this document advise leaving
        build_placeholder to the framework default - confirm that this
        override is still needed.
        """
        try:
            self.is_training = tf.get_default_graph().get_tensor_by_name("training:0")
        except KeyError:
            self.is_training = tf.placeholder(tf.bool, name="training")

    def setup_global_step(self):
        """Create the non-trainable int64 ``global_step`` variable.

        The variable is added to the GLOBAL_STEP and GLOBAL_VARIABLES
        collections and stored in ``self.global_step``.
        """
        self.global_step = tf.Variable(
            initial_value=0,
            name="global_step",
            trainable=False,
            dtype=tf.int64,
            collections=[tf.GraphKeys.GLOBAL_STEP, tf.GraphKeys.GLOBAL_VARIABLES])

    def embedding_layer(self):
        """Build the dense input tensor from the embedding feature columns.

        The result is stored in ``self.layer_dict['dnn']``.
        """
        with tf.variable_scope(
                name_or_scope="Embedding_Layer",
                partitioner=base_ops.partitioner(self.config.ps_num,
                                                 self.embedding_partition_size),
                reuse=tf.AUTO_REUSE) as scope:
            logging.info(
                'ps num: {}, embedding partition size: {} \n scope :{}'.format(
                    self.config.ps_num, self.embedding_partition_size, scope))
            columns = self.feature_columns_from_column_names(self.embedding_columns)
            self.layer_dict['dnn'] = layers.input_from_feature_columns(
                self.features, columns, scope=scope)

    def dnn_layer(self):
        """Build the hidden layers on top of the embedding output.

        Leaves the output of the last hidden layer in ``self.dnn_net``.
        """
        dnn_inputs = [self.layer_dict['dnn']]
        with tf.variable_scope(
                name_or_scope="{}_Score_Network".format(self.name),
                partitioner=base_ops.partitioner(self.config.ps_num,
                                                 self.dnn_partition_size)):
            self.dnn_net = tf.concat(values=dnn_inputs, axis=1)
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                for layer_id, num_hidden_units in enumerate(self.dnn_hidden_units):
                    with variable_scope.variable_scope(
                            "hiddenlayer_{}".format(layer_id)) as dnn_hidden_layer_scope:
                        # NOTE(review): weights_list is the layer *input*
                        # tensor, not a weight variable, so this L2 penalty
                        # applies to activations - confirm that is intended.
                        tf.contrib.layers.apply_regularization(
                            regularizer=tf.contrib.layers.l2_regularizer(
                                float(self.dnn_l2_reg)),
                            weights_list=[self.dnn_net])
                        self.dnn_net = layers.fully_connected(
                            self.dnn_net,
                            num_hidden_units,
                            utils.getActivationFunctionOp("llrelu"),
                            scope=dnn_hidden_layer_scope,
                            variables_collections=[self.collections_dnn_hidden_layer],
                            outputs_collections=[self.collections_dnn_hidden_output],
                            normalizer_fn=layers.batch_norm,
                            normalizer_params={"scale": True,
                                               "is_training": self.is_training})

    def logits_layer(self):
        """Project ``self.dnn_net`` to a single logit, stored in ``self.logits``."""
        with tf.variable_scope(
                name_or_scope="{}_Logits".format(self.name),
                partitioner=base_ops.partitioner(
                    self.config.ps_num,
                    self.dnn_partition_size)) as dnn_logits_scope:
            with arg_scope(base_ops.model_arg_scope(weight_decay=self.dnn_l2_reg)):
                self.logits = layers.linear(
                    self.dnn_net,
                    1,
                    scope=dnn_logits_scope,
                    variables_collections=[self.collections_dnn_hidden_layer],
                    outputs_collections=[self.collections_dnn_hidden_output])

    def build_model(self):
        """Build the network: embedding layer, hidden layers, then logits."""
        self.embedding_layer()
        self.dnn_layer()
        self.logits_layer()

    def update_op(self):
        """Store in ``self.update_ops`` the UPDATE_OPS entries of this model.

        Only ops whose name starts with ``self.name`` are kept.
        """
        self.update_ops = [
            op for op in tf.get_collection(tf.GraphKeys.UPDATE_OPS)
            if op.name.startswith(self.name)
        ]

    def reg_loss(self):
        """Sum this model's regularization losses into ``self.reg_loss``.

        Only REGULARIZATION_LOSSES entries whose name starts with
        ``self.name`` are kept; they are also stored in ``self.reg_losses``.
        """
        self.reg_losses = [
            loss for loss in tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES)
            if loss.name.startswith(self.name)
        ]
        # The model spec requires the value in ``self.reg_loss``, so this
        # assignment deliberately replaces the method on the instance.
        self.reg_loss = tf.reduce_sum(self.reg_losses)

    def training_op(self):
        """Create ``self.train_op``; it is ``None`` when ``config.predict`` is set."""
        if self.config.predict:
            self.train_op = None
            return

        with tf.variable_scope(name_or_scope="Optimize_Layer",
                               reuse=tf.AUTO_REUSE):
            gs = tf.train.get_or_create_global_step()
            logging.info("Global_step:{},{}".format(self.name, str(gs)))
            logging.info("Model_name:{},train_op_final_loss:{}".format(
                self.name, str(self.loss)))
            optimizer = tf.train.AdamAsyncOptimizer(
                learning_rate=self.learning_rate,
                beta1=0.9,
                beta2=0.999,
                epsilon=1e-8,
                use_locking=False)
            self.train_op, _, _ = myopt.optimize_loss(
                loss=self.loss,
                global_step=self.global_step,
                learning_rate=self.learning_rate,
                optimizer=optimizer,
                update_ops=self.update_ops,
                clip_gradients=self.clip_gradients,
                variables=ops.get_collection(ops.GraphKeys.TRAINABLE_VARIABLES),
                increment_global_step=True,
                summaries=myopt.OPTIMIZER_SUMMARIES)

    def loss_op(self):
        """Compute ``self.loss``: mean sigmoid cross-entropy plus ``self.reg_loss``.

        ``reg_loss()`` must have run first so that ``self.reg_loss`` already
        holds the regularization loss value.
        """
        with tf.name_scope("{}_Loss_Op".format(self.name)):
            cross_entropy = tf.nn.sigmoid_cross_entropy_with_logits(
                logits=self.logits,
                labels=self.label)
            self.loss = tf.reduce_mean(cross_entropy)
            self.loss = self.loss + self.reg_loss

    def metrics_op(self):
        """Delegate to the parent class, which sets up the metrics."""
        super(CustomModel, self).metrics_op()

    def summary_op(self):
        """Add scalar summaries for ``self.metrics`` and summaries for the layers."""
        with tf.name_scope("{}_Metrics_Scalar".format(self.name)):
            for key, metric in self.metrics.items():
                tf.summary.scalar(name=key, tensor=metric)

        with tf.name_scope("{}_Layer_Summary".format(self.name)):
            base_ops.add_norm2_summary(self.collections_dnn_hidden_layer)
            base_ops.add_dense_output_summary(self.collections_dnn_hidden_output)
            base_ops.add_weight_summary(self.collections_dnn_hidden_layer)
Feature
樣本輸入可以通過self.features獲取,key為用戶配置的特徵名。
請一律使用 contrib.layers.input_from_feature_columns 來實現 embedding 功能,不要用其他 embedding 函數,否則會出現無法線上預估問題。目前只支持 sparse_column_with_hash_bucket、embedding_column、real_valued_column、shared_embedding_columns 這幾種 column。注意同一個 embedding_column 不能用兩次,要用 shared_embedding_columns。
開發建議:
為了避免線上模型兼容問題,我們封裝了幾個column接口,建議直接使用這幾個接口:
# Generate an embedding_column for `feature_name`.
self.generate_embedding_feature_column(
feature_name,
hash_bucket_size,
dimension,
initializer=tf.zeros_initializer,
combiner="sum",
is_share_embedding=False,
shared_embedding_name=None
)
# Generate a real_valued_column for `feature_name`.
self.generate_real_valued_feature_column(
feature_name,
dtype="Float", # only "Float" and "Int" are supported
value_dimension=1
)
# Generate a sparse_column for `feature_name`.
# NOTE(review): the commented-out call in the CustomModel sample also passes
# dimension, is_share_embedding and shared_embedding_name to this helper -
# confirm the full signature against BaseModel.
self.generate_sparse_id_feature_column(
feature_name,
hash_bucket_size,
combiner="sum"
)
# Look up the configured feature columns for a list of feature names.
self.feature_columns_from_column_names(
feature_list
)
模型規範
為了適配線上Service,我們的模型需要遵守以下規範:
初始化記得調用父類:super(CustomModel, self).__init__(config,name)。
logits:需要傳給self.logits。我們最終會對logits做sigmoid操作作為最終預估分數。如果客戶需要用其他方式算分,需要重寫predictions_op方法。
loss:需要傳給self.loss。
reg_loss:需要傳給self.reg_loss。
metrics_op:要記得調用父類super(CustomModel, self).metrics_op(),我們會做一些系統通用指標監控。
以下方法不建議用戶實現:
build_placeholder、mark_output、trace_sample_op使用框架默認邏輯。
模型開發注意事項
variable 及權重
用戶如果需要使用額外的 tf.Variable 或者用 contrib 之外的一些網絡函數,注意把 variable 加到 MODEL_VARIABLES 這個 collection 裡,我們會根據這個加載權重。請注意,只有需要在線加載權重的 variable 才加入,像 global_step 這種是不需要的!
示例:
from tensorflow.python.framework import ops
from tensorflow.python.ops import variable_scope as vs
# Example of creating an extra variable outside the contrib layers.
# MODEL_VARIABLES is listed next to GLOBAL_VARIABLES because the framework
# uses the MODEL_VARIABLES collection to decide which weights to load.
self._weights = vs.get_variable(
_WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
dtype=dtype,
initializer=kernel_initializer,
collections=[ops.GraphKeys.GLOBAL_VARIABLES, ops.GraphKeys.MODEL_VARIABLES])