TableRecordDataset
本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務(wù)造成影響,請務(wù)必仔細閱讀。
您可以使用TableRecordDataset接口按照行讀取MaxComepute表數(shù)據(jù)并構(gòu)建數(shù)據(jù)流。
TensorFlow社區(qū)推薦在1.2及以上版本,使用Dataset接口代替線程和隊列構(gòu)建數(shù)據(jù)流。通過多個Dataset接口的組合變換生成計算數(shù)據(jù),可以簡化數(shù)據(jù)輸入代碼。
公共云GPU服務(wù)器即將過保下線,您可以繼續(xù)提交CPU版本的TensorFlow任務(wù)。如需使用GPU進行模型訓(xùn)練,請前往DLC提交任務(wù),具體操作請參見創(chuàng)建訓(xùn)練任務(wù)。
接口說明
PAI-TF提供的TableRecordDataset與原生TensorFlow RecordDataset相似,可以為數(shù)據(jù)變換(Transformation)的Dataset接口提供數(shù)據(jù)源。TableRecordDataset的接口定義如下。
class TableRecordDataset(Dataset):
def __init__(self,
filenames,
record_defaults,
selected_cols=None,
excluded_cols=None,
slice_id=0,
slice_count=1,
num_threads=0,
capacity=0):
參數(shù) | 描述 |
filenames | 待讀取的表名集合(列表),同一張表可以重復(fù)讀取。 |
record_defaults | 待讀取列的數(shù)據(jù)類型或列為空時的默認數(shù)據(jù)類型。如果該類型與實際讀取的列類型不符,或數(shù)據(jù)類型無法自動轉(zhuǎn)換,則執(zhí)行過程中系統(tǒng)會拋出異常。系統(tǒng)支持的數(shù)據(jù)類型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING。 |
selected_cols | 選取的列,格式為英文逗號(,)分隔的字符串。 |
excluded_cols | 排除的列,格式為英文逗號(,)分隔的字符串。不能同時使用excluded_cols和selected_cols。 |
slice_id | 當(dāng)前分區(qū)的編號。分布式讀取時,系統(tǒng)根據(jù)slice_count將表平均分為多個分區(qū),讀取slice_id對應(yīng)的分區(qū)。 |
slice_count | 分布式讀取時,總的分區(qū)數(shù)量,通常為Worker數(shù)量。 |
num_threads | 預(yù)取數(shù)據(jù)時,每個訪問表的內(nèi)置Reader啟用的線程(獨立于計算線程)數(shù)量。取值范圍為1~64。如果num_threads取值為0,則系統(tǒng)自動將新建的預(yù)取線程數(shù)配置為計算線程池線程數(shù)的1/4。 說明 因為I/O對每個模型的整體計算影響不同,所以提高預(yù)取線程數(shù),不一定可以提升整體模型的訓(xùn)練速度。 |
capacity | 讀取表的總預(yù)取量,單位為行數(shù)。如果num_threads大于1,則每個線程的預(yù)取量為capacity/num_threads行(向上取整)。如果capacity為0,則內(nèi)置Reader根據(jù)所讀表的前N行(系統(tǒng)默認N=256)平均值自動配置總預(yù)取量,使得每個線程的預(yù)取數(shù)據(jù)約占空間64 MB。 說明 如果手動配置預(yù)取量,當(dāng)單線程的預(yù)取量大于1 GB,系統(tǒng)僅輸出告警信息以提示您檢查配置,而不會中斷程序運行。 |
如果MaxCompute表字段為DOUBLE類型,則TensorFlow中需要使用np.float格式與其對應(yīng)。
返回值
TableRecordDataset返回一個新的Dataset對象,可以作為Pipeline工作流構(gòu)建的輸入。
### other definition codes was ignored here.
# Suppose an odps table named 'sample_table' was built in
# 'test' project, which includes 5 columns:
# (itemid bigint, name string, price double,
# virtual bool, tags string)
# Table name would be passed from run commands.
tables = ["odps://test/tables/sample_table"]
# Firstly, we define a new TableRecordDataset to read itemid and price.
dataset = tf.data.TableRecordDataset(tables,
record_defaults = (0, 0.0),
selected_cols = "itemid, price")
# Get a batch of 128
dataset = dataset.batch(128)
# Set epoch as 10
dataset = dataset.repeat(10)
# At last we got a batch of ids and prices.
[ids, prices] = dataset.make_one_shot_iterator().get_next()
### Then we do other graph construction and finally run the graph.
執(zhí)行Session時調(diào)用get_next()
方法,從表中讀取128行數(shù)據(jù),并根據(jù)record_defaults指定的類型將每列數(shù)據(jù)解析為對應(yīng)類型的Tensor。其中get_next()
返回的output_types需要與record_defaults的參數(shù)類型相同,output_shapes的Tensor Shape需要與record_defaults的元素數(shù)量一致。
Console參數(shù)
如果將表作為輸入,提交任務(wù)時,需要使用-Dtables配置待訪問的表名。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample;
如果讀取2張以上的表,則需要使用英文逗號(,)分隔多個表名。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample,odps://algo_platform_dev/tables/sample2
如果訪問分區(qū)表,則需要在表名后添加分區(qū)。
pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample/pt=1;
示例
以邏輯回歸(Logistic Regression)為例,介紹如何使用TableRecordDataset讀取表數(shù)據(jù)并進行模型訓(xùn)練。
數(shù)據(jù)準備。
TableRecordReader是將整行數(shù)據(jù)作為一個字符串導(dǎo)入MaxCompute表,讀取之后再進行解析。而使用TableRecordDataset時,建議MaxCompute數(shù)據(jù)表按照列存放相應(yīng)的數(shù)據(jù),Dataset接口會將表中的數(shù)據(jù)以指定類型的Tensor返回。
創(chuàng)建表。
使用MaxCompute創(chuàng)建一個包含四列數(shù)據(jù)的表。
odps@ algo_platform_dev>create table sample (col1 double, col2 double, col3 double, col4 double); Data Health Manager:Your health synthesize score is 5, so, your job priority is 7 ID = 201803050245351****6Vgsxo2 OK odps@ algo_platform_dev>read sample; +------------+------------+------------+------------+ | col1 | col2 | col3 | col4 | +------------+------------+------------+------------+ +------------+------------+------------+------------+
導(dǎo)入數(shù)據(jù)。
下載測試數(shù)據(jù),并使用MaxCompute Console Tunnel命令將其導(dǎo)入MaxCompute表。
#查看下載的測試數(shù)據(jù)。 $head -n 3 sample.csv 0,0,0.017179100152531324,1 0,1,0.823381420409002,1 0,2,1.6488850495540865,1
#將數(shù)據(jù)導(dǎo)入MaxCompute表。 odps@ algo_platform_dev>tunnel upload /tmp/data/sample.csv sample -fd=,; Upload session: 20180305135640c8cc650a0000**** Start upload:sample.csv Using \n to split records Upload in strict schema mode: true Total bytes:260093 Split input to 1 blocks 2018-03-05 13:56:40 scan block: '1' 2018-03-05 13:56:40 scan block complete, blockid=1 2018-03-05 13:56:40 upload block: '1' 2018-03-05 13:56:41 upload block complete, blockid=1 upload complete, average speed is 254 KB/s OK odps@ algo_platform_dev>read sample 3; +------------+------------+------------+------------+ | col1 | col2 | col3 | col4 | +------------+------------+------------+------------+ | 0.0 | 0.0 | 0.017179100152531324 | 1.0 | | 0.0 | 1.0 | 0.823381420409002 | 1.0 | | 0.0 | 2.0 | 1.6488850495540865 | 1.0 | +------------+------------+------------+------------+
說明因為該測試數(shù)據(jù)的每行內(nèi)容使用英文逗號(,)分隔,所以使用
-fd=,
配置分隔符為英文逗號(,)才能將每行數(shù)據(jù)分為四列導(dǎo)入至相應(yīng)的MaxCompute表。
構(gòu)建輸入數(shù)據(jù)和模型。
構(gòu)建輸入數(shù)據(jù)的示例代碼如下。除無需定義tf.train.Coordinator和運行start_queue_runners以外,其余代碼與使用TableRecordReader的代碼相同。
#define the input def input_fn(): dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128) v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next() labels = tf.reshape(tf.cast(v4, tf.int32), [128]) features = tf.stack([v1, v2, v3], axis=1) return features, labels
完整的示例代碼lr_dataset.py如下。
import tensorflow as tf tf.app.flags.DEFINE_string("tables", "", "tables info") FLAGS = tf.app.flags.FLAGS #define the input def input_fn(): dataset = tf.data.TableRecordDataset([FLAGS.tables], record_defaults=[1.0]*4).repeat().batch(128) v1, v2, v3, v4 = dataset.make_one_shot_iterator().get_next() labels = tf.reshape(tf.cast(v4, tf.int32), [128]) features = tf.stack([v1, v2, v3], axis=1) return features, labels #construct the model def model_fn(features, labels): W = tf.Variable(tf.zeros([3, 2])) b = tf.Variable(tf.zeros([2])) pred = tf.matmul(features, W) + b loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=pred,labels=labels)) # Gradient Descent optimizer = tf.train.GradientDescentOptimizer(0.05).minimize(loss) return loss, optimizer features, labels = input_fn() loss, optimizer = model_fn(features, labels) init = tf.global_variables_initializer() local_init = tf.local_variables_initializer() sess = tf.Session() sess.run(init) sess.run(local_init) for step in range(10000): _, c = sess.run([optimizer, loss]) if step % 2000 == 0: print("loss," , c)
提交任務(wù)。
odps@ algo_platform_dev>pai -name tensorflow1120_cpu_ext -Dtables=odps://algo_platform_dev/tables/sample -Dscript=file:///tmp/lr_dataset.py;
查看執(zhí)行結(jié)果。
單擊提交任務(wù)返回的Logview鏈接,查看執(zhí)行結(jié)果。
start launching tensorflow job ('loss,', 0.6931472) ('loss,', 0.007929571) ('loss,', 0.0016527221) ('loss,', 0.0023481336) ('loss,', 0.0011788738)