日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用paiio讀寫MaxCompute表數據

為了在DLC任務中方便地讀寫MaxCompute表數據,PAI團隊開發了paiio模塊。paiio支持TableRecordDataset、TableReaderTableWriter三種接口,本文詳細介紹這三種接口的使用說明及讀寫MaxCompute表數據的使用示例。

使用限制

  • paiio模塊支持TensorFlow 1.12、1.152.0版本。僅在分布式訓練(DLC)任務中選擇這些版本對應的鏡像時,才可使用paiio模塊。

  • paiio模塊不支持自定義鏡像。

準備工作:配置賬戶信息

使用paiio模塊讀寫MaxCompute表數據之前,需要配置MaxCompute賬戶的AccessKey信息。PAI支持從配置文件讀取配置信息,您可以將配置文件放置在掛載的文件系統中,然后在代碼中通過環境變量引用。

  1. 編寫配置文件,內容如下。

    access_id=xxxx
    access_key=xxxx
    end_point=http://xxxx

    參數

    描述

    access_id

    阿里云賬號的AccessKey ID。

    access_key

    阿里云賬號的AccessKey Secret。

    end_point

    MaxComputeEndpoint,例如華東2(上海)配置為http://service.cn-shanghai.maxcompute.aliyun.com/api。詳情請參見Endpoint。

  2. 在代碼中指定配置文件路徑,方式如下。

    os.environ['ODPS_CONFIG_FILE_PATH'] = '<your MaxCompute config file path>'

    其中<your MaxCompute config file path>表示配置文件的路徑。

TableRecordDataset使用說明

接口說明

TensorFlow社區推薦在1.2及以上版本中使用Dataset接口(詳情請參見Dataset)替代原有的線程和隊列接口構建數據流。通過多個Dataset接口的組合變換生成計算數據,可以簡化數據輸入部分的代碼。

  • 接口定義(Python)

    class TableRecordDataset(Dataset):
      def __init__(self,
                   filenames,
                   record_defaults,
                   selected_cols=None,
                   excluded_cols=None,
                   slice_id=0,
                   slice_count=1,
                   num_threads=0,
                   capacity=0):
  • 參數

    參數

    是否必選

    類型

    默認值

    描述

    filenames

    STRING

    待讀取的表名集合(列表),多張表的Schema必須一致。表名格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

    record_defaults

    LISTTUPLE

    用于讀出列的數據類型轉換及列為空時的默認值。如果該值與實際讀出的列數不符,或數據類型無法自動轉換,則執行過程中系統會拋出異常。

    系統支持的數據類型包括FLOAT32、FLOAT64、INT32、INT64、BOOLSTRING,INT64類型的默認值請參見np.array(0, np.int64)

    selected_cols

    STRING

    None

    選取的列,格式為半角逗號(,)分隔的字符串。默認值None表示讀取所有列。該參數與excluded_cols不能同時使用。

    excluded_cols

    STRING

    None

    排除的列,格式為半角逗號(,)分隔的字符串。默認值None表示讀取所有列。該參數與selected_cols不能同時使用。

    slice_id

    INT

    0

    在分布式讀取場景下,當前分片的編號(從0開始編號)。分布式讀取時,系統根據slice_count將表均分為多個分片,讀取slice_id對應的分片。

    slice_id為默認值0時,如果slice_count取值為1,則表示讀取整張表。如果slice_count大于1,則表示讀取第0個分片。

    slice_count

    INT

    1

    在分布式讀取場景下,總的分片數量,通常為Worker數量。默認值1表示不分片,即讀取整張表,

    num_threads

    INT

    0

    預取數據時,每個訪問表的內置Reader啟用的線程(獨立于計算線程)數量。取值范圍為1~64。如果num_threads取值為0,則系統自動將新建的預取線程數配置為計算線程池線程數的1/4。

    說明

    因為I/O對每個模型的整體計算影響不同,所以提高預取線程數,不一定可以提升整體模型的訓練速度。

    capacity

    INT

    0

    讀取表的總預取量,單位為行數。如果num_threads大于1,則每個線程的預取量為capacity/num_threads行(向上取整)。如果capacity0,則內置Reader根據所讀表的前N行(系統默認N=256)平均值自動配置總預取量,使得每個線程的預取數據約占空間64 MB。

    說明

    如果MaxCompute表字段為DOUBLE類型,則TensorFlow中需要使用np.float64格式與其對應。

  • 返回值

    返回Dataset對象,可以作為Pipeline工作流構建的輸入。

使用示例

假設在myproject項目中存儲了一張名為test的表,其部分內容如下所示。

itemid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代碼實現了使用TableRecordDataset接口讀取testitemidprice列的數據。

import os
import tensorflow as tf
import paiio

# 指定配置文件路徑。請替換為配置文件實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 定義要讀取的Table, 可以是多個。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
table = ["odps://${your_projectname}/tables/${table_name}"]
# 定義TableRecordDataset, 讀取表的itemid和price列。
dataset = paiio.data.TableRecordDataset(table,
                                       record_defaults=[0, 0.0],
                                       selected_cols="itemid,price",
                                       num_threads=1,
                                       capacity=10)
# 設置epoch 2, batch size 3, prefetch 100 batch。
dataset = dataset.repeat(2).batch(3).prefetch(100)

ids, prices = tf.compat.v1.data.make_one_shot_iterator(dataset).get_next()

with tf.compat.v1.Session() as sess:
    sess.run(tf.compat.v1.global_variables_initializer())
    sess.run(tf.compat.v1.local_variables_initializer())
    try:
        while True:
            batch_ids, batch_prices = sess.run([ids, prices])
            print("batch_ids:", batch_ids)
            print("batch_prices:", batch_prices)
    except tf.errors.OutOfRangeError:
        print("End of dataset")

TableReader使用說明

接口說明

TableReader基于MaxCompute SDK實現,不依賴TensorFlow框架,可以直接訪問MaxCompute表并即時獲取I/O結果。

  • 創建Reader并打開表

    • 接口定義

    • reader = paiio.python_io.TableReader(table,
                           selected_cols="",
                          excluded_cols="",
                           slice_id=0,
                          slice_count=1):
    • 參數

    • 參數

      是否必選

      類型

      默認值

      描述

      table

      STRING

      需要打開的MaxCompute表名,格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      selected_cols

      STRING

      空字符串("")

      選取的列,格式為英文逗號(,)分隔的字符串。默認值空字符串("")表示讀取所有列。該參數與excluded_cols不能同時使用。

      excluded_cols

      STRING

      空字符串("")

      排除的列,格式為英文逗號(,)分隔的字符串。默認值空字符串("")表示讀取所有列。該參數與selected_cols不能同時使用。

      slice_id

      INT

      0

      在分布式讀取場景下,當前分片的編號,取值范圍為[0, slice_count-1]。分布式讀取時,系統根據slice_count將表均分為多個分片,讀取slice_id對應的分片。默認值0表示不分片,即讀取表的所有行。

      slice_count

      INT

      1

      在分布式讀取場景下,總的分片數量,通常為Worker數量。

    • 返回值

      Reader對象。

  • 讀取記錄

    • 接口定義

    • reader.read(num_records=1)
    • 參數

      num_records表示順序讀取的行數。默認值為1,即讀取1行。如果num_records參數超出未讀的行數,則返回讀取到的所有行。如果未讀取到記錄,則拋出OutOfRange異常(paiio.python_io.OutOfRangeException)。

    • 返回值

      返回一個numpy ndarray數組(或稱為recarray),數組中每個元素為表的一行數據組成的一個TUPLE。

  • 定位到相應行

    • 接口定義

    • reader.seek(offset=0)
    • 參數

    • offset表示定位到的行(行從0開始編號),下一個Read操作將從定位的行開始。如果配置了slice_idslice_count,則按分片位置進行相對行的定位。如果offset超出表的總行數,則系統拋出OutOfRange異常。如果之前的讀取位置已經超出表尾,則繼續進行seek系統會拋出OutOfRange異常(paiio.python_io.OutOfRangeException)。

      重要

      讀取一個batch_size時,如果剩余行數不足一個batch_size,則read操作會返回剩余行且不拋異常。此時,如果繼續進行seek操作,則系統會拋異常。

    • 返回值

      無返回值。如果操作出錯,則系統拋出異常。

  • 獲取表的總記錄數

    • 接口定義

    • reader.get_row_count()
    • 參數

    • 返回值

      返回表的行數。如果配置了slice_idslice_count,則返回分片大小。

  • 獲取表的Schema

    • 接口定義

    • reader.get_schema()
    • 參數

    • 返回值

    • 返回1D-stuctured ndarray,每個元素對應reader中選定的MaxCompute表中一列的Schema,包括如下三個元素。

      參數

      描述

      colname

      列名。

      typestr

      MaxCompute數據類型名稱。

      pytype

      typestr對應的Python數據類型。

      typestrpytype的對應關系如下表所示。

      typestr

      pytype

      BIGINT

      INT

      DOUBLE

      FLOAT

      BOOLEAN

      BOOL

      STRING

      OBJECT

      DATETIME

      INT

      MAP

      說明

      PAI-TensorFlow不支持對MAP類型數據進行操作。

      OBJECT

  • 關閉表

    • 接口定義

    • reader.close()
    • 參數

    • 返回值

      無返回值。如果操作出錯,則系統拋出異常。

使用示例

假設在myproject項目中存儲了一張名為test的表,其內容如下所示。

uid(BIGINT)

name(STRING)

price(DOUBLE)

virtual(BOOL)

25

"Apple"

5.0

False

38

"Pear"

4.5

False

17

"Watermelon"

2.2

False

