Python SDK示例:Table
本文為您介紹Python SDK中表相關(guān)的典型場景操作示例。
列出所有表
通過調(diào)用入口對象的list_tables()
方法可以列出項(xiàng)目空間下的所有表。
for table in odps.list_tables():
# 處理每張表。
判斷表是否存在
通過調(diào)用入口對象的exist_table()
方法判斷表是否存在;通過調(diào)用get_table()
方法獲取表。
t = odps.get_table('table_name')
t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
t.lifecycle
-1
print(t.creation_time)
2014-05-15 14:58:43
t.is_virtual_view
False
t.size
1408
t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]
創(chuàng)建表的Schema
初始化方法有如下兩種:
通過表的列以及可選的分區(qū)進(jìn)行初始化。
from odps.models import Schema, Column, Partition columns = [ Column(name='num', type='bigint', comment='the column'), Column(name='num2', type='double', comment='the column2'), ] partitions = [Partition(name='pt', type='string', comment='the partition')] schema = Schema(columns=columns, partitions=partitions)
初始化后,您可獲取字段信息、分區(qū)信息等。
獲取所有字段信息。
print(schema.columns)
返回示例如下。
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
獲取分區(qū)字段。
print(schema.partitions)
返回示例如下。
[<partition pt, type string>]
獲取非分區(qū)字段名稱。
print(schema.names)
返回示例如下。
['num', 'num2']
獲取非分區(qū)字段類型。
print(schema.types)
返回示例如下。
[bigint, double]
使用
Schema.from_lists()
方法。該方法更容易調(diào)用,但無法直接設(shè)置列和分區(qū)的注釋。from odps.models import Schema schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string']) print(schema.columns)
返回值示例如下。
[<column num, type bigint>, <column num2, type double>, <partition pt, type string>]
創(chuàng)建表
您可以使用o.create_table()
方法創(chuàng)建表,使用方式有兩種:使用表Schema方式、使用字段名和字段類型方式。同時創(chuàng)建表時表字段的數(shù)據(jù)類型有一定的限制條件,詳情如下。
使用表Schema創(chuàng)建表
使用表Schema創(chuàng)建表時,您需要先創(chuàng)建表的Schema,然后通過Schema創(chuàng)建表。
#創(chuàng)建表的schema
from odps.models import Schema
schema = Schema.from_lists(['num', 'num2'], ['bigint', 'double'], ['pt'], ['string'])
#通過schema創(chuàng)建表
table = o.create_table('my_new_table', schema)
#只有不存在表時,才創(chuàng)建表。
table = o.create_table('my_new_table', schema, if_not_exists=True)
#設(shè)置生命周期。
table = o.create_table('my_new_table', schema, lifecycle=7)
表創(chuàng)建完成后,您可以通過print(o.exist_table('my_new_table'))
驗(yàn)證表是否創(chuàng)建成功,返回True
表示表創(chuàng)建成功。
使用字段名及字段類型創(chuàng)建表
#創(chuàng)建分區(qū)表my_new_table,可傳入(表字段列表,分區(qū)字段列表)。
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
#創(chuàng)建非分區(qū)表my_new_table02。
table = o.create_table('my_new_table02', 'num bigint, num2 double', if_not_exists=True)
表創(chuàng)建完成后,您可以通過print(o.exist_table('my_new_table'))
驗(yàn)證表是否創(chuàng)建成功,返回True
表示表創(chuàng)建成功。
使用字段名及字段類型創(chuàng)建表:新數(shù)據(jù)類型
未打開新數(shù)據(jù)類型開關(guān)時(默認(rèn)關(guān)閉),創(chuàng)建表的數(shù)據(jù)類型只允許為BIGINT、DOUBLE、DECIMAL、STRING、DATETIME、BOOLEAN、MAP和ARRAY類型。如果您需要創(chuàng)建TINYINT和STRUCT等新數(shù)據(jù)類型字段的表,可以打開options.sql.use_odps2_extension = True
開關(guān),示例如下。
from odps import options
options.sql.use_odps2_extension = True
table = o.create_table('my_new_table', 'cat smallint, content struct<title:varchar(100), body:string>')
刪除表
使用delete_table()
方法刪除已經(jīng)存在的表。
o.delete_table('my_table_name', if_exists=True) # 只有表存在時,才刪除表。
t.drop() # Table對象存在時,直接調(diào)用Drop方法刪除。
表分區(qū)
判斷是否為分區(qū)表。
table = o.get_table('my_new_table') if table.schema.partitions: print('Table %s is partitioned.' % table.name)
遍歷表全部分區(qū)。
table = o.get_table('my_new_table') for partition in table.partitions: # 遍歷所有分區(qū) print(partition.name) # 具體的遍歷步驟,這里是打印分區(qū)名 for partition in table.iterate_partitions(spec='pt=test'): # 遍歷 pt=test 分區(qū)下的二級分區(qū) print(partition.name) # 具體的遍歷步驟,這里是打印分區(qū)名 for partition in table.iterate_partitions(spec='dt>20230119'): # 遍歷 dt>20230119 分區(qū)下的二級分區(qū) print(partition.name) # 具體的遍歷步驟,這里是打印分區(qū)名
重要PyODPS自0.11.3版本開始,支持為
iterate_partitions
指定邏輯表達(dá)式,如上述示例中的dt>20230119
。判斷分區(qū)是否存在。
table = o.get_table('my_new_table') table.exist_partition('pt=test,sub=2015')
獲取分區(qū)。
table = o.get_table('my_new_table') partition = table.get_partition('pt=test') print(partition.creation_time) partition.size
創(chuàng)建分區(qū)。
t = o.get_table('my_new_table') t.create_partition('pt=test', if_not_exists=True) # 指定if_not_exists參數(shù),分區(qū)不存在時才創(chuàng)建分區(qū)。
刪除分區(qū)。
t = o.get_table('my_new_table') t.delete_partition('pt=test', if_exists=True) # 自定if_exists參數(shù),分區(qū)存在時才刪除分區(qū)。 partition.drop() # 分區(qū)對象存在時,直接對分區(qū)對象調(diào)用Drop方法刪除。
讀取表數(shù)據(jù)
有若干種方法能夠獲取表數(shù)據(jù)。
如果只是查看每個表的開始的小于1萬條數(shù)據(jù),則可以使用head方法。
from odps import ODPS t = o.get_table('dual') for record in t.head(3): # 處理每個Record對象
使用 with 表達(dá)式的寫法:
with t.open_reader(partition='pt=test') as reader: count = reader.count for record in reader[5:10]: # 可以執(zhí)行多次,直到將count數(shù)量的record讀完,這里可以改造成并行操作 # 處理一條記錄
不使用 with 表達(dá)式的寫法:
reader = t.open_reader(partition='pt=test') count = reader.count for record in reader[5:10]: # 可以執(zhí)行多次,直到將count數(shù)量的record讀完,這里可以改造成并行操作 # 處理一條記錄
直接讀取成 Pandas DataFrame:
with t.open_reader(partition='pt=test') as reader: pd_df = reader.to_pandas()
寫入表數(shù)據(jù)
類似于open_reader
,table對象同樣能執(zhí)行open_writer
來打開writer,并寫數(shù)據(jù)。
使用with寫法:
with t.open_writer(partition='pt=test') as writer: records = [[111, 'aaa', True], # 這里可以是list [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] writer.write(records) # 這里records可以是可迭代對象 records = [t.new_record([111, 'aaa', True]), # 也可以是Record對象 t.new_record([222, 'bbb', False]), t.new_record([333, 'ccc', True]), t.new_record([444, '中文', False])] writer.write(records)
如果分區(qū)不存在,可以使用 create_partition 參數(shù)指定創(chuàng)建分區(qū),如:
with t.open_writer(partition='pt=test', create_partition=True) as writer: records = [[111, 'aaa', True], # 這里可以是list [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] writer.write(records) # 這里records可以是可迭代對象
更簡單的寫數(shù)據(jù)方法是使用ODPS對象的write_table方法,例如:
records = [[111, 'aaa', True], # 這里可以是list [222, 'bbb', False], [333, 'ccc', True], [444, '中文', False]] o.write_table('test_table', records, partition='pt=test', create_partition=True)
說明每次調(diào)用
write_table
,MaxCompute 都會在服務(wù)端生成一個文件。這一操作需要較大的時間開銷, 同時過多的文件會降低后續(xù)的查詢效率。因此,建議您在使用write_table
方法時,一次性寫入多組數(shù)據(jù), 或者傳入一個generator對象。write_table
寫表時會追加到原有數(shù)據(jù)。PyODPS不提供覆蓋數(shù)據(jù)的選項(xiàng),如果需要覆蓋數(shù)據(jù),需要手動清除原有數(shù)據(jù)。對于非分區(qū)表,需要調(diào)用table.truncate()
,對于分區(qū)表,需要刪除分區(qū)后再建立。
使用Arrow格式讀寫數(shù)據(jù)
Apache Arrow是一種跨語言的通用數(shù)據(jù)讀寫格式,支持在各種不同平臺間進(jìn)行數(shù)據(jù)交換。 自2021年起, MaxCompute支持使用Arrow格式讀取表數(shù)據(jù),PyODPS則從0.11.2版本開始支持該功能。具體來說,如果在Python環(huán)境中安裝pyarrow后,在調(diào)用open_writer
時增加arrow=True
參數(shù),即可讀寫Arrow RecordBatch 。
import pandas as pd
import pyarrow as pa
with t.open_writer(partition='pt=test', create_partition=True, arrow=True) as writer:
records = [[111, 'aaa', True],
[222, 'bbb', False],
[333, 'ccc', True],
[444, '中文', False]]
df = pd.DataFrame(records, columns=["int_val", "str_val", "bool_val"])
# 寫入 RecordBatch
batch = pa.RecordBatch.from_pandas(df)
writer.write(batch)
# 也可以直接寫入 Pandas DataFrame
writer.write(df)