為了在DLC任務中方便地讀寫MaxCompute表數據,PAI團隊開發了paiio模塊。paiio支持TableRecordDataset、TableReader及TableWriter三種接口,本文詳細介紹這三種接口的使用說明及讀寫MaxCompute表數據的使用示例。
使用限制
paiio模塊支持TensorFlow 1.12、1.15和2.0版本。僅在分布式訓練(DLC)任務中選擇這些版本對應的鏡像時,才可使用paiio模塊。
paiio模塊不支持自定義鏡像。
準備工作:配置賬戶信息
使用paiio模塊讀寫MaxCompute表數據之前,需要配置MaxCompute賬戶的AccessKey信息。PAI支持從配置文件讀取配置信息,您可以將配置文件放置在掛載的文件系統中,然后在代碼中通過環境變量引用。
編寫配置文件,內容如下。
access_id=xxxx access_key=xxxx end_point=http://xxxx
參數
描述
access_id
阿里云賬號的AccessKey ID。
access_key
阿里云賬號的AccessKey Secret。
end_point
MaxCompute的Endpoint,例如華東2(上海)配置為
http://service.cn-shanghai.maxcompute.aliyun.com/api
。詳情請參見Endpoint。在代碼中指定配置文件路徑,方式如下。
os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'
其中<your MaxCompute config file path>表示配置文件的路徑。
TableRecordDataset使用說明
接口說明
TensorFlow社區推薦在1.2及以上版本中使用Dataset接口(詳情請參見Dataset)替代原有的線程和隊列接口構建數據流。通過多個Dataset接口的組合變換生成計算數據,可以簡化數據輸入部分的代碼。
接口定義(Python)
class TableRecordDataset(Dataset): def __init__(self, filenames, record_defaults, selected_cols=None, excluded_cols=None, slice_id=0, slice_count=1, num_threads=0, capacity=0):
參數
參數
是否必選
類型
默認值
描述
filenames
是
STRING
無
待讀取的表名集合(列表),多張表的Schema必須一致。表名格式為
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
。record_defaults
是
LIST或TUPLE
無
用于讀出列的數據類型轉換及列為空時的默認值。如果該值與實際讀出的列數不符,或數據類型無法自動轉換,則執行過程中系統會拋出異常。
系統支持的數據類型包括FLOAT32、FLOAT64、INT32、INT64、BOOL及STRING,INT64類型的默認值請參見
np.array(0, np.int64)
。selected_cols
否
STRING
None
選取的列,格式為半角逗號(,)分隔的字符串。默認值None表示讀取所有列。該參數與excluded_cols不能同時使用。
excluded_cols
否
STRING
None
排除的列,格式為半角逗號(,)分隔的字符串。默認值None表示讀取所有列。該參數與selected_cols不能同時使用。
slice_id
否
INT
0
在分布式讀取場景下,當前分片的編號(從0開始編號)。分布式讀取時,系統根據slice_count將表均分為多個分片,讀取slice_id對應的分片。
slice_id為默認值0時,如果slice_count取值為1,則表示讀取整張表。如果slice_count大于1,則表示讀取第0個分片。
slice_count
否
INT
1
在分布式讀取場景下,總的分片數量,通常為Worker數量。默認值1表示不分片,即讀取整張表,
num_threads
否
INT
0
預取數據時,每個訪問表的內置Reader啟用的線程(獨立于計算線程)數量。取值范圍為1~64。如果num_threads取值為0,則系統自動將新建的預取線程數配置為計算線程池線程數的1/4。
說明因為I/O對每個模型的整體計算影響不同,所以提高預取線程數,不一定可以提升整體模型的訓練速度。
capacity
否
INT
0
讀取表的總預取量,單位為行數。如果num_threads大于1,則每個線程的預取量為capacity/num_threads行(向上取整)。如果capacity為0,則內置Reader根據所讀表的前N行(系統默認N=256)平均值自動配置總預取量,使得每個線程的預取數據約占空間64 MB。
說明如果MaxCompute表字段為DOUBLE類型,則TensorFlow中需要使用np.float64格式與其對應。
返回值
返回Dataset對象,可以作為Pipeline工作流構建的輸入。
使用示例
假設在myproject項目中存儲了一張名為test的表,其部分內容如下所示。
itemid(BIGINT) | name(STRING) | price(DOUBLE) | virtual(BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
以下代碼實現了使用TableRecordDataset接口讀取test表itemid和price列的數據。
import os
import tensorflow as tf
import paiio
# 指定配置文件路徑。請替換為配置文件實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 定義要讀取的Table, 可以是多個。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
table = ["odps://${your_projectname}/tables/${table_name}"]
# 定義TableRecordDataset, 讀取表的itemid和price列。
dataset = paiio.data.TableRecordDataset(table,
record_defaults=[0, 0.0],
selected_cols="itemid,price",
num_threads=1,
capacity=10)
# 設置epoch 2, batch size 3, prefetch 100 batch。
dataset = dataset.repeat(2).batch(3).prefetch(100)
ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()
with tf.compat.v1.Session() as sess:
sess.run(tf.compat.v1.global_variables_initializer())
sess.run(tf.compat.v1.local_variables_initializer())
try:
while True:
batch_ids, batch_prices = sess.run([ids, prices])
print("batch_ids:", batch_ids)
print("batch_prices:", batch_prices)
except tf.errors.OutOfRangeError:
print("End of dataset")
TableReader使用說明
接口說明
TableReader基于MaxCompute SDK實現,不依賴TensorFlow框架,可以直接訪問MaxCompute表并即時獲取I/O結果。
創建Reader并打開表
接口定義
reader = paiio.python_io.TableReader(table, selected_cols="", excluded_cols="", slice_id=0, slice_count=1):
參數
返回值
Reader對象。
參數 | 是否必選 | 類型 | 默認值 | 描述 |
table | 是 | STRING | 無 | 需要打開的MaxCompute表名,格式為 |
selected_cols | 否 | STRING | 空字符串("") | 選取的列,格式為英文逗號(,)分隔的字符串。默認值空字符串("")表示讀取所有列。該參數與excluded_cols不能同時使用。 |
excluded_cols | 否 | STRING | 空字符串("") | 排除的列,格式為英文逗號(,)分隔的字符串。默認值空字符串("")表示讀取所有列。該參數與selected_cols不能同時使用。 |
slice_id | 否 | INT | 0 | 在分布式讀取場景下,當前分片的編號,取值范圍為[0, slice_count-1]。分布式讀取時,系統根據slice_count將表均分為多個分片,讀取slice_id對應的分片。默認值0表示不分片,即讀取表的所有行。 |
slice_count | 否 | INT | 1 | 在分布式讀取場景下,總的分片數量,通常為Worker數量。 |
讀取記錄
接口定義
reader.read(num_records=1)
參數
num_records表示順序讀取的行數。默認值為1,即讀取1行。如果num_records參數超出未讀的行數,則返回讀取到的所有行。如果未讀取到記錄,則拋出OutOfRange異常(paiio.python_io.OutOfRangeException
)。
返回值
返回一個numpy ndarray數組(或稱為recarray),數組中每個元素為表的一行數據組成的一個TUPLE。
定位到相應行
接口定義
reader.seek(offset=0)
參數
offset表示定位到的行(行從0開始編號),下一個Read操作將從定位的行開始。如果配置了slice_id和slice_count,則按分片位置進行相對行的定位。如果offset超出表的總行數,則系統拋出OutOfRange異常。如果之前的讀取位置已經超出表尾,則繼續進行seek系統會拋出OutOfRange異常(paiio.python_io.OutOfRangeException
)。
讀取一個batch_size時,如果剩余行數不足一個batch_size,則read操作會返回剩余行且不拋異常。此時,如果繼續進行seek操作,則系統會拋異常。
返回值
無返回值。如果操作出錯,則系統拋出異常。
獲取表的總記錄數
接口定義
reader.get_row_count()
參數
無
返回值
返回表的行數。如果配置了slice_id和slice_count,則返回分片大小。
獲取表的Schema
接口定義
reader.get_schema()
參數
無
返回值
返回1D-stuctured ndarray,每個元素對應reader中選定的MaxCompute表中一列的Schema,包括如下三個元素。
參數 | 描述 |
colname | 列名。 |
typestr | MaxCompute數據類型名稱。 |
pytype | typestr對應的Python數據類型。 |
typestr和pytype的對應關系如下表所示。
typestr | pytype |
BIGINT | INT |
DOUBLE | FLOAT |
BOOLEAN | BOOL |
STRING | OBJECT |
DATETIME | INT |
MAP 說明 PAI-TensorFlow不支持對MAP類型數據進行操作。 | OBJECT |
關閉表
接口定義
reader.close()
參數
無
返回值
無返回值。如果操作出錯,則系統拋出異常。
使用示例
假設在myproject項目中存儲了一張名為test的表,其內容如下所示。
uid(BIGINT) | name(STRING) | price(DOUBLE) | virtual(BOOL) |
25 | "Apple" | 5.0 | False |
38 | "Pear" | 4.5 | False |
17 | "Watermelon" | 2.2 | False |
以下代碼實現了使用TableReader讀取uid、name及price列的數據。
import os
import paiio
# 指定配置文件路徑。請替換為配置文件實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 打開一張表,返回reader對象。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
# 獲得表的總行數。
total_records_num = reader.get_row_count() # return 3
batch_size = 2
# 讀表,返回值將是一個recarray數組,形式為[(uid, name, price)*2]。
records = reader.read(batch_size) # 返回[(25, "Apple", 5.0), (38, "Pear", 4.5)]
records = reader.read(batch_size) # 返回[(17, "Watermelon", 2.2)]
# 繼續讀取將拋出OutOfRange異常。
# Close the reader.
reader.close()
TableWriter使用說明
TableWriter基于MaxCompute SDK實現,不依賴TensorFlow框架,可以直接對MaxCompute表進行寫入并返回。
接口說明
創建Writer并打開表
接口定義
writer = paiio.python_io.TableWriter(table, slice_id=0)
說明該接口不會清空原表中的數據,采用追加的方式寫入數據。
對于新寫入的數據,關閉表之后才能對其進行讀取。
參數
參數
是否必選
類型
默認值
描述
table
是
STRING
無
待打開的MaxCompute表名,格式為
odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...
slice_id
否
INT
0
在分布式場景,寫表至不同的分片,從而避免寫沖突。在單機場景,使用默認值0即可。在多機場景,如果多個Worker(包括PS)同時使用同一個slice_id寫表,則會導致寫入失敗。
返回值
返回Writer對象。
寫入記錄
接口定義
writer.write(values, indices)
參數
參數
是否必選
類型
默認值
描述
values
是
STRING
無
待寫入的數據。支持寫入單行數據或多行數據:
如果僅寫入單行數據,則向values參數傳入一個由標量組成的TUPLE、LIST或1D-ndarray。如果傳入的是LIST或ndarray,則說明寫入的各列數據類型一致。
如果寫入N行數據(N>=1),可以向values參數傳入一個LIST或1D-ndarray,參數中的每個元素都應該對應一個單行的數據(用TUPLE或LIST表示,也可以通過Structure形式存放于ndarray中)。
indices
是
INT
無
指定數據寫入的列,支持傳入由INT類型Index組成的TUPLE、LIST或1D-ndarray。indices中每個數(i)對應表中相應的第i列(列數從0開始編號)。
返回值
無返回值。如果寫過程出錯,則系統會拋出異常并退出。
關閉表
接口定義
writer.close()
說明在with語句的區塊中,無需顯示調用close()接口關閉表。
參數
無
返回值
無返回值。如果操作出錯,則系統拋出異常。
示例
通過with語句使用TableWriter,代碼如下。
with paiio.python_io.TableWriter(table) as writer: # Prepare values for writing. writer.write(values, incides) # Table would be closed automatically outside this section.
使用示例
import paiio
import os
# 指定配置文件路徑。請替換為配置文件實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 準備數據。
values = [(25, "Apple", 5.0, False),
(38, "Pear", 4.5, False),
(17, "Watermelon", 2.2, False)]
# 打開一個表,返回writer對象。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
writer = paiio.python_io.TableWriter("odps://project/tables/test")
# Write records to the 0-3 columns of the table. 將數據寫至表中的第0-3列。
records = writer.write(values, indices=[0, 1, 2, 3])
# 關閉writer。
writer.close()
后續操作
代碼配置完成后,您可以參照以下步驟使用paiio進行MaxCompute表的讀寫操作: