Python SDK示例:SQL
本文為您介紹Python SDK中執(zhí)行SQL命令相關(guān)的典型場景操作示例。
注意事項
PyODPS支持MaxCompute SQL查詢,并可以讀取執(zhí)行的結(jié)果,使用時有以下注意事項。
入口對象的
execute_sql('statement')
和run_sql('statement')
方法可以執(zhí)行SQL語句,返回值是運(yùn)行實例,詳情請參見任務(wù)實例。目前暫不支持使用Arrow格式讀取Instance結(jié)果。
并非所有可以執(zhí)行的MaxCompute命令都是PyODPS可以接受的SQL語句。在調(diào)用非DDL、DML語句時,請使用其他方法,例如:
GRANT、REVOKE等語句請使用run_security_query方法。
PAI命令請使用run_xflow或execute_xflow方法。
調(diào)用SQL引擎執(zhí)行SQL,會按照SQL作業(yè)進(jìn)行計費(fèi),計費(fèi)詳情請參見計費(fèi)項與計費(fèi)方式概述。
執(zhí)行SQL
import os
from odps import ODPS
# 確保 ALIBABA_CLOUD_ACCESS_KEY_ID 環(huán)境變量設(shè)置為用戶 Access Key ID,
# ALIBABA_CLOUD_ACCESS_KEY_SECRET 環(huán)境變量設(shè)置為用戶 Access Key Secret,
# 不建議直接使用 Access Key ID / Access Key Secret 字符串
o = ODPS(
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_ID'),
os.getenv('ALIBABA_CLOUD_ACCESS_KEY_SECRET'),
project='your-default-project',
endpoint='your-end-point',
)
o.execute_sql('select * from table_name') # 同步的方式執(zhí)行,會阻塞直到SQL執(zhí)行完成。
instance = o.run_sql('select * from table_name') # 以異步的方式提交
print(instance.get_logview_address()) # 獲取logview地址
instance.wait_for_success() # 阻塞直到完成
設(shè)置運(yùn)行參數(shù)
在運(yùn)行時如果需要設(shè)置參數(shù),您可以通過設(shè)置hints參數(shù)來實現(xiàn),參數(shù)的類型是dict。
o.execute_sql('select * from pyodps_iris', hints={'odps.sql.mapper.split.size': 16})
您可以對于全局配置設(shè)置sql.settings,后續(xù)每次運(yùn)行時則都會添加相關(guān)的運(yùn)行時參數(shù)。
from odps import options
options.sql.settings = {'odps.sql.mapper.split.size': 16}
o.execute_sql('select * from pyodps_iris') # 會根據(jù)全局配置添加hints
讀取SQL執(zhí)行結(jié)果
運(yùn)行SQL的Instance能夠直接執(zhí)行open_reader
操作讀取SQL執(zhí)行結(jié)果。讀取時會出現(xiàn)以下兩種情況:
SQL返回了結(jié)構(gòu)化的數(shù)據(jù)。
with o.execute_sql('select * from table_name').open_reader() as reader: for record in reader: print(record) # 處理每一個record
SQL可能執(zhí)行了
desc
命令,這時可以通過reader.raw
取到原始的SQL執(zhí)行結(jié)果。with o.execute_sql('desc table_name').open_reader() as reader: print(reader.raw)
設(shè)置使用哪種結(jié)果接口
如果您設(shè)置了options.tunnel.use_instance_tunnel == True
,在后續(xù)調(diào)用open_reader
時,PyODPS會默認(rèn)調(diào)用Instance Tunnel, 否則會調(diào)用舊的Result接口。如果您使用了版本較低的 MaxCompute服務(wù),或者調(diào)用Instance Tunnel出現(xiàn)了問題,PyODPS會給出告警并自動降級到舊的Result接口,您可根據(jù)告警信息判斷導(dǎo)致降級的原因。如果Instance Tunnel的返回結(jié)果不合預(yù)期, 您可以將該選項設(shè)為False,在調(diào)用open_reader
時,也可以使用tunnel
參數(shù)來指定使用何種結(jié)果接口。
使用Instance Tunnel
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader: for record in reader: print(record) # 處理每一個record
使用 Results 接口
with o.execute_sql('select * from dual').open_reader(tunnel=False) as reader: for record in reader: print(record) # 處理每一個record
設(shè)置讀取數(shù)據(jù)規(guī)模
如果您想要限制下載數(shù)據(jù)的規(guī)模,可以為open_reader
增加limit選項, 或者設(shè)置 options.tunnel.limit_instance_tunnel = True
。如果未設(shè)置 options.tunnel.limit_instance_tunnel
,MaxCompute會自動打開數(shù)據(jù)量限制,此時,可下載的數(shù)據(jù)條數(shù)受到Project配置的Tunnel下載數(shù)據(jù)規(guī)模數(shù)限制, 通常該限制為10000條。
設(shè)置讀取結(jié)果為pandas DataFrame
# 直接使用 reader 的 to_pandas 方法
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
# pd_df 類型為 pandas DataFrame
pd_df = reader.to_pandas()
設(shè)置讀取速度(進(jìn)程數(shù))
多進(jìn)程加速僅在 PyODPS 0.11.3 及以上版本中支持。
您可以通過n_process
指定使用進(jìn)程數(shù)。
import multiprocessing
n_process = multiprocessing.cpu_count()
with o.execute_sql('select * from dual').open_reader(tunnel=True) as reader:
# n_process 指定成機(jī)器核數(shù)
pd_df = reader.to_pandas(n_process=n_process)
設(shè)置alias
在運(yùn)行SQL時,如果某個UDF引用的資源是動態(tài)變化的,您可以alias
舊的資源名到新的資源,這樣就可以避免重新刪除并重新創(chuàng)建UDF。
from odps.models import Schema
myfunc = '''\
from odps.udf import annotate
from odps.distcache import get_cache_file
@annotate('bigint->bigint')
class Example(object):
def __init__(self):
self.n = int(get_cache_file('test_alias_res1').read())
def evaluate(self, arg):
return arg + self.n
'''
res1 = o.create_resource('test_alias_res1', 'file', file_obj='1')
o.create_resource('test_alias.py', 'py', file_obj=myfunc)
o.create_function('test_alias_func',
class_type='test_alias.Example',
resources=['test_alias.py', 'test_alias_res1'])
table = o.create_table(
'test_table',
schema=Schema.from_lists(['size'], ['bigint']),
if_not_exists=True
)
data = [[1, ], ]
# 寫入一行數(shù)據(jù),只包含一個值1
o.write_table(table, 0, [table.new_record(it) for it in data])
with o.execute_sql(
'select test_alias_func(size) from test_table').open_reader() as reader:
print(reader[0][0])
res2 = o.create_resource('test_alias_res2', 'file', file_obj='2')
# 把內(nèi)容為1的資源alias成內(nèi)容為2的資源,不需要修改UDF或資源
with o.execute_sql(
'select test_alias_func(size) from test_table',
aliases={'test_alias_res1': 'test_alias_res2'}).open_reader() as reader:
print(reader[0][0])
在交互式環(huán)境執(zhí)行 SQL
在ipython和jupyter里支持使用SQL插件的方式運(yùn)行SQL,且支持參數(shù)化查詢, 詳情參考交互體驗增強(qiáng)文檔。
設(shè)置biz_id
在少數(shù)情形下,在提交SQL時,可能需要同時提交biz_id,否則執(zhí)行會報錯。此時,您可以在全局options里設(shè)置biz_id。
from odps import options
options.biz_id = 'my_biz_id'
o.execute_sql('select * from pyodps_iris')