We recommend using the official SDKs provided by EAS to call your services; this reduces the time spent writing call logic and improves call stability. This topic describes the interfaces of the official Python SDK and provides complete sample programs that use the Python SDK to call services, covering the common types of input and output.
Installation
pip install -U eas-prediction --user
Interface list
Class | Interface | Description |
PredictClient | PredictClient(endpoint, service_name) | Constructor of the PredictClient class. endpoint is the endpoint of the service and service_name is the name of the service to call. |
 | set_endpoint(endpoint) | Sets the endpoint of the service. |
 | set_service_name(service_name) | Sets the name of the service to call. |
 | set_endpoint_type(endpoint_type) | Sets the network type used to call the service; for example, pass ENDPOINT_TYPE_DIRECT to call the service over a VPC direct connection. If this interface is not called, the default gateway mode is used. |
 | set_token(token) | Sets the token used for service access authentication. |
 | set_retry_count(max_retry_count) | Sets the maximum number of retries after a request fails. |
 | set_max_connection_count(max_connection_count) | Sets the maximum size of the client connection pool. |
 | set_timeout(timeout) | Sets the request timeout, in milliseconds. |
 | init() | Initializes the PredictClient object. After the parameter-setting interfaces above have been executed, init() must be called for the settings to take effect. |
 | predict(request) | Sends a prediction request to the online prediction service. request can be, for example, a StringRequest, TFRequest, or TorchRequest object; the corresponding response is returned. |
StringRequest | StringRequest(request_data) | Constructor of the StringRequest class; request_data is the request string to send. |
StringResponse | to_string() | Converts the body of the StringResponse into a string. |
TFRequest | TFRequest(signature_name) | Constructor of the TFRequest class; signature_name is the signature name of the model to call. |
 | add_feed(input_name, shape, data_type, content) | Specifies an input tensor of the TensorFlow model: its name, shape, data type, and content flattened into a one-dimensional array. |
 | add_fetch(output_name) | Specifies the name of an output tensor to fetch. Optional; if not set, all output tensors are returned. |
 | to_string() | Serializes the TFRequest into a string. |
TFResponse | get_tensor_shape(output_name) | Returns the shape of the specified output tensor. |
 | get_values(output_name) | Returns the data of the specified output tensor as a one-dimensional array; use it together with get_tensor_shape() to recover the tensor layout. |
TorchRequest | TorchRequest() | Constructor of the TorchRequest class. |
 | add_feed(index, shape, data_type, content) | Specifies the input tensor at the given index: its shape, data type, and content flattened into a one-dimensional array. |
 | add_fetch(output_index) | Specifies the index of an output tensor to fetch. Optional; if not set, all output tensors are returned. |
 | to_string() | Serializes the TorchRequest into a string. |
TorchResponse | get_tensor_shape(output_index) | Returns the shape of the output tensor at the given index. |
 | get_values(output_index) | Returns the data of the output tensor at the given index as a one-dimensional array. |
QueueClient | QueueClient(endpoint, queue_name) | Constructor of the QueueClient class; endpoint is the endpoint of the queue service and queue_name is the name of the queue. |
 | set_token(token) | Sets the token used for queue service access authentication. |
 | init() | Initializes the QueueClient object; call it after the parameter-setting interfaces. |
 | set_timeout(timeout) | Sets the request timeout, in milliseconds. |
 | put(data, tags={}) | Sends data to the queue service and returns the index of the data and the request ID. |
 | get() | Queries and returns data from the queue. |
 | attributes() | Returns the attributes of the queue service, such as the total amount of data and the index of the most recent entry (stream.lastEntry). |
 | truncate(index) | Truncates the queue by removing all data before the given index. |
 | watch(index, window, auto_commit) | Subscribes to the queue starting from the given index, with window as the maximum number of uncommitted entries pushed at a time, and returns a Watcher object. If auto_commit is true, each entry is committed automatically once it is pushed. |
 | commit(index) | Commits the data at the given index, marking it as consumed so the queue service can remove it. |
Watcher | run() | Starts the Watcher and returns a generator that yields the data frames pushed by the queue service. |
 | close() | Closes a Watcher object and shuts down the backend data connection. Note: a single client can start only one Watcher object; you must close it after use before a new Watcher object can be started. |
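All of the demos that follow share the same call flow: construct the client, apply the parameter-setting interfaces, call init(), then send requests. The sketch below illustrates that ordering; the endpoint, service name, and token are placeholders to be replaced with your own values:

#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import StringRequest

# Placeholder endpoint, service name, and token -- replace with your own.
client = PredictClient('http://<your-endpoint>', '<your-service-name>')
client.set_token('<your-token>')
client.init()  # required: the settings above take effect only after init()

resp = client.predict(StringRequest('[{}]'))
print(resp)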
Sample programs
String input and output example
If you deploy a service with a custom processor, service calls typically use strings (for example, calls to a PMML model service). The following demo shows how:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import StringRequest

if __name__ == '__main__':
    # Initialize the client with the service endpoint and service name.
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'scorecard_pmml_example')
    client.set_token('YWFlMDYyZDNmNTc3M2I3MzMwYmY0MmYwM2Y2MTYxMTY4NzBkNzdj****')
    client.init()

    request = StringRequest('[{"fea1": 1, "fea2": 2}]')
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)
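The response of a custom-processor service is itself a string. If the processor returns JSON, as the scorecard example above does, the body can be decoded with the standard json module via the to_string() accessor listed in the interface table. A sketch, assuming JSON output:

import json

# `client` is the initialized PredictClient from the example above.
resp = client.predict(StringRequest('[{"fea1": 1, "fea2": 2}]'))
result = json.loads(resp.to_string())  # assumes the processor returns JSON
print(result)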
TensorFlow input and output example
If you use TensorFlow, use TFRequest and TFResponse as the input and output data formats, respectively. The following demo shows how:
#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('YTg2ZjE0ZjM4ZmE3OTc0NzYxZDMyNmYzMTJjZTQ1YmU0N2FjMTAy****')
    client.init()

    # Build a request against the 'predict_images' signature with one 1x784 input tensor.
    req = TFRequest('predict_images')
    req.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(req)
        print(resp)
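Instead of printing the whole response, the output tensors can be read through the TFResponse accessors listed in the interface table. A sketch, assuming the model's signature exposes an output tensor named 'scores' (a hypothetical name; the real one depends on your SavedModel signature):

# `resp` is the TFResponse returned by client.predict(req) above.
# 'scores' is a hypothetical output name; check your model's signature for the real one.
shape = resp.get_tensor_shape('scores')
values = resp.get_values('scores')  # flat one-dimensional list
print(shape)
print(values[:10])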
Example of calling a service over a VPC direct connection
Over a direct connection you can access only services deployed in an EAS dedicated resource group, and the network between that resource group and your specified vSwitch must be connected first. For information about how to purchase an EAS dedicated resource group and connect the network, see the topics on using dedicated resource groups and configuring network connectivity. Compared with the regular call method, this method requires only one extra line of code:
client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
It is especially suitable for services with heavy traffic and high concurrency. The following demo shows how:

#!/usr/bin/env python
from eas_prediction import PredictClient
from eas_prediction import TFRequest
from eas_prediction import ENDPOINT_TYPE_DIRECT

if __name__ == '__main__':
    client = PredictClient('http://pai-eas-vpc.cn-hangzhou.aliyuncs.com', 'mnist_saved_model_example')
    client.set_token('M2FhNjJlZDBmMzBmMzE4NjFiNzZhMmUxY2IxZjkyMDczNzAzYjFi****')
    # Switch the client to VPC direct connection mode.
    client.set_endpoint_type(ENDPOINT_TYPE_DIRECT)
    client.init()

    request = TFRequest('predict_images')
    request.add_feed('images', [1, 784], TFRequest.DT_FLOAT, [1] * 784)
    for x in range(0, 1000000):
        resp = client.predict(request)
        print(resp)
PyTorch input and output example
If you use PyTorch, use TorchRequest and TorchResponse as the input and output data formats, respectively. The following demo shows how:
#!/usr/bin/env python
import time

from eas_prediction import PredictClient
from eas_prediction import TorchRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'pytorch_gpu_wl')
    client.init()

    # Input tensors are addressed by index; shape [1, 3, 224, 224] flattens to 150528 values.
    req = TorchRequest()
    req.add_feed(0, [1, 3, 224, 224], TorchRequest.DT_FLOAT, [1] * 150528)
    # req.add_fetch(0)

    st = time.time()
    timer = 0
    for x in range(0, 10):
        resp = client.predict(req)
        timer += (time.time() - st)
        st = time.time()
        print(resp.get_tensor_shape(0))
        # print(resp)
    print("average response time: %s s" % (timer / 10))
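Because get_values() returns the output as a flat array, it can be combined with get_tensor_shape() to rebuild the original tensor layout. A sketch using numpy (an extra dependency, not required by the SDK; the example shape in the comment is illustrative):

import numpy as np

# `resp` is the TorchResponse returned by client.predict(req) above.
shape = resp.get_tensor_shape(0)                      # e.g. [1, 1000] for a classifier
output = np.array(resp.get_values(0)).reshape(shape)  # flat list -> original layout
print(output.argmax(axis=-1))                         # index of the highest score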
BladeProcessor input and output example
If you use BladeProcessor, use BladeRequest and BladeResponse as the input and output data formats, respectively. The following demo shows how:
#!/usr/bin/env python
import time

from eas_prediction import PredictClient
from eas_prediction import BladeRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = BladeRequest()
    req.add_feed('input_data', 1, [1, 360, 128], BladeRequest.DT_FLOAT, [0.8] * 85680)
    req.add_feed('input_length', 1, [1], BladeRequest.DT_INT32, [187])
    req.add_feed('start_token', 1, [1], BladeRequest.DT_INT32, [104])
    req.add_fetch('output', BladeRequest.DT_FLOAT)

    st = time.time()
    timer = 0
    for x in range(0, 10):
        resp = client.predict(req)
        timer += (time.time() - st)
        st = time.time()
        # print(resp)
        # print(resp.get_values('output'))
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))
BladeProcessor input and output example compatible with the default EAS TensorFlow interface
BladeProcessor users can use the TFRequest and TFResponse classes that are compatible with the default EAS TensorFlow interface as the input and output data formats. The following demo shows how:
#!/usr/bin/env python
import time

from eas_prediction import PredictClient
from eas_prediction.blade_tf_request import TFRequest  # note: import the Blade-compatible TFRequest

if __name__ == '__main__':
    client = PredictClient('http://182848887922****.cn-shanghai.pai-eas.aliyuncs.com', 'nlp_model_example')
    client.init()

    req = TFRequest(signature_name='predict_words')
    req.add_feed('input_data', [1, 360, 128], TFRequest.DT_FLOAT, [0.8] * 85680)
    req.add_feed('input_length', [1], TFRequest.DT_INT32, [187])
    req.add_feed('start_token', [1], TFRequest.DT_INT32, [104])
    req.add_fetch('output')

    st = time.time()
    timer = 0
    for x in range(0, 10):
        resp = client.predict(req)
        timer += (time.time() - st)
        st = time.time()
        # print(resp)
        # print(resp.get_values('output'))
        print(resp.get_tensor_shape('output'))
    print("average response time: %s s" % (timer / 10))
Example of sending and subscribing to data through the queue service
You can use QueueClient to send data to a queue service, query data, query the status of the queue service, and subscribe to data pushed by the queue service. In the following demo, one thread pushes data into the queue service while another thread subscribes to the pushed data through a Watcher.
#!/usr/bin/env python
import threading

from eas_prediction import QueueClient

if __name__ == '__main__':
    endpoint = '182848887922****.cn-shanghai.pai-eas.aliyuncs.com'
    queue_name = 'test_group.qservice/sink'
    token = 'YmE3NDkyMzdiMzNmMGM3ZmE4ZmNjZDk0M2NiMDA3OTZmNzc1MTUx****'

    queue = QueueClient(endpoint, queue_name)
    queue.set_token(token)
    queue.init()
    queue.set_timeout(30000)

    # truncate all messages in the queue
    attributes = queue.attributes()
    if 'stream.lastEntry' in attributes:
        queue.truncate(int(attributes['stream.lastEntry']) + 1)

    count = 100

    # create a thread to send messages to the queue
    def send_thread():
        for i in range(count):
            index, request_id = queue.put('[{}]')
            print('send: ', i, index, request_id)

    # create a thread to watch messages from the queue
    def watch_thread():
        watcher = queue.watch(0, 5, auto_commit=True)
        i = 0
        for x in watcher.run():
            print('recv: ', i, x.index, x.tags['requestId'])
            i += 1
            if i == count:
                break
        watcher.close()

    thread1 = threading.Thread(target=watch_thread)
    thread2 = threading.Thread(target=send_thread)
    thread1.start()
    thread2.start()
    thread1.join()
    thread2.join()
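With auto_commit=True, as above, each entry is committed as soon as it is pushed. If the consumer should acknowledge data only after processing it successfully, auto_commit can be disabled and each index committed explicitly through the commit() interface from the table above. A sketch of that variant of watch_thread, assuming commit() takes the index of the processed entry:

    # variant of watch_thread that commits manually after processing
    def watch_thread_manual_commit():
        watcher = queue.watch(0, 5, auto_commit=False)
        i = 0
        for x in watcher.run():
            print('recv: ', i, x.index, x.tags['requestId'])
            queue.commit(x.index)  # acknowledge only after the entry is processed
            i += 1
            if i == count:
                break
        watcher.close()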