Python SDK
Installation
Quick install
$ sudo pip install pydatahub
Install from source
$ git clone https://github.com/aliyun/aliyun-datahub-sdk-python.git
$ cd aliyun-datahub-sdk-python
$ sudo python setup.py install
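Common problems
1. If the error 'Python.h: No such file or directory' appears during installation, install the Python development headers. Commands for common operating systems: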
$ sudo apt-get install python-dev # for python2.x installs
$ sudo apt-get install python3-dev # for python3.x installs
$ sudo yum install python-devel # for python2.x installs
$ sudo yum install python3-devel # for python3 installs
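2. On Windows, download and install the matching version of the Visual C++ SDK as directed by the error message. On Windows 10, an error like the following while installing the cprotobuf dependency also means that the Visual C++ Build Tools are required:
building 'cprotobuf.internal' extension
error: [WinError 2] The system cannot find the file specified
Python 3.6 or later is recommended, because its error message states the required version and the download link explicitly.
3. On Windows, an error like the following while installing dependencies is caused by the local environment. Search for the specific error and, depending on the case, copy the required files or run the installation from the Developer Command Prompt: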
LINK : fatal error LNK1158: cannot run 'rc.exe'
4. On Windows 7, if the following error appears, install the build tools it refers to:
error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": https://visualstudio.microsoft.com/downloads/
Verify the installation
$ python -c "from datahub import DataHub"
If the command above runs without error, the DataHub Python SDK is installed correctly.
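Basic concepts
See: Glossary
Preparation
Accessing the DataHub service requires an authenticated Alibaba Cloud account: you must provide an Alibaba Cloud accessId and accessKey, together with the endpoint of the service you want to access.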
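One way to keep these three values out of source code is to read them from the environment. The following is a minimal sketch under that assumption; the variable names DATAHUB_ACCESS_ID, DATAHUB_ACCESS_KEY and DATAHUB_ENDPOINT and the helper load_datahub_config are hypothetical and not part of the SDK.

```python
import os

# Hypothetical environment variable names; pick whatever fits your deployment.
REQUIRED_VARS = ("DATAHUB_ACCESS_ID", "DATAHUB_ACCESS_KEY", "DATAHUB_ENDPOINT")


def load_datahub_config(environ=None):
    """Return (access_id, access_key, endpoint) read from the environment.

    Raises KeyError naming every missing variable, so a misconfigured
    deployment fails before any request is sent.
    """
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return tuple(environ[name] for name in REQUIRED_VARS)
```

The returned tuple can be unpacked into access_id, access_key and endpoint before constructing the DataHub client.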
Create a Project
Log in to the DataHub WebConsole and create a Project,
or create one through the SDK.
Initialize DataHub
import sys
import time
import traceback
from datahub import DataHub
from datahub.exceptions import DatahubException, ResourceExistException
from datahub.models import FieldType, RecordSchema, TupleRecord, BlobRecord, CursorType, RecordType
access_id = '***your access id***'
access_key = '***your access key***'
endpoint = '***your datahub server endpoint***'
dh = DataHub(access_id, access_key, endpoint)
Project operations
Creation example
project_name = 'project'
comment = 'comment'
try:
    dh.create_project(project_name, comment)
    print("create project success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("project already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)
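Topic operations
Tuple Topic
Data written to a Tuple Topic is structured, so a Record Schema must be specified. The following data types are currently supported:
Type | Meaning | Value range |
Bigint | 8-byte signed integer. Do not use the minimum integer value (-9223372036854775808); it is reserved by the system. | -9223372036854775807 ~ 9223372036854775807 |
String | String; only UTF-8 encoding is supported. | A single String column may hold at most 1 MB. |
Boolean | Boolean. | May be written as True/False, true/false, or 0/1 |
Double | 8-byte double-precision floating-point number. | -1.0 × 10^308 ~ 1.0 × 10^308 |
TimeStamp | Timestamp | Timestamp with microsecond precision |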
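The limits in the table can be checked on the client before a record is built. The sketch below is illustrative only: the helper names check_bigint and check_string are hypothetical, and treating "1 MB" as 1024 × 1024 bytes of UTF-8 data is an assumption, since the table does not say how the limit is measured.

```python
BIGINT_MIN = -9223372036854775807  # -2**63 + 1; -2**63 itself is reserved
BIGINT_MAX = 9223372036854775807   # 2**63 - 1
STRING_MAX_BYTES = 1024 * 1024     # assumption: "1 MB" means 1 MiB of UTF-8 bytes


def check_bigint(value):
    """Return value if it fits the documented Bigint range, else raise ValueError."""
    if isinstance(value, bool) or not isinstance(value, int):
        raise ValueError("Bigint expects an int, got %r" % (value,))
    if not BIGINT_MIN <= value <= BIGINT_MAX:
        raise ValueError("Bigint out of range: %d" % value)
    return value


def check_string(value):
    """Return value if its UTF-8 encoding fits in one String column."""
    size = len(value.encode("utf-8"))
    if size > STRING_MAX_BYTES:
        raise ValueError("String is %d bytes, limit is %d" % (size, STRING_MAX_BYTES))
    return value
```

Rejecting the reserved minimum locally gives a clearer error than discovering the problem after the record has been sent.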
Creation example
topic_name = "tuple_topic"
shard_count = 3
life_cycle = 7
record_schema = RecordSchema.from_lists(
    ['bigint_field', 'string_field', 'double_field', 'bool_field', 'time_field'],
    [FieldType.BIGINT, FieldType.STRING, FieldType.DOUBLE, FieldType.BOOLEAN, FieldType.TIMESTAMP])
try:
    dh.create_tuple_topic(project_name, topic_name, shard_count, life_cycle, record_schema, comment)
    print("create tuple topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)
Blob Topic
A Blob Topic accepts a block of binary data as a single Record; the data is transmitted BASE64-encoded.
topic_name = "blob_topic"
shard_count = 3
life_cycle = 7
try:
    dh.create_blob_topic(project_name, topic_name, shard_count, life_cycle, comment)
    print("create blob topic success!")
    print("=======================================\n\n")
except ResourceExistException:
    print("topic already exist!")
    print("=======================================\n\n")
except Exception as e:
    print(traceback.format_exc())
    sys.exit(-1)
Publishing and subscribing to data
Get the shard list
The list_shard interface returns all shards under a topic.
shard_result = dh.list_shard(project_name, topic_name)
shards = shard_result.shards
print(len(shards))
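The result is a ListShardResult object containing a list of Shard objects. Each element is one shard, from which you can read shard_id, state, begin_hash_key, end_hash_key, and other information.
Publish data
The put_records interface publishes data to a topic.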
put_result = dh.put_records(project_name, topic_name, records)
print(put_result.failed_record_count)
print(put_result.failed_records)
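The records argument is a list in which each element is one record; all records must be of the same type, either Tuple or Blob. The result is a PutRecordsResult object with the members failed_record_count and failed_records. failed_records is a list of FailedRecord objects, each with the members index, error_code, and error_message.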
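Because put_records reports failures per record, a caller can resend only the records that failed. The following is a rough sketch of that idea, not an SDK feature: put_with_retry, max_attempts and delay_seconds are hypothetical names, and the code assumes that FailedRecord.index is the position of the record in the list that was just submitted.

```python
import time


def put_with_retry(client, project_name, topic_name, records,
                   max_attempts=3, delay_seconds=1.0):
    """Publish records, resending only the ones reported as failed.

    Returns the list of records that still failed after max_attempts.
    """
    pending = list(records)
    for attempt in range(max_attempts):
        if not pending:
            break
        result = client.put_records(project_name, topic_name, pending)
        if result.failed_record_count == 0:
            return []
        # Assumption: index refers to the position in the submitted list.
        pending = [pending[failed.index] for failed in result.failed_records]
        if attempt < max_attempts - 1:
            time.sleep(delay_seconds)
    return pending
```

A fixed delay keeps the sketch short; a production caller would probably inspect error_code first, since not every failure is worth retrying.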
Example: writing Tuple records
try:
    # Block until all shards are ready
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    record_schema = topic_result.record_schema
    records0 = []
    record0 = TupleRecord(schema=record_schema, values=[1, 'yc1', 10.01, True, 1455869335000000])
    record0.shard_id = '0'
    record0.put_attribute('AK', '47')
    records0.append(record0)
    record1 = TupleRecord(schema=record_schema)
    record1.set_value('bigint_field', 2)
    record1.set_value('string_field', 'yc2')
    record1.set_value('double_field', None)
    record1.set_value('bool_field', False)
    record1.set_value('time_field', 1455869335000011)
    record1.hash_key = '4FFFFFFFFFFFFFFD7FFFFFFFFFFFFFFD'
    records0.append(record1)
    record2 = TupleRecord(schema=record_schema)
    record2.set_value(0, 3)
    record2.set_value(1, 'yc3')
    record2.set_value(2, 1.1)
    record2.set_value(3, False)
    record2.set_value(4, 1455869335000011)
    record2.attributes = {'key': 'value'}
    record2.partition_key = 'TestPartitionKey'
    records0.append(record2)
    put_result = dh.put_records(project_name, topic_name, records0)
    print(put_result)
    print("put tuple %d records, failed count: %d" % (len(records0), put_result.failed_record_count))
    # If failed_record_count is greater than 0, retry the failed records
    print("=======================================\n\n")
except DatahubException as e:
    print(e)
    sys.exit(-1)
Example: writing Blob records
try:
    records1 = []
    record3 = BlobRecord(blob_data='data')
    record3.shard_id = '0'
    record3.put_attribute('a', 'b')
    records1.append(record3)
    put_result = dh.put_records(project_name, topic_name, records1)
    print(put_result)
except DatahubException as e:
    print(e)
    sys.exit(-1)
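Get a cursor
A cursor can be obtained in three ways: OLDEST, LATEST, and SYSTEM_TIME.
OLDEST: the cursor points to the oldest record among the currently valid data
LATEST: the cursor points to the newest record
SYSTEM_TIME: the cursor points to the first record whose time is greater than or equal to the given time (in milliseconds)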
shard_id = '0'
time_stamp = 0
cursor_result0 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
cursor_result1 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.LATEST)
cursor_result2 = dh.get_cursor(project_name, topic_name, shard_id, CursorType.SYSTEM_TIME, time_stamp)
cursor = cursor_result0.cursor
The get_cursor interface returns a cursor for reading the data after the specified position.
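SYSTEM_TIME expects a time in milliseconds, which is easy to confuse with the microsecond values used in TimeStamp fields. Here is a small sketch of a conversion helper, assuming the value is measured from the Unix epoch; to_epoch_millis is a hypothetical name, not an SDK function.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment):
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    if moment.tzinfo is None:
        raise ValueError("moment must be timezone-aware")
    # Integer timedelta division avoids floating-point rounding.
    return (moment - _EPOCH) // timedelta(milliseconds=1)
```

The result could then be passed as the time_stamp argument of get_cursor together with CursorType.SYSTEM_TIME.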
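Subscribe to data
To read from a given shard, specify the cursor to start from and the maximum number of records to read. If fewer than limit records remain between the cursor and the end of the shard, only the records actually available are returned.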
project_name = 'project'
shard_id = "0"
limit = 10
# Read records from a blob topic
topic_name = 'blob_topic'
get_result = dh.get_blob_records(project_name, topic_name, shard_id, cursor, limit)
# Read records from a tuple topic
topic_name = 'tuple_topic'
get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
Example: consuming Tuple records
try:
    # Block until all shards are ready
    dh.wait_shards_ready(project_name, topic_name)
    print("shards all ready!!!")
    print("=======================================\n\n")
    topic_result = dh.get_topic(project_name, topic_name)
    print(topic_result)
    if topic_result.record_type != RecordType.TUPLE:
        print("topic type illegal!")
        sys.exit(-1)
    print("=======================================\n\n")
    shard_id = '0'
    limit = 10
    cursor_result = dh.get_cursor(project_name, topic_name, shard_id, CursorType.OLDEST)
    cursor = cursor_result.cursor
    while True:
        get_result = dh.get_tuple_records(project_name, topic_name, shard_id, record_schema, cursor, limit)
        for record in get_result.records:
            print(record)
        if 0 == get_result.record_count:
            time.sleep(1)
        cursor = get_result.next_cursor
except DatahubException as e:
    print(e)
    sys.exit(-1)
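The polling loop above can also be wrapped in a generator so that calling code simply iterates over records. This is one possible sketch rather than an SDK facility: iter_tuple_records, idle_seconds and max_idle_polls are hypothetical names, and the client is only assumed to offer get_tuple_records with the result members used in the example above.

```python
import time


def iter_tuple_records(client, project_name, topic_name, shard_id, record_schema,
                       cursor, limit=10, idle_seconds=1.0, max_idle_polls=None):
    """Yield records from one shard, advancing the cursor after each batch.

    Stops after max_idle_polls consecutive empty batches; None polls forever.
    """
    idle_polls = 0
    while max_idle_polls is None or idle_polls < max_idle_polls:
        result = client.get_tuple_records(
            project_name, topic_name, shard_id, record_schema, cursor, limit)
        if result.record_count == 0:
            # Nothing new yet: wait before asking again.
            idle_polls += 1
            time.sleep(idle_seconds)
        else:
            idle_polls = 0
            for record in result.records:
                yield record
        cursor = result.next_cursor
```

Bounding the number of empty polls makes the loop usable in batch jobs and tests, while leaving max_idle_polls as None keeps the endless-consumer behavior of the original example.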
Conclusion