以下代碼實現了使用TableReader讀取uidnameprice列的數據。

    import os
    import paiio
    
    # 指定配置文件路徑。請替換為配置文件實際存放的路徑。
    os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
    # 打開一張表,返回reader對象。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
    reader = paiio.python_io.TableReader("odps://myproject/tables/test", selected_cols="uid,name,price")
    
    # 獲得表的總行數。
    total_records_num = reader.get_row_count() # return 3
    
    batch_size = 2
    # 讀表,返回值將是一個recarray數組,形式為[(uid, name, price)*2]。
    records = reader.read(batch_size) # 返回[(25, "Apple", 5.0), (38, "Pear", 4.5)]
    records = reader.read(batch_size) # 返回[(17, "Watermelon", 2.2)]
    # 繼續讀取將拋出OutOfRange異常。
    
    # Close the reader.
    reader.close()

TableWriter使用說明

TableWriter基于MaxCompute SDK實現,不依賴TensorFlow框架,可以直接對MaxCompute表進行寫入并返回。

接口說明

  • 創建Writer并打開表

    • 接口定義

      writer = paiio.python_io.TableWriter(table, slice_id=0)
      說明
      • 該接口不會清空原表中的數據,采用追加的方式寫入數據。

      • 對于新寫入的數據,關閉表之后才能對其進行讀取。

    • 參數

      參數

      是否必選

      類型

      默認值

      描述

      table

      STRING

      待打開的MaxCompute表名,格式為odps://${your_projectname}/tables/${table_name}/${pt_1}/${pt_2}/...

      slice_id

      INT

      0

      在分布式場景,寫表至不同的分片,從而避免寫沖突。在單機場景,使用默認值0即可。在多機場景,如果多個Worker(包括PS)同時使用同一個slice_id寫表,則會導致寫入失敗。

    • 返回值

      返回Writer對象。

  • 寫入記錄

    • 接口定義

      writer.write(values, indices)
    • 參數

      參數

      是否必選

      類型

      默認值

      描述

      values

      STRING

      待寫入的數據。支持寫入單行數據或多行數據:

      • 如果僅寫入單行數據,則向values參數傳入一個由標量組成的TUPLE、LIST1D-ndarray。如果傳入的是LISTndarray,則說明寫入的各列數據類型一致。

      • 如果寫入N行數據(N>=1),可以向values參數傳入一個LIST1D-ndarray,參數中的每個元素都應該對應一個單行的數據(用TUPLELIST表示,也可以通過Structure形式存放于ndarray中)。

      indices

      INT

      指定數據寫入的列,支持傳入由INT類型Index組成的TUPLE、LIST1D-ndarray。indices中每個數(i)對應表中相應的第i列(列數從0開始編號)。

    • 返回值

      無返回值。如果寫過程出錯,則系統會拋出異常并退出。

  • 關閉表

    • 接口定義

      writer.close()
      說明

      with語句的區塊中,無需顯示調用close()接口關閉表。

    • 參數

    • 返回值

      無返回值。如果操作出錯,則系統拋出異常。

    • 示例

      通過with語句使用TableWriter,代碼如下。

      with paiio.python_io.TableWriter(table) as writer:
        # Prepare values for writing.
          writer.write(values, incides)
          # Table would be closed automatically outside this section.

使用示例

import paiio
import os

# 指定配置文件路徑。請替換為配置文件實際存放的路徑。
os.environ['ODPS_CONFIG_FILE_PATH'] = "/mnt/data/odps_config.ini"
# 準備數據。
values = [(25, "Apple", 5.0, False),
          (38, "Pear", 4.5, False),
          (17, "Watermelon", 2.2, False)]

# 打開一個表,返回writer對象。請替換為需要訪問的表名稱和相應的MaxCompute項目名稱。
writer = paiio.python_io.TableWriter("odps://project/tables/test")

# Write records to the 0-3 columns of the table. 將數據寫至表中的第0-3列。
records = writer.write(values, indices=[0, 1, 2, 3])

# 關閉writer。
writer.close()

后續操作

代碼配置完成后,您可以參照以下步驟使用paiio進行MaxCompute表的讀寫操作:

  1. 創建數據集,上傳您已準備的配置和代碼文件至數據源。如何創建數據集,請參見創建及管理數據集。

  2. 創建分布式訓練(DLC)任務,其中關鍵參數配置說明如下,其他參數配置說明,請參見創建訓練任務。

    • 節點鏡像PAI官方鏡像選擇TensorFlow 1.12、TensorFlow 1.15TensorFlow 2.0版本對應的鏡像。

    • 數據集配置數據集選擇步驟1創建的數據集;掛載路徑配置為/mnt/data/。

    • 執行命令:配置為python /mnt/data/xxx.py。其中xxx.py即為步驟1上傳的代碼文件。

  3. 單擊確定

    成功提交訓練任務后,您可以在實例日志中查看運行結果,詳情請參見查看任務日志。