Go SDK介紹
快速上手
DataHub相關的基本概念
詳情參見DataHub基本概念
準備工作
訪問DataHub服務需要使用阿里云認證賬號,需要提供阿里云accessId及accessKey。 同時需要提供可訪問的DataHub服務地址。
獲取Datahub Go SDK包
go get -u github.com/aliyun/aliyun-datahub-sdk-go/datahub
初始化DataHub Client
Datahub GO SDK提供的所有API接口均由 datahub.DataHub 接口實現,所以第一步就是初始化一個DataHub對象。1). 使用默認參數創建DataHub對象
import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
// Credentials of the Aliyun account used to access DataHub.
accessId := ""
accessKey := ""
// Address of the DataHub service, e.g. "https://dh-cn-hangzhou.aliyuncs.com".
endpoint := ""
// Create a DataHub client with the default configuration.
dh := datahub.New(accessId, accessKey, endpoint)
2). 使用自定義參數創建對象。目前支持配置的參數有:
參數 | 參數類型 | 默認值 | 有效值 | 描述 |
UserAgent | string | - | - | 用戶代理 |
CompressorType | CompressorType | NOCOMPRESS | NOCOMPRESS(不壓縮)、LZ4、DEFLATE、ZLIB | 傳輸時支持的壓縮格式 |
EnableBinary | bool | true | true/false | 主要在put/get record時,使用protobuf協議。DataHub版本未支持protobuf時需要手動將EnableBinary設為false |
HttpClient | *http.Client | datahub.DefaultHttpClient() | - | 具體可參考net/http |
endpoint := ""
accessId := ""
accessKey := ""
// Authenticate with a permanent AccessKey pair.
account := datahub.NewAliyunAccount(accessId, accessKey)
// Temporary (STS) AccessKey authentication additionally needs a security token.
// The token is declared only together with its use so the snippet compiles
// ("declared and not used") when STS is not enabled:
// token := ""
// account := datahub.NewStsCredential(accessId, accessKey, token)
config := datahub.NewDefaultConfig()
config.CompressorType = datahub.DEFLATE
// Use the protobuf protocol for put/get record; set to false when the server
// does not support protobuf.
config.EnableBinary = true
config.HttpClient = datahub.DefaultHttpClient()
dh := datahub.NewClientWithConfig(endpoint, config, account)
DataHub GO SDK 支持使用GO MOD進行包管理
// go.mod: declare the DataHub Go SDK as a module dependency.
require (
github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4
)
點位消費示例
// OffsetConsume shows subscription-based ("offset") consumption of one shard
// of a Tuple topic. It opens a subscription session and resumes reading right
// after the last committed sequence, or from the oldest record if nothing has
// been committed yet. It prints every record and commits the offset after
// every 1000 records.
//
// Commit errors are handled as follows:
//   - SubscriptionOffsetResetError: the offset was reset, so the session is
//     reopened and reading resumes from the new offset.
//   - SubscriptionSessionInvalidError: the subscription is used by another
//     consumer, so this one stops.
//   - any other error: it is printed and consumption stops.
func OffsetConsume() {
	accessId := ""
	accessKey := ""
	endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
	dh := datahub.New(accessId, accessKey, endpoint)
	projectName := ""
	topicName := ""
	subId := ""
	shardId := "0"
	shardIds := []string{"0"}

	session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
	if err != nil {
		fmt.Println("Open subscription session failed", err)
		return
	}
	offset := session.Offsets[shardId]

	cursor, err := offsetConsumeCursor(dh, projectName, topicName, shardId, offset)
	if err != nil {
		fmt.Println("Get cursor failed", err)
		return
	}

	topic, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("Get topic failed", err)
		return
	}

	// Read Tuple records and commit the offset every 1000 records.
	recordCount := 0
	limitNum := 100
readLoop:
	for {
		gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)
		if err != nil {
			fmt.Println("Get records failed", err)
			return
		}
		if gr.RecordCount == 0 {
			fmt.Println("No data, sleep 5 seconds...")
			time.Sleep(time.Second * 5)
			continue
		}
		for _, record := range gr.Records {
			// Process the record; this example only prints it.
			if data, ok := record.(*datahub.TupleRecord); ok {
				fmt.Println(data.Values)
			} else {
				fmt.Printf("record type is not TupleRecord, is %T\n", record)
			}
			recordCount++
			if recordCount%1000 != 0 {
				continue
			}

			fmt.Println("Commit offset", record.GetSequence())
			offset.Sequence = record.GetSequence()
			offset.Timestamp = record.GetSystemTime()
			offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
			commitErr := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap)
			recordCount = 0
			switch commitErr.(type) {
			case nil:
				// Committed; keep processing this batch.
			case *datahub.SubscriptionOffsetResetError:
				// The offset was reset. Reopen the session to get the new offset,
				// then restart reading from it and drop the rest of this batch.
				fmt.Println("Subscription reset, will reopen...")
				session, err = dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
				if err != nil {
					fmt.Println("Reopen subscription session failed", err)
					return
				}
				offset = session.Offsets[shardId]
				cursor, err = offsetConsumeCursor(dh, projectName, topicName, shardId, offset)
				if err != nil {
					fmt.Println("Get cursor failed", err)
					return
				}
				continue readLoop
			case *datahub.SubscriptionSessionInvalidError:
				fmt.Println("Subscription used by other one")
				return
			default:
				fmt.Println("Commit offset failed", commitErr)
				return
			}
		}
		cursor = gr.NextCursor
	}
}

// offsetConsumeCursor returns the cursor from which OffsetConsume resumes
// reading shardId for the committed offset.
//
// If nothing was consumed yet (Sequence < 0), it starts at the oldest record.
// Otherwise it starts at the record after the committed sequence. When that
// record has expired (SeekOutOfRangeError), it falls back to the oldest record.
func offsetConsumeCursor(dh datahub.DataHub, projectName, topicName, shardId string, offset datahub.SubscriptionOffset) (string, error) {
	if offset.Sequence < 0 {
		gc, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
		if err != nil {
			return "", err
		}
		return gc.Cursor, nil
	}
	gc, err := dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, offset.Sequence+1)
	if err == nil {
		return gc.Cursor, nil
	}
	if _, ok := err.(*datahub.SeekOutOfRangeError); !ok {
		return "", err
	}
	fmt.Println("Get cursor by sequence failed for SeekOutOfRangeError, will retry...")
	gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
	if err != nil {
		return "", err
	}
	return gc.Cursor, nil
}
接口示例
project 操作
項目(Project)是DataHub數據的基本組織單元,下面包含多個Topic。值得注意的是,DataHub的項目空間與MaxCompute的項目空間是相互獨立的。用戶在MaxCompute中創建的項目不能復用于DataHub,需要獨立創建。
創建Project
CreateProject(projectName, comment string) error
參數
projectName: project name
comment: project comment
return
error
ResourceExistError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// createProjet creates projectName with a fixed comment and prints whether
// the call succeeded.
func createProjet(dh datahub.DataHub, projectName string) {
	err := dh.CreateProject(projectName, "project comment")
	if err == nil {
		fmt.Println("create successful")
		return
	}
	fmt.Println("create project failed")
	fmt.Println(err)
}
刪除Project
DeleteProject接口刪除project。
DeleteProject(projectName string) error
參數
projectName: project name
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// deleteProject deletes the project named projectName and prints the outcome.
func deleteProject(dh datahub.DataHub, projectName string) {
	// Use the caller's project name; the example previously deleted a
	// hard-coded "123".
	if err := dh.DeleteProject(projectName); err != nil {
		fmt.Println("delete project failed")
		fmt.Println(err)
		return
	}
	fmt.Println("delete project successful")
}
列出Project
ListProject 接口列出project。
ListProject() (*ListProjectResult, error)
參數
return
// ListProjectResult is returned by ListProject.
type ListProjectResult struct {
	// ProjectNames holds the names of all visible projects.
	ProjectNames []string `json:"ProjectNames"`
}
error
AuthorizationFailedError
DatahubClientError
示例
// listProject prints the name of every project returned by ListProject.
// The projectName parameter is not used.
func listProject(dh datahub.DataHub, projectName string) {
	lp, err := dh.ListProject()
	if err != nil {
		fmt.Println("get project list failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get project list successful")
	// Use a distinct loop variable instead of shadowing the projectName parameter.
	for _, name := range lp.ProjectNames {
		fmt.Println(name)
	}
}
查詢Project
GetProject查詢project
GetProject(projectName string) (*GetProjectResult, error)
參數
projectName: project name
return
// GetProjectResult is returned by GetProject.
type GetProjectResult struct {
	CreateTime     int64  `json:"CreateTime"`
	LastModifyTime int64  `json:"LastModifyTime"`
	// Comment is the project comment. The tag was previously malformed
	// (json"Comment", missing ':'), so the key lookup returned "".
	Comment string `json:"Comment"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// getProject fetches the metadata of projectName and prints the whole result.
func getProject(dh datahub.DataHub, projectName string) {
	info, err := dh.GetProject(projectName)
	if err == nil {
		fmt.Println("get project message successful")
		fmt.Println(*info)
		return
	}
	fmt.Println("get project message failed")
	fmt.Println(err)
}
更新project
UpdateProject(projectName, comment string) error
參數
projectName: project name
comment: project comment
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
1. func updateProject(dh datahub.DataHub, projectName string) {
2. if err := dh.UpdateProject(projectName, "new project comment"); err != nil {
3. fmt.Println("update project comment failed")
4. fmt.Println(err)
5. return
6. }
7. fmt.Println("update project comment successful")
8. }
topic操作
Topic是 DataHub 訂閱和發布的最小單位,用戶可以用Topic來表示一類或者一種流數據。目前支持Tuple與Blob兩種類型。Tuple類型的Topic支持類似于數據庫的記錄的數據,每條記錄包含多個列。Blob類型的Topic僅支持寫入一塊二進制數據。
創建Topic
Tuple Topic
CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) error
Tuple類型Topic寫入的數據是有格式的,需要指定Record Schema,目前支持以下幾種數據類型:
類型 | 含義 | 值域 |
BIGINT | 8字節有符號整型 | -9223372036854775807 ~ 9223372036854775807 |
DOUBLE | 8字節雙精度浮點數 | -1.0×10^308 ~ 1.0×10^308 |
BOOLEAN | 布爾類型 | True/False或true/false或0/1 |
TIMESTAMP | 時間戳類型 | 表示到微秒的時間戳類型 |
STRING | 字符串,只支持UTF-8編碼 | 單個STRING列最長允許2MB |
DECIMAL | 高精度小數類型 | 整數位範圍 -10^38+1 ~ 10^38-1,小數位精確到10^-18 |
參數
projectName: project name
topicName: topic name
comment: topic comment
lifeCycle: The expire time of the data (Unit: DAY). The data written before that time is not accessible.
recordSchema: The records schema of this topic.
return
error
ResourceExistError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// Example_CreateTupleTopic creates a Tuple topic with one column of each basic
// type, 5 shards and a 7-day lifecycle.
func Example_CreateTupleTopic(dh datahub.DataHub, projectName, topicName string) {
	const (
		shardCount = 5
		lifeCycle  = 7 // unit: day
	)
	schema := datahub.NewRecordSchema()
	schema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
		AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
		AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
		AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
		AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})

	err := dh.CreateTupleTopic(projectName, topicName, "topic comment", shardCount, lifeCycle, schema)
	if err == nil {
		fmt.Println("create topic successful")
		return
	}
	fmt.Println("create topic failed")
	fmt.Println(err)
}
Blob Topic
CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) error
參數
projectName: project name
topicName: topic name
comment: topic comment
lifeCycle: The expire time of the data (Unit: DAY). The data written before that time is not accessible.
return
error
ResourceExistError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// Example_CreateBlobTopic creates a Blob topic with 5 shards and a 7-day
// lifecycle.
func Example_CreateBlobTopic(dh datahub.DataHub, projectName, topicName string) {
	const (
		shardCount = 5
		lifeCycle  = 7 // unit: day
	)
	err := dh.CreateBlobTopic(projectName, topicName, "topic comment", shardCount, lifeCycle)
	if err == nil {
		fmt.Println("create topic successful")
		return
	}
	fmt.Println("create topic failed")
	fmt.Println(err)
}
刪除Topic
DeleteTopic(projectName, topicName string) error
參數
projectName: project name
topicName: topic name
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// ExampleDataHub_DeleteTopic deletes topicName from projectName and prints
// the outcome.
func ExampleDataHub_DeleteTopic(dh datahub.DataHub, projectName, topicName string) {
	err := dh.DeleteTopic(projectName, topicName)
	if err == nil {
		fmt.Println("delete successful")
		return
	}
	fmt.Println("delete failed")
	fmt.Println(err)
}
列出Topic
ListTopic(projectName string) (*ListTopicResult, error)
參數
projectName: project name
return
// ListTopicResult is returned by ListTopic.
type ListTopicResult struct {
	// TopicNames holds the names of the topics in the project.
	TopicNames []string `json:"TopicNames"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// ExampleDataHub_ListTopic prints the ListTopic result for projectName.
// The topicName parameter is not used.
func ExampleDataHub_ListTopic(dh datahub.DataHub, projectName, topicName string) {
	topics, err := dh.ListTopic(projectName)
	if err == nil {
		fmt.Println("get topic list successful")
		fmt.Println(topics)
		return
	}
	fmt.Println("get topic list failed")
	fmt.Println(err)
}
更新Topic
UpdateTopic(projectName, topicName, comment string) error
參數
projectName: project name
topicName: topic name
comment: topic comment
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// ExampleDataHub_UpdateTopic replaces the comment of topicName and prints
// the outcome.
func ExampleDataHub_UpdateTopic(dh datahub.DataHub, projectName, topicName string) {
	const newComment = "new topic comment"
	err := dh.UpdateTopic(projectName, topicName, newComment)
	if err == nil {
		fmt.Println("update topic comment successful")
		return
	}
	fmt.Println("update topic comment failed")
	fmt.Println(err)
}
schema類型
schema是用來標明數據存儲的名稱和對應類型的,在創建tuple topic 和讀寫 record 的時候用到。因為網絡傳輸中,數據都是以字符串的形式發送,需要schema來轉換成對應的類型。schema就是一個Field對象的slice,Field包含三個參數,第一個參數是field的名稱,第二個是field的類型,第三個參數是bool值,True表示field的值允許為空, False表示field的值不能為空。
獲取schema
對于已創建的Tuple topic,可以使用GetTopic接口來獲取schema信息
示例
// getSchema fetches topicName and prints its record schema.
func getSchema(dh datahub.DataHub, projectName, topicName string) {
	// Query the caller's topic; the example previously ignored topicName and
	// always read "topic_test".
	gt, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("get topic failed")
		fmt.Println(err)
		return
	}
	fmt.Println(gt.RecordSchema)
}
定義schema
要創建新的tuple topic,需要自己定義schema,schema可以通過以下方式進行初始化。
直接創建
// createSchema1 builds a RecordSchema directly from a slice of fields and
// prints it.
//
// Keyed literals are used so the example keeps compiling if Field or
// RecordSchema gain fields. go vet's composites check flags unkeyed literals
// of imported struct types.
func createSchema1() {
	fields := []datahub.Field{
		{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true},
		{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false},
		{Name: "string_field", Type: datahub.STRING, AllowNull: false},
		{Name: "double_field", Type: datahub.DOUBLE, AllowNull: false},
		{Name: "boolean_field", Type: datahub.BOOLEAN, AllowNull: true},
		{Name: "decimal_field", Type: datahub.DECIMAL, AllowNull: false},
	}
	schema := datahub.RecordSchema{
		Fields: fields,
	}
	fmt.Println(schema)
}
逐個對schema進行set
// createSchema2 builds a RecordSchema field by field with chained AddField
// calls.
func createSchema2() {
	schema := datahub.NewRecordSchema()
	schema.
		AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
		AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
		AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
		AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
		AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN}).
		AddField(datahub.Field{Name: "decimal_field", Type: datahub.DECIMAL})
}
通過JSON字符串定義schema
// createSchema3 builds a RecordSchema from its JSON description and prints it.
func createSchema3() {
	// A schema JSON holds a "fields" array; each entry gives a column's type
	// and name. An empty string cannot be parsed, so a real example is used.
	str := `{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}`
	schema, err := datahub.NewRecordSchemaFromJson(str)
	if err != nil {
		fmt.Println("create recordSchema failed")
		fmt.Println(err)
		return
	}
	fmt.Println("create recordSchema successful")
	fmt.Println(schema)
}
JSON字符串的格式如下:{"fields":[{"type":"BIGINT","name":"a"},{"type":"STRING","name":"b"}]}
shard 操作
Shard表示對一個Topic進行數據傳輸的并發通道,每個Shard會有對應的ID。每個Shard會有多種狀態:Opening - 啟動中,Active - 啟動完成可服務。每個Shard啟用以后會占用一定的服務端資源,建議按需申請Shard數量。shard可以進行合并和分裂,當數據量增大時,可以采用分裂shard來增加數據通道,提高數據寫入的并發量,當數據量減小時,應該合并shard減少服務器資源浪費。例如淘寶在雙11期間,數據量驟增,這個時候每個shard的寫入壓力過大,便可以增加shard提高寫入效率,在雙11過后,數據量明顯降低,則需要合并shard。
列出shard
ListShard(projectName, topicName string) (*ListShardResult, error)
參數
projectName: project name
topicName: topic name
return
1. type SplitShardResult struct {
2. NewShards []ShardEntry `json:"NewShards"`
3. }
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// ExampleDataHub_ListShard lists the shards of topicName and prints each one.
// It uses dh, projectName and topicName from the enclosing scope.
func ExampleDataHub_ListShard() {
	result, err := dh.ListShard(projectName, topicName)
	if err != nil {
		fmt.Println("get shard list failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get shard list successful")
	for i := range result.Shards {
		fmt.Println(result.Shards[i])
	}
}
分裂shard
只有處于ACTIVE狀態的shard才可以進行分裂,分裂成功后,會生成兩個新的shard,同時原shard狀態會變為CLOSED。分裂shard時需要指定splitKey:可以調用第一個method(SplitShard),由系統自動生成splitKey;如果有特殊需求,則可以調用第二個method(SplitShardWithSplitKey)自己指定splitKey。splitKey規則可以參考基本概念中的Shard Hash Key Range。
SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error) SplitShardWithSplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error)
參數
projectName: project name
topicName: topic name
shardId: The shard which to split
splitKey: The split key which is used to split shard
return
// SplitShardResult is returned by SplitShard and SplitShardWithSplitKey.
type SplitShardResult struct {
// NewShards lists the shards created by the split; a split produces two new shards.
NewShards []ShardEntry `json:"NewShards"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// ExampleDataHub_SplitShard splits shard "0" of topicName with a
// system-generated split key. It then waits until all shards are ready.
// It uses dh, projectName and topicName from the enclosing scope.
func ExampleDataHub_SplitShard() {
	// Only an ACTIVE shard can be split.
	shardId := "0"
	result, err := dh.SplitShard(projectName, topicName, shardId)
	if err != nil {
		fmt.Println("split shard failed")
		fmt.Println(err)
		return
	}
	fmt.Println("split shard successful")
	fmt.Println(result)
	// Wait for every shard to become ready before operating on the topic again.
	dh.WaitAllShardsReady(projectName, topicName)
}
合并shard
合并兩個shard時,要求兩個shard必須是相鄰的,并且狀態都是ACTIVE。
MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)
參數
projectName: project name
topicName: topic name
shardId: The shard which will be merged
adjacentShardId: The adjacent shard of the specified shard.
示例
return
// MergeShardResult is returned by MergeShard.
// NOTE(review): presumably describes the shard produced by the merge (its ID
// and hash-key range); confirm against the SDK.
type MergeShardResult struct {
ShardId string `json:"ShardId"`
BeginHashKey string `json:"BeginHashKey"`
EndHashKey string `json:"EndHashKey"`
}
error
ResourceNotFoundError
InvalidOperationError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
ShardSealedError
// ExampleDataHub_MergeShard merges shard "3" with its adjacent shard "4" and
// then waits until all shards are ready. Both shards must be adjacent and
// ACTIVE. It uses dh, projectName and topicName from the enclosing scope.
func ExampleDataHub_MergeShard() {
	shardId, adjacentShardId := "3", "4"
	result, err := dh.MergeShard(projectName, topicName, shardId, adjacentShardId)
	if err != nil {
		fmt.Println("merge shard failed")
		fmt.Println(err)
		return
	}
	fmt.Println("merge shard successful")
	fmt.Println(result)
	// After merging, wait for every shard to become ready before performing
	// related operations.
	dh.WaitAllShardsReady(projectName, topicName)
}
數據發布/訂閱
處于ACTIVE和CLOSED狀態的shard都可以進行數據訂閱,但是只有處于ACTIVE狀態的shard可以進行數據發布,向CLOSED狀態的shard發布數據會直接返回ShardSealedError錯誤,處于CLOSED狀態的shard讀取數據到末尾時也會返回ShardSealedError錯誤,表示不會有新的數據。
發布數據
向某個topic下發布數據記錄時,每條數據記錄需要指定該topic下的一個shard, 因此一般需要通過 listShard 接口查看下當前topic下的shard列表。使用PutRecords接口時注意檢查返回結果是否數據發布失敗的情況。
PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error
服務器2.12版本及之后版本開始支持PutRecordsByShard接口,低版本請使用PutRecords接口。
參數
projectName: project name
topicName: topic name
shardId : id of shard
records: Records list to written.
return
// PutRecordsResult is returned by PutRecords. Individual records can fail,
// so check it even when the returned error is nil.
type PutRecordsResult struct {
// FailedRecordCount is the number of records not written;
// len(records)-FailedRecordCount were written successfully.
FailedRecordCount int `json:"FailedRecordCount"`
// FailedRecords describes each record that failed.
FailedRecords []FailedRecord `json:"FailedRecords"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// put tuple data
// putTupleData writes three Tuple records to shards 0, 1 and 2 of topicName.
// It retries up to 3 times, 5 seconds apart, when the QPS limit is exceeded.
// It uses dh, projectName and topicName from the enclosing scope.
func putTupleData() {
	topic, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		fmt.Println("get topic failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get topic successful")

	first := datahub.NewTupleRecord(topic.RecordSchema, 0)
	first.ShardId = "0"
	first.SetValueByName("field1", "TEST1")
	first.SetValueByName("field2", 1)
	// Attributes (extra key/value information) can be attached before putting.
	first.SetAttribute("attribute", "test attribute")

	second := datahub.NewTupleRecord(topic.RecordSchema, 0)
	second.ShardId = "1"
	second.SetValueByName("field1", datahub.String("TEST2"))
	second.SetValueByName("field2", datahub.Bigint(2))

	third := datahub.NewTupleRecord(topic.RecordSchema, 0)
	third.ShardId = "2"
	third.SetValueByName("field1", datahub.String("TEST3"))
	third.SetValueByName("field2", datahub.Bigint(3))

	records := []datahub.IRecord{first, second, third}

	const maxRetry = 3
	attempts := 0
	// Only a LimitExceededError advances attempts (via continue); success breaks.
	for ; attempts < maxRetry; attempts++ {
		result, err := dh.PutRecords(projectName, topicName, records)
		if err != nil {
			if _, limited := err.(*datahub.LimitExceededError); !limited {
				fmt.Println("put record failed")
				fmt.Println(err)
				return
			}
			fmt.Println("maybe qps exceed limit,retry")
			time.Sleep(5 * time.Second)
			continue
		}
		fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
		for _, failed := range result.FailedRecords {
			fmt.Println(failed)
		}
		break
	}
	if attempts >= maxRetry {
		fmt.Printf("put records failed ")
	}
}
// put blob data
// putBlobData writes three Blob records to shards 0, 1 and 2 of blobTopicName.
// It retries up to 3 times, 5 seconds apart, when the QPS limit is exceeded.
// It uses dh, projectName and blobTopicName from the enclosing scope.
func putBlobData() {
	first := datahub.NewBlobRecord([]byte("blob test1"), 0)
	first.ShardId = "0"

	second := datahub.NewBlobRecord([]byte("blob test2"), 0)
	second.ShardId = "1"
	second.SetAttribute("attribute", "test attribute")

	third := datahub.NewBlobRecord([]byte("blob test3"), 0)
	third.ShardId = "2"

	records := []datahub.IRecord{first, second, third}

	const maxRetry = 3
	attempts := 0
	// Only a LimitExceededError advances attempts (via continue); success breaks.
	for ; attempts < maxRetry; attempts++ {
		result, err := dh.PutRecords(projectName, blobTopicName, records)
		if err != nil {
			if _, limited := err.(*datahub.LimitExceededError); !limited {
				fmt.Println("put record failed")
				fmt.Println(err)
				return
			}
			fmt.Println("maybe qps exceed limit,retry")
			time.Sleep(5 * time.Second)
			continue
		}
		fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
		for _, failed := range result.FailedRecords {
			fmt.Println(failed)
		}
		break
	}
	if attempts >= maxRetry {
		fmt.Printf("put records failed ")
	}
}
// put data by shard
// putDataByShard writes three Blob records to shard "0" of blobTopicName with
// PutRecordsByShard. It retries up to 3 times, 5 seconds apart, when the QPS
// limit is exceeded. It uses dh, projectName and blobTopicName from the
// enclosing scope.
func putDataByShard() {
	shardId := "0"
	records := make([]datahub.IRecord, 3)
	record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
	records[0] = record1
	record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
	record2.SetAttribute("attribute", "test attribute")
	records[1] = record2
	record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
	records[2] = record3

	maxReTry := 3
	retryNum := 0
	for retryNum < maxReTry {
		err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records)
		if err == nil {
			// Stop after a successful put; without this the loop re-put the
			// same records forever.
			break
		}
		if _, ok := err.(*datahub.LimitExceededError); !ok {
			fmt.Println("put record failed")
			fmt.Println(err)
			return
		}
		fmt.Println("maybe qps exceed limit,retry")
		retryNum++
		time.Sleep(5 * time.Second)
	}
	if retryNum >= maxReTry {
		fmt.Println("put records failed")
		return
	}
	fmt.Println("put record successful")
}
除了數據本身以外,在進行數據發布時,還可以添加和數據相關的額外信息,例如數據采集場景等。添加方式為
// Attach an attribute (extra key/value information) to a Tuple record.
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
// Blob records take attributes the same way.
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")
訂閱數據
訂閱一個topic下的數據,同樣需要指定對應的shard,同時需要指定讀取游標位置,通過 getCursor 接口獲取。
GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)
參數
projectName: project name
topicName: topic name
shardId: The id of the shard.
ctype: Which type used to get cursor.可以通過四種方式獲取:OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。
OLDEST: 表示獲取的cursor指向當前有效數據中時間最久遠的record
LATEST: 表示獲取的cursor指向當前最新的record
SEQUENCE: 表示獲取的cursor指向該序列的record
SYSTEM_TIME: 表示獲取的cursor指向該時間之后接收到的第一條record
param: Parameter used to get cursor.when use SEQUENCE and SYSTEM_TIME need to be set.
return
// GetCursorResult is returned by GetCursor.
type GetCursorResult struct {
// Cursor is passed to GetTupleRecords / GetBlobRecords as the read start position.
Cursor string `json:"Cursor"`
// NOTE(review): presumably the system time and sequence of the record the
// cursor points to; confirm against the SDK.
RecordTime int64 `json:"RecordTime"`
Sequence int64 `json:"Sequence"`
}
error
ResourceNotFoundError
SeekOutOfRangeError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
ShardSealedError
示例
// cursor obtains cursors for shard "0" of topicName in three ways: OLDEST,
// LATEST and SEQUENCE (sequence 10). It prints each cursor, or the error if
// that lookup failed.
func cursor(dh datahub.DataHub, projectName, topicName string) {
	shardId := "0"

	// report handles every lookup the same way; previously the LATEST result
	// was printed without checking its error.
	report := func(gr *datahub.GetCursorResult, err error) {
		if err != nil {
			fmt.Println("get cursor failed")
			fmt.Println(err)
			return
		}
		fmt.Println(gr)
	}

	report(dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST))
	report(dh.GetCursor(projectName, topicName, shardId, datahub.LATEST))
	// SEQUENCE and SYSTEM_TIME need the extra param argument.
	var seq int64 = 10
	report(dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq))
}
從指定shard讀取數據,需要指定從哪個cursor開始讀,并指定讀取的上限數據條數,如果從cursor到shard結尾少于Limit條數的數據,則返回實際的條數的數據。
Tuple topic data
GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)
參數
projectName: project name
topicName: topic name
shardId: The id of the shard.
cursor: The start cursor used to read data.
limit:Max record size to read.
recordSchema: RecordSchema for the topic.
return
// GetRecordsResult is returned by GetTupleRecords.
type GetRecordsResult struct {
// NextCursor is the cursor to pass to the next read to continue after this batch.
NextCursor string `json:"NextCursor"`
// RecordCount is the number of records returned; 0 means no new data is available yet.
RecordCount int `json:"RecordCount"`
// NOTE(review): presumably the sequence of the first returned record; confirm.
StartSequence int64 `json:"StartSeq"`
// Records holds the returned records; for a Tuple topic they are *TupleRecord values.
Records []IRecord `json:"Records"`
// RecordSchema is excluded from JSON ("-" tag); presumably the schema passed to the read call.
RecordSchema *RecordSchema `json:"-"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// getTupleData reads one batch of up to 100 records from shard "1" of
// topicName, starting at the oldest cursor, and prints each record's values.
// It retries up to 3 times, 5 seconds apart, when the QPS limit is exceeded.
// It uses dh, projectName and topicName from the enclosing scope.
func getTupleData() {
	shardId := "1"
	topic, err := dh.GetTopic(projectName, topicName)
	if err != nil {
		// Report the cause, consistent with the other examples.
		fmt.Println("get topic failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get topic successful")
	cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
	if err != nil {
		fmt.Println("get cursor failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get cursor successful")

	limitNum := 100
	maxReTry := 3
	retryNum := 0
	for retryNum < maxReTry {
		gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
		if err != nil {
			if _, ok := err.(*datahub.LimitExceededError); ok {
				fmt.Println("maybe qps exceed limit,retry")
				retryNum++
				time.Sleep(5 * time.Second)
				continue
			}
			fmt.Println("get record failed")
			fmt.Println(err)
			return
		}
		fmt.Println("get record successful")
		for _, record := range gr.Records {
			data, ok := record.(*datahub.TupleRecord)
			if !ok {
				fmt.Printf("record type is not TupleRecord, is %T\n", record)
				continue
			}
			fmt.Println(data.Values)
		}
		break
	}
	if retryNum >= maxReTry {
		fmt.Println("get records failed")
	}
}
Blob topic data
GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)
參數
projectName: project name
topicName: topic name
shardId: The id of the shard.
cursor: The start cursor used to read data.
limit:Max record size to read.
return
// GetRecordsResult is returned by GetBlobRecords; it has the same layout as for Tuple reads.
type GetRecordsResult struct {
// NextCursor is the cursor to pass to the next read to continue after this batch.
NextCursor string `json:"NextCursor"`
// RecordCount is the number of records returned.
RecordCount int `json:"RecordCount"`
// NOTE(review): presumably the sequence of the first returned record; confirm.
StartSequence int64 `json:"StartSeq"`
// Records holds the returned records; for a Blob topic they are *BlobRecord values.
Records []IRecord `json:"Records"`
// RecordSchema is excluded from JSON ("-" tag).
RecordSchema *RecordSchema `json:"-"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// getBlobData reads one batch of up to 100 records from shard "1" of
// blobTopicName, starting at the oldest cursor, and prints each record's
// payload. It retries up to 3 times, 5 seconds apart, when the QPS limit is
// exceeded. It uses dh, projectName and blobTopicName from the enclosing scope.
func getBlobData() {
	shardId := "1"
	cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
	if err != nil {
		fmt.Println("get cursor failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get cursor successful")

	limitNum := 100
	maxReTry := 3
	retryNum := 0
	for retryNum < maxReTry {
		gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
		if err != nil {
			if _, ok := err.(*datahub.LimitExceededError); ok {
				fmt.Println("maybe qps exceed limit,retry")
				retryNum++
				time.Sleep(5 * time.Second)
				continue
			}
			fmt.Println("get record failed")
			fmt.Println(err)
			return
		}
		fmt.Println("get record successful")
		for _, record := range gr.Records {
			data, ok := record.(*datahub.BlobRecord)
			if !ok {
				// This branch expects BlobRecord; the old message wrongly said TupleRecord.
				fmt.Printf("record type is not BlobRecord, is %T\n", record)
				continue
			}
			fmt.Println(data.StoreData)
		}
		break
	}
	if retryNum >= maxReTry {
		fmt.Println("get records failed")
	}
}
meter操作
metering info是對shard的資源占用情況的統計信息,一小時更新一次。
GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)
參數
projectName: project name
topicName: topic name
shardId: The id of the shard.
return
// GetMeterInfoResult holds resource-usage statistics for one shard; they are
// refreshed once an hour.
type GetMeterInfoResult struct {
// NOTE(review): units of ActiveTime and Storage are not shown here; confirm.
ActiveTime int64 `json:"ActiveTime"`
Storage int64 `json:"Storage"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// meter fetches and prints the metering info of shard "0" of topicName.
func meter(dh datahub.DataHub, projectName, topicName string) {
	shardId := "0"
	gmi, err := dh.GetMeterInfo(projectName, topicName, shardId)
	if err != nil {
		fmt.Println("get meter information failed")
		// Print the cause instead of swallowing it.
		fmt.Println(err)
		return
	}
	fmt.Println("get meter information successful")
	fmt.Println(gmi)
}
connector操作
DataHub Connector是把DataHub服務中的流式數據同步到其他云產品中的功能,目前支持將Topic中的數據實時/準實時同步到MaxCompute(原ODPS)、OSS(Object Storage Service,阿里云對象存儲服務)、ES(Elasticsearch)、ADS(AnalyticDB for MySQL,分析型數據庫MySQL版)、MYSQL、FC(Function Compute、函數計算)、OTS(Open Table Store、表格存儲)、DataHub中。用戶只需要向DataHub中寫入一次數據,并在DataHub服務中配置好同步功能,便可以在其他云產品中使用這份數據。這里所有的示例代碼均以MaxCompute為例。MaxCompute Config的配置信息可以參考同步數據到MaxCompute。datahub2.14.0版本之后將接口參數connectorType修改為connectorId(createConnector除外),不過接口依舊兼容2.14.0之前版本,只需將參數connectorType轉為string作為參數即可。
使用示例
1. gcr, err := dh.GetConnector(projectName, topicName, string(datahub.SinkOdps))
創建connector
CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error) CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType, columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error)
參數
projectName: project name
topicName: topic name
cType: The type of connector which you want create.
columnFields: Which fields you want synchronize.
sinkStartTime: Start time for this job,
config: Detail config of specified connector type.
return
// CreateConnectorResult is returned by CreateConnector and CreateConnectorWithStartTime.
type CreateConnectorResult struct {
// ConnectorId identifies the new connector in later calls such as
// GetConnector, UpdateConnector and DeleteConnector.
ConnectorId string `json:"ConnectorId"`
}
error
ResourceNotFoundError
ResourceExistError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// createConnector creates an ODPS (MaxCompute) sink connector for topicName.
// It syncs field1 and field2 into a table partitioned by ds/hh/mm, derived
// from the system time. Fill in the endpoint and credentials before running.
func createConnector(dh datahub.DataHub, projectName, topicName string) {
	odpsEndpoint := ""
	odpsProject := "datahub_test"
	odpsTable := "datahub_go_example"
	odpsAccessId := ""
	odpsAccessKey := ""
	odpsTimeRange := 60
	odpsPartitionMode := datahub.SystemTimeMode
	connectorType := datahub.SinkOdps

	odpsPartitionConfig := datahub.NewPartitionConfig()
	odpsPartitionConfig.AddConfig("ds", "%Y%m%d")
	odpsPartitionConfig.AddConfig("hh", "%H")
	odpsPartitionConfig.AddConfig("mm", "%M")

	sinkOdpsConfig := datahub.SinkOdpsConfig{
		Endpoint:        odpsEndpoint,
		Project:         odpsProject,
		Table:           odpsTable,
		AccessId:        odpsAccessId,
		AccessKey:       odpsAccessKey,
		TimeRange:       odpsTimeRange,
		PartitionMode:   odpsPartitionMode,
		PartitionConfig: *odpsPartitionConfig,
	}
	fields := []string{"field1", "field2"}

	// CreateConnector returns (result, error), and the config is passed by
	// value. The previous code used a single-value if-init and dereferenced a
	// non-pointer, so it did not compile.
	ccr, err := dh.CreateConnector(projectName, topicName, connectorType, fields, sinkOdpsConfig)
	if err != nil {
		fmt.Println("create odps connector failed")
		fmt.Println(err)
		return
	}
	fmt.Println("create odps connector successful")
	fmt.Println("connector id:", ccr.ConnectorId)
}
列出connector
ListConnector(projectName, topicName string) (*ListConnectorResult, error)
參數
projectName: project name
topicName: topic name
return
// ListConnectorResult is returned by ListConnector.
type ListConnectorResult struct {
// ConnectorIds holds the IDs of the topic's connectors (JSON key "Connectors").
ConnectorIds []string `json:"Connectors"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// listConnector prints the ListConnector result for topicName.
func listConnector(dh datahub.DataHub, projectName, topicName string) {
	connectors, err := dh.ListConnector(projectName, topicName)
	if err == nil {
		fmt.Println("get connector list successful")
		fmt.Println(connectors)
		return
	}
	fmt.Println("get connector list failed")
	fmt.Println(err)
}
查詢connector
GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)
參數
projectName: project name
topicName: topic name
connectorId: The id of the connector
return
// GetConnectorResult is returned by GetConnector and describes one connector.
type GetConnectorResult struct {
CreateTime int64 `json:"CreateTime"`
LastModifyTime int64 `json:"LastModifyTime"`
ConnectorId string `json:"ConnectorId"`
ClusterAddress string `json:"ClusterAddress"`
Type ConnectorType `json:"Type"`
State ConnectorState `json:"State"`
// ColumnFields lists the topic fields the connector synchronizes.
ColumnFields []string `json:"ColumnFields"`
// ExtraConfig is decoded from the "ExtraInfo" JSON key.
ExtraConfig map[string]string `json:"ExtraInfo"`
Creator string `json:"Creator"`
Owner string `json:"Owner"`
// Config holds the type-specific sink configuration; for an ODPS connector
// it can be type-asserted to SinkOdpsConfig.
Config interface{} `json:"Config"`
}
error
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// getConnector fetches connectorId of topicName and prints its description.
func getConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
	gcr, err := dh.GetConnector(projectName, topicName, connectorId)
	if err != nil {
		// Messages previously misspelled "connector" as "conector".
		fmt.Println("get odps connector failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get odps connector successful")
	fmt.Println(*gcr)
}
更新connector配置
UpdateConnector(projectName, topicName, connectorId string, config interface{}) error
參數
projectName: project name
topicName: topic name
connectorId: The id of the connector.
config: Detail config of specified connector type.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// updateConnector reads the ODPS sink config of connectorId, sets its
// TimeRange to 200 and writes it back with UpdateConnector.
func updateConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
	current, err := dh.GetConnector(projectName, topicName, connectorId)
	if err != nil {
		fmt.Println("get odps connector failed")
		fmt.Println(err)
		return
	}
	odpsConfig, isOdps := current.Config.(datahub.SinkOdpsConfig)
	if !isOdps {
		fmt.Println("convert config to SinkOdpsConfig failed")
		return
	}
	odpsConfig.TimeRange = 200
	err = dh.UpdateConnector(projectName, topicName, connectorId, odpsConfig)
	if err != nil {
		fmt.Println("update odps config failed")
		fmt.Println(err)
		return
	}
	fmt.Println("update odps config successful")
}
刪除connector
DeleteConnector(projectName, topicName, connectorId string) error
參數
projectName: project name
topicName: topic name
connectorId: The id of the connector.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// deleteConnector removes connectorId from topicName and prints the outcome.
func deleteConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
	err := dh.DeleteConnector(projectName, topicName, connectorId)
	if err == nil {
		fmt.Println("delete odps connector successful")
		return
	}
	fmt.Println("delete odps connector failed")
	fmt.Println(err)
}
查詢connector shard狀態
可以獲取某個topic下所有shard的狀態信息,也可以獲取topic下指定shard的狀態信息。
GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error) GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*ConnectorShardStatusEntry, error)
參數
projectName: project name
topicName: topic name
shardId: The id of the shard.
connectorId: The id of the connector.
return
// GetConnectorShardStatusResult is returned by GetConnectorShardStatus and
// maps each shard ID to that shard's connector status.
type GetConnectorShardStatusResult struct {
ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}
// ConnectorShardStatusEntry is the connector status of a single shard; it is
// returned directly by GetConnectorShardStatusByShard.
// NOTE(review): field meanings are inferred from names only (sequence range
// and current sync position, sync State, most recent error); confirm against the SDK.
type ConnectorShardStatusEntry struct {
StartSequence int64 `json:"StartSequence"`
EndSequence int64 `json:"EndSequence"`
CurrentSequence int64 `json:"CurrentSequence"`
CurrentTimestamp int64 `json:"CurrentTimestamp"`
UpdateTime int64 `json:"UpdateTime"`
State ConnectorShardState `json:"State"`
LastErrorMessage string `json:"LastErrorMessage"`
DiscardCount int64 `json:"DiscardCount"`
DoneTime int64 `json:"DoneTime"`
WorkerAddress string `json:"WorkerAddress"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// getConnectorShardStatus first prints the state of every shard of the
// connector, then fetches and prints the full status entry of shard "0".
// It stops at the first failed request.
func getConnectorShardStatus(dh datahub.DataHub, projectName, topicName, connectorId string) {
	all, err := dh.GetConnectorShardStatus(projectName, topicName, connectorId)
	if err != nil {
		fmt.Println("get connector shard status failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get connector shard status successful")
	for id, entry := range all.ShardStatus {
		fmt.Println(id, entry.State)
	}

	// Query a single shard by id.
	const targetShard = "0"
	single, err := dh.GetConnectorShardStatusByShard(projectName, topicName, connectorId, targetShard)
	if err != nil {
		fmt.Println("get connector shard status failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get connector shard status successful")
	fmt.Println(*single)
}
重啟connector shard
可以重啟topic下的所有shard,也可以重啟topic下的指定shard。
ReloadConnector(projectName, topicName, connectorId string) error
ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) error
參數
projectName: project name
topicName: topic name
connectorId: The id of the connector.
shardId: The id of the shard.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// reloadConnector reloads all shards of the connector and then reloads
// shard "2" on its own, printing the outcome of each step. It returns as
// soon as one of the reloads fails.
func reloadConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
	// Step 1: reload every shard of the connector.
	err := dh.ReloadConnector(projectName, topicName, connectorId)
	if err != nil {
		fmt.Println("reload connector shard failed")
		fmt.Println(err)
		return
	}
	fmt.Println("reload connector shard successful")

	// Step 2: reload one specific shard.
	const shard = "2"
	err = dh.ReloadConnectorByShard(projectName, topicName, connectorId, shard)
	if err != nil {
		fmt.Println("reload connector shard failed")
		fmt.Println(err)
		return
	}
	fmt.Println("reload connector shard successful")
}
添加新field
可以給connector添加指定列,但要求datahub的topic中和odps都存在對應的列。
AppendConnectorField(projectName, topicName, connectorId, fieldName string) error
參數
projectName: project name
topicName: topic name
connectorId: The id of the connector.
fieldName: The name of the field.
return
error
ResourceNotFoundError
InvalidParameterError
示例
// appendConnectorField adds the column "field2" to the connector and prints
// the outcome. Per the API description above, the column must already exist
// both in the DataHub topic and in the ODPS table.
func appendConnectorField(dh datahub.DataHub, projectName, topicName, connectorId string) {
	const fieldName = "field2"
	if err := dh.AppendConnectorField(projectName, topicName, connectorId, fieldName); err != nil {
		fmt.Println("append field failed")
		fmt.Println(err)
		return
	}
	fmt.Println("append field successful")
}
更新connector狀態
connector狀態分兩種,CONNECTOR_PAUSED和CONNECTOR_RUNNING,分別表示停止和運行中;在GO SDK中分別通過常量datahub.ConnectorStopped和datahub.ConnectorRunning指定(見下方示例)。
UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) error
參數
projectName: project name
topicName: topic name
connectorId: The id of the connector.
state: The state to which the connector should be updated.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// updateConnectorState stops the connector and then starts it again,
// printing the outcome after each transition. It returns on the first
// failed transition, so the connector is not restarted if stopping fails.
func updateConnectorState(dh datahub.DataHub, projectName, topicName, connectorId string) {
	transitions := []datahub.ConnectorState{datahub.ConnectorStopped, datahub.ConnectorRunning}
	for _, target := range transitions {
		err := dh.UpdateConnectorState(projectName, topicName, connectorId, target)
		if err != nil {
			fmt.Println("update connector state failed")
			fmt.Println(err)
			return
		}
		fmt.Println("update connector state successful")
	}
}
更新connector點位信息
UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) error
參數
projectName: project name
topicName: topic name
shardId: The id of the shard.
connectorId: The id of the connector.
offset: The connector offset.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// updateConnectorOffset moves shard "10" of the connector to a fixed
// timestamp/sequence offset. Following this example's flow, the connector is
// stopped before the offset update and set back to running afterwards.
//
// Errors from both state changes are reported instead of being ignored. If
// stopping fails, the function returns without touching the offset, and
// there is nothing to restart.
func updateConnectorOffset(dh datahub.DataHub, projectName, topicName, connectorId string) {
	shardId := "10"
	offset := datahub.ConnectorOffset{
		Timestamp: 1565864139000,
		Sequence:  104,
	}
	if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
		fmt.Println("stop connector failed")
		fmt.Println(err)
		return
	}
	// Restart on every exit path once the stop has succeeded, and surface a
	// failed restart rather than silently leaving the connector stopped.
	defer func() {
		if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning); err != nil {
			fmt.Println("restart connector failed")
			fmt.Println(err)
		}
	}()
	if err := dh.UpdateConnectorOffset(projectName, topicName, connectorId, shardId, offset); err != nil {
		fmt.Println("update connector offset failed")
		fmt.Println(err)
		return
	}
	fmt.Println("update connector offset successful")
}
查詢connector完成時間
只有MaxCompute(ODPS)類型的connector支持查詢完成時間。
GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)
參數
projectName: project name
topicName: topic name
connectorId: The id of the connector.
return
// GetConnectorDoneTimeResult is returned by GetConnectorDoneTime.
// DoneTime is decoded from the "DoneTime" JSON key.
type GetConnectorDoneTimeResult struct {
	DoneTime int64 `json:"DoneTime"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// doneTime queries the done time of a connector and prints it. Per the API
// description above, only MaxCompute connectors support this query.
func doneTime(dh datahub.DataHub, projectName, topicName, connectorId string) {
	result, err := dh.GetConnectorDoneTime(projectName, topicName, connectorId)
	if err != nil {
		fmt.Println("get connector done time failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get connector done time successful")
	done := result.DoneTime
	fmt.Println(done)
}
subscription操作
訂閱服務提供了服務端保存用戶消費點位的功能,只需要通過簡單配置和處理,就可以實現高可用的點位存儲服務。
創建subscription
CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error)
參數
projectName: project name
topicName: topic name
comment: subscription comment
return
error
ResourceExistError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// createSubscription creates a subscription with the comment "sub comment"
// and prints the returned result. It reads the package-level dh, projectName
// and topicName, which the surrounding program must define.
func createSubscription() {
	const comment = "sub comment"
	result, err := dh.CreateSubscription(projectName, topicName, comment)
	if err != nil {
		fmt.Println("create subscription failed")
		fmt.Println(err)
		return
	}
	fmt.Println("create subscription successful")
	fmt.Println(*result)
}
刪除subscription
DeleteSubscription(projectName, topicName, subId string) error
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// delSubscription deletes a fixed subscription from the topic and prints
// the outcome. Like the other examples, it prints the underlying error on
// failure; the original silently discarded it.
func delSubscription(dh datahub.DataHub, projectName, topicName string) {
	subId := "1565577384801DCN0O"
	if err := dh.DeleteSubscription(projectName, topicName, subId); err != nil {
		fmt.Println("delete subscription failed")
		fmt.Println(err)
		return
	}
	fmt.Println("delete subscription successful")
}
查詢subscription
GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
return
// GetSubscriptionResult is returned by GetSubscription. It embeds
// SubscriptionEntry, so the entry's fields are promoted onto the result.
type GetSubscriptionResult struct {
SubscriptionEntry
}
error
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// getSubscription fetches a fixed subscription of the topic and prints it.
func getSubscription(dh datahub.DataHub, projectName, topicName string) {
	const subId = "1565577384801DCN0O"
	sub, err := dh.GetSubscription(projectName, topicName, subId)
	if err != nil {
		fmt.Println("get subscription failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get subscription successful")
	fmt.Println(sub)
}
列出subscription
通過pageIndex和pageSize獲取指定范圍的subscription信息,例如pageIndex=1、pageSize=10時獲取第1-10個subscription;pageIndex=2、pageSize=5時則獲取第6-10個subscription。
ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)
參數
projectName: project name
topicName: topic name
pageIndex: The page index used to list subscriptions.
pageSize: The page size used to list subscriptions.
return
// ListSubscriptionResult is returned by ListSubscription. Subscriptions holds
// the page selected by the pageIndex/pageSize arguments.
// NOTE(review): TotalCount presumably counts all subscriptions on the topic,
// not just this page — confirm.
type ListSubscriptionResult struct {
TotalCount int64 `json:"TotalCount"`
Subscriptions []SubscriptionEntry `json:"Subscriptions"`
}
error
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// listSubscription lists the first page of subscriptions of the topic,
// using a page size of five, and prints each entry.
func listSubscription(dh datahub.DataHub, projectName, topicName string) {
	const (
		pageIndex = 1
		pageSize  = 5
	)
	page, err := dh.ListSubscription(projectName, topicName, pageIndex, pageSize)
	if err != nil {
		fmt.Println("get subscription list failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get subscription list successful")
	for i := range page.Subscriptions {
		fmt.Println(page.Subscriptions[i])
	}
}
更新subscription
目前僅支持更新subscription comment
UpdateSubscription(projectName, topicName, subId, comment string) error
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
comment: subscription comment
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// updateSubscription replaces the comment of a fixed subscription (the only
// attribute that can currently be updated) and prints the outcome.
func updateSubscription(dh datahub.DataHub, projectName, topicName string) {
	const (
		subId      = "1565580329258VXSY8"
		newComment = "new sub comment"
	)
	err := dh.UpdateSubscription(projectName, topicName, subId, newComment)
	if err != nil {
		fmt.Println("update subscription comment failed")
		fmt.Println(err)
		return
	}
	fmt.Println("update subscription comment successful")
}
更新subscription狀態
subscription 有兩種狀態,SUB_OFFLINE 和 SUB_ONLINE,分別表示離線和在線。
UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) error
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
state: The state you want to change.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// updateSubState switches a fixed subscription to the offline state
// (datahub.SUB_OFFLINE) and prints the outcome.
func updateSubState(dh datahub.DataHub, projectName, topicName string) {
	const subId = "1565580329258VXSY8"
	err := dh.UpdateSubscriptionState(projectName, topicName, subId, datahub.SUB_OFFLINE)
	if err != nil {
		fmt.Println("update subscription state failed")
		fmt.Println(err)
		return
	}
	fmt.Println("update subscription state successful")
}
offset操作
一個subscription創建后,初始狀態是未消費的,要使用subscription服務提供的點位存儲功能,需要進行一些offset操作。
初始化offset
初始化subscription是使用subscription進行點位操作的第一步。一個subscription不支持并行操作,如果需要在多個進程中消費同一份數據,則需要使用不同的subscription。調用OpenSubscriptionSession之后,獲取的點位信息中SessionId會+1,并且之前的session失效,無法再進行更新offset操作。
OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
shardIds: The id list of the shards.
return
// OpenSubscriptionSessionResult is returned by OpenSubscriptionSession.
// Offsets maps each requested shard id to that shard's current offset.
type OpenSubscriptionSessionResult struct {
Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset is the consumption offset stored for one shard.
// A negative Sequence means the shard has not been consumed yet.
// VersionId (JSON key "Version") is incremented by ResetSubscriptionOffset.
// SessionId is incremented by each OpenSubscriptionSession and is nil in
// GetSubscriptionOffset results. CommitSubscriptionOffset only succeeds when
// VersionId and SessionId match the current session.
type SubscriptionOffset struct {
Timestamp int64 `json:"Timestamp"`
Sequence int64 `json:"Sequence"`
VersionId int64 `json:"Version"`
SessionId *int64 `json:"SessionId"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// openOffset opens a subscription session for shards 0-2 and prints the
// returned offsets. Opening a session invalidates any previous session of
// the same subscription.
func openOffset(dh datahub.DataHub, projectName, topicName string) {
	const subId = "1565580329258VXSY8"
	shards := []string{"0", "1", "2"}
	session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shards)
	if err != nil {
		fmt.Println("open session failed")
		fmt.Println(err)
		return
	}
	fmt.Println("open session successful")
	fmt.Println(session)
}
獲取offset
獲取subscription的當前點位信息。與OpenSubscriptionSession不同的是,GetSubscriptionOffset獲取的點位信息中SubscriptionOffset的SessionId為nil,無法用于commit點位操作,因此GetSubscriptionOffset一般用來查看點位信息。
GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
shardIds: The id list of the shards.
return
// GetSubscriptionOffsetResult is returned by GetSubscriptionOffset.
// Offsets maps each requested shard id to that shard's current offset.
type GetSubscriptionOffsetResult struct {
	Offsets map[string]SubscriptionOffset `json:"Offsets"`
}

// SubscriptionOffset is the consumption offset stored for one shard.
// In GetSubscriptionOffset results SessionId is nil, so these offsets are for
// inspection only and cannot be passed to CommitSubscriptionOffset.
type SubscriptionOffset struct {
	Timestamp int64  `json:"Timestamp"`
	Sequence  int64  `json:"Sequence"`
	VersionId int64  `json:"Version"`
	SessionId *int64 `json:"SessionId"`
}
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// getOffset reads the current subscription offsets of shards 0-2 and prints
// them. The offsets come back without a SessionId, so they are for
// inspection only and cannot be committed.
func getOffset(dh datahub.DataHub, projectName, topicName string) {
	const subId = "1565580329258VXSY8"
	shards := []string{"0", "1", "2"}
	current, err := dh.GetSubscriptionOffset(projectName, topicName, subId, shards)
	if err != nil {
		fmt.Println("get session failed")
		fmt.Println(err)
		return
	}
	fmt.Println("get session successful")
	fmt.Println(current)
}
更新offset
更新點位時會驗證versionId和sessionId,必須與當前session一致才會更新成功。更新點位時,需要同時設置Timestamp和Sequence,才會更新為有效點位,如果兩者不對應,則會更新點位到Timestamp對應的點位,建議更新點位時,選擇record中對應的Timestamp和Sequence進行點位更新。
CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
offsets: The offset map of shards.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// updateOffset opens a subscription session, moves shard "0" to a new
// Sequence/Timestamp and commits it. It reads the package-level dh,
// projectName, topicName and subId, which the surrounding program must define.
//
// The commit is validated against the VersionId and SessionId carried by the
// offset returned from OpenSubscriptionSession. The known session conflicts
// are therefore reported with specific messages.
func updateOffset() {
	shardIds := []string{"0", "1", "2"}
	oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
	if err != nil {
		fmt.Println("open session failed")
		fmt.Println(err)
		// oss is not usable on error; continuing would dereference a nil result.
		return
	}
	fmt.Println("open session successful")
	fmt.Println(oss)

	// A missing entry would yield a zero offset with a nil SessionId, which
	// can never be committed, so bail out explicitly instead.
	offset, ok := oss.Offsets["0"]
	if !ok {
		fmt.Println("no offset returned for shard 0")
		return
	}
	// set offset message
	offset.Sequence = 900
	offset.Timestamp = 1565593166690
	offsetMap := map[string]datahub.SubscriptionOffset{
		"0": offset,
	}
	if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
		switch err.(type) {
		case *datahub.SubscriptionOfflineError:
			fmt.Println("the subscription has offline")
		case *datahub.SubscriptionSessionInvalidError:
			fmt.Println("the subscription is open elsewhere")
		case *datahub.SubscriptionOffsetResetError:
			fmt.Println("the subscription is reset elsewhere")
		default:
			fmt.Println(err)
		}
		fmt.Println("update offset failed")
		return
	}
	fmt.Println("update offset successful")
}
重置offset
重置offset可以將offset重置到某個時間點上。重置之后,獲取的offset信息中VersionId會+1,之前的session失效,無法再進行更新點位操作。
ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error
參數
projectName: project name
topicName: topic name
subId: The id of the subscription.
offsets: The offset map of shards.
return
error
ResourceNotFoundError
AuthorizationFailedError
DatahubClientError
InvalidParameterError
示例
// resetOffset resets shard "1" of a fixed subscription to a point in time.
// Only Timestamp is set on the offset. After a reset, VersionId is
// incremented and any previously opened session can no longer commit.
func resetOffset(dh datahub.DataHub, projectName, topicName string) {
	const subId = "1565580329258VXSY8"
	offsetMap := make(map[string]datahub.SubscriptionOffset, 1)
	offsetMap["1"] = datahub.SubscriptionOffset{Timestamp: 1565593166690}

	err := dh.ResetSubscriptionOffset(projectName, topicName, subId, offsetMap)
	if err != nil {
		fmt.Println("reset offset failed")
		fmt.Println(err)
		return
	}
	fmt.Println("reset offset successful")
}
error類型
GO SDK對datahub的錯誤類型進行了整理,用戶可以使用類型斷言進行錯誤類型的判斷,然后根據錯誤的類型進行相應的處理。其中,LimitExceededError、ServiceTemporaryUnavailableError屬于可重試錯誤;DatahubClientError中也包含部分可重試錯誤,例如server busy、server unavailable等;除此之外的其余error屬于不可重試錯誤。因此遇到可重試error時,建議在代碼邏輯中添加重試邏輯,但應嚴格限制重試次數。
類名 | 錯誤碼 | 描述 |
InvalidParameterError | InvalidParameter, InvalidCursor | 非法參數 |
ResourceNotFoundError | ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo | 訪問的資源不存在(注:進行Split/Merge操作后,立即發送其他請求,有可能會拋出該異常 )。 |
ResourceExistError | ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist | 資源已存在(創建時如果資源已存在,就會拋出這個異常)。 |
SeekOutOfRangeError | SeekOutOfRange | getCursor時,給的sequence不在有效范圍內(通常數據已過期),或給的timestamp大于當前時間。 |
AuthorizationFailedError | Unauthorized | Authorization 簽名解析異常,檢查AK是否填寫正確。 |
NoPermissionError | NoPermission, OperationDenied | 沒有權限,通常是RAM配置不正確,或沒有正確授權子賬號。 |
NewShardSealedError | InvalidShardOperation | shard 處于CLOSED狀態可讀不可寫,繼續往CLOSED的shard 寫數據,或讀到最后一條數據后繼續讀取,會拋出該異常。 |
LimitExceededError | LimitExceeded | 接口使用超限,參考限制描述。 |
SubscriptionOfflineError | SubscriptionOffline | 訂閱處于下線狀態不可用。 |
SubscriptionSessionInvalidError | OffsetSessionChanged, OffsetSessionClosed | 訂閱會話異常,使用訂閱時會建立一個session,用于提交點位,如果有其他客戶端使用該訂閱,會得到該異常。 |
SubscriptionOffsetResetError | OffsetReseted | 訂閱點位被重置。 |
MalformedRecordError | MalformedRecord、ShardNotReady | 非法的 Record 格式,可能的情況有:schema 不正確、包含非utf-8字符、客戶端使用pb而服務端不支持等。 |
ServiceTemporaryUnavailableError | - | 一般是網絡問題,例如連接異常斷開,通常重試即可。 |
DatahubClientError | 其他所有,并且是所有異常的基類 | 如排除以上異常情況,通常重試即可,但應限制重試次數。 |
DatahubClientError
datahub的基礎錯誤類型,所有的error都繼承了這個錯誤類型。datahub的錯誤類型除了已經定義的錯誤類型,其余錯誤均屬于DatahubClientError,其中包括服務器busy、服務器unavailable等可重試錯誤,用戶可以在自己的代碼邏輯中添加一些重試機制。
// DatahubClientError is the base DataHub error type that the other SDK error
// types build on. Any error that does not map to a more specific type is
// reported as a DatahubClientError. This includes retryable conditions such
// as server busy or server unavailable, so callers may retry it a bounded
// number of times.
type DatahubClientError struct {
StatusCode int `json:"StatusCode"` // Http status code
RequestId string `json:"RequestId"` // Request-id to trace the request
Code string `json:"ErrorCode"` // Datahub error code
Message string `json:"ErrorMessage"` // Error msg of the error code
}
error使用示例:
// example_error shows how to classify DataHub errors with a type switch and
// how to retry a retryable error (LimitExceededError) a bounded number of
// times. Non-retryable errors are reported once and never retried. The retry
// loop also stops as soon as a retry fails with a different, non-retryable
// error (e.g. the project now exists or the credentials are wrong).
func example_error() {
	accessId := ""
	accessKey := ""
	endpoint := ""
	projectName := "datahub_go_test"
	maxRetry := 3
	dh := datahub.New(accessId, accessKey, endpoint)

	err := dh.CreateProject(projectName, "project comment")
	if err == nil {
		fmt.Println("create project successful")
		return
	}

	switch err.(type) {
	case *datahub.InvalidParameterError:
		fmt.Println("invalid parameter,please check your input parameter")
	case *datahub.ResourceExistError:
		fmt.Println("project already exists")
	case *datahub.AuthorizationFailedError:
		fmt.Println("accessId or accessKey err,please check your accessId and accessKey")
	case *datahub.LimitExceededError:
		fmt.Println("limit exceed, so retry")
		for i := 0; i < maxRetry; i++ {
			// wait 5 seconds before each retry
			time.Sleep(5 * time.Second)
			err = dh.CreateProject(projectName, "project comment")
			if err == nil {
				fmt.Println("create project successful")
				return
			}
			fmt.Println("create project failed")
			fmt.Println(err)
			// Waiting only helps with the limit error; anything else is final.
			if _, ok := err.(*datahub.LimitExceededError); !ok {
				return
			}
		}
		fmt.Println("create project failed after", maxRetry, "retries")
	default:
		fmt.Println("unknown error")
		fmt.Println(err)
	}
}