DataHub Go SDK 使用文檔

Go SDK

Go SDK介紹

快速上手

DataHub相關的基本概念

詳情參見DataHub基本概念

準備工作

  • 訪問DataHub服務需要使用阿里云認證賬號,需要提供阿里云accessId及accessKey。 同時需要提供可訪問的DataHub服務地址。

  • 獲取Datahub Go SDK包

go get -u -insecure github.com/aliyun/aliyun-datahub-sdk-go/datahub
  • 初始化DataHub Client

Datahub GO SDK提供的所有API接口均由 datahub.DataHub 接口實現,所以第一步就是初始化一個DataHub對象。1). 使用默認參數創建DataHub對象

import "github.com/aliyun/aliyun-datahub-sdk-go/datahub"
// Fill in the Alibaba Cloud accessId/accessKey and the DataHub service endpoint.
accessId := ""
accessKey := ""
endpoint := ""
// datahub.New creates the client object using the default parameters.
dh := datahub.New(accessId, accessKey, endpoint)

2). 使用自定義參數創建對象目前支持配置的參數有:

參數

參數類型

默認值

有效值

描述

UserAgent

string

-

-

用戶代理

CompressorType

CompressorType

NOCOMPRESS

NOCOMPRESS(不壓縮)、LZ4、DEFLATE、ZLIB

傳輸時支持的壓縮格式

EnableBinary

bool

true

true/false

主要在put/get record時,使用protobuf協議。DataHub版本未支持protobuf時需要手動將EnableBinary設置為false

HttpClient

*http.Client

datahub.DefaultHttpClient()

-

具體可參考net/http

// Fill in the DataHub endpoint and the Alibaba Cloud credentials.
endpoint := ""
accessId := ""
accessKey := ""
// token is only needed by the temporary-AK (STS) alternative shown below.
token := ""
account := datahub.NewAliyunAccount(accessId, accessKey)
// Temporary AK authentication:
// account := datahub.NewStsCredential(accessId, accessKey, token)
// Start from the default configuration and override individual settings.
config := datahub.NewDefaultConfig()
config.CompressorType = datahub.DEFLATE
config.EnableBinary = true
config.HttpClient = datahub.DefaultHttpClient()
dh := datahub.NewClientWithConfig(endpoint, config, account)
  • DataHub GO SDK 支持使用GO MOD進行包管理

require (
    github.com/aliyun/aliyun-datahub-sdk-go/datahub v0.1.4
)

點位消費示例

// OffsetConsume demonstrates offset-based consumption of a Tuple topic.
//
// It opens a subscription session for shard "0", resolves the cursor to
// resume from (the oldest record if nothing was consumed yet, otherwise the
// record after the last committed sequence), then reads records in batches
// and commits the subscription offset once every 1000 records.
//
// The function prints diagnostics and returns on any unrecoverable error.
func OffsetConsume() {
    accessId := ""
    accessKey := ""
    endpoint := "https://dh-cn-hangzhou.aliyuncs.com"
    dh := datahub.New(accessId, accessKey, endpoint)

    projectName := ""
    topicName := ""
    subId := ""
    shardId := "0"
    shardIds := []string{"0"}

    session, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("Open subscription session failed", err)
        return
    }

    offset := session.Offsets[shardId]
    var gc *datahub.GetCursorResult

    // A negative sequence means nothing has been consumed yet.
    if offset.Sequence < 0 {
        // Start from the first record that is still within the topic lifecycle.
        gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
        if err != nil {
            fmt.Println("Get oldest cursor failed", err)
            return
        }
    } else {
        // Resume from the record following the last committed one.
        nextSequence := offset.Sequence + 1
        gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, nextSequence)
        if err != nil {
            // Only SeekOutOfRange (the data at that sequence has expired) is
            // recoverable; any other error would leave gc nil, so stop here.
            if _, ok := err.(*datahub.SeekOutOfRangeError); !ok {
                fmt.Println("Get cursor by sequence failed", err)
                return
            }
            fmt.Println("Get cursor by sequence failed for SeekOutOfRangeError, will retry...")
            gc, err = dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
            if err != nil {
                fmt.Println("Get oldest cursor failed", err)
                return
            }
        }
    }

    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("Get topic failed", err)
        return
    }

    // Read Tuple records and commit the offset once every 1000 records.
    recordCount := 0
    limitNum := 100
    cursor := gc.Cursor
    for {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor, limitNum, topic.RecordSchema)
        if err != nil {
            fmt.Println("Get records failed", err)
            return
        }
        if gr.RecordCount == 0 {
            fmt.Println("No data, sleep 5 seconds...")
            time.Sleep(time.Second * 5)
            continue
        }

        for _, record := range gr.Records {
            // Process the record; this example only prints it. The assertion
            // is checked so an unexpected record type cannot cause a nil
            // pointer dereference.
            if data, ok := record.(*datahub.TupleRecord); ok {
                fmt.Println(data.Values)
            } else {
                fmt.Printf("record type is not TupleRecord, is %T\n", record)
            }

            recordCount++
            if recordCount%1000 != 0 {
                continue
            }

            // Commit the offset after every 1000 records.
            fmt.Println("Commit offset", record.GetSequence())
            offset.Sequence = record.GetSequence()
            offset.Timestamp = record.GetSystemTime()

            offsetMap := map[string]datahub.SubscriptionOffset{shardId: offset}
            if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
                // Fatal cases return instead of break: a break here would only
                // leave the record loop and consumption would silently go on.
                switch err.(type) {
                case *datahub.SubscriptionOffsetResetError:
                    // The offset was reset; the session must be reopened.
                    fmt.Println("Subscription reset, will reopen...")
                    session, err = dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
                    if err != nil {
                        fmt.Println("Reopen subscription session failed", err)
                        return
                    }
                    offset = session.Offsets[shardId]
                case *datahub.SubscriptionSessionInvalidError:
                    // The session is no longer valid, e.g. another consumer took it.
                    fmt.Println("Subscription used by other one")
                    return
                default:
                    fmt.Println("Commit offset failed", err)
                    return
                }
            }
            recordCount = 0
        }
        cursor = gr.NextCursor
    }
}

接口示例

project 操作

項目(Project)是DataHub數據的基本組織單元,下面包含多個Topic。值得注意的是,DataHub的項目空間與MaxCompute的項目空間是相互獨立的。用戶在MaxCompute中創建的項目不能復用于DataHub,需要獨立創建。

創建Project

說明

CreateProject(projectName, comment string) error

  • 參數

    • projectName: project name

    • comment: project comment

  • return

  • error

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// createProjet creates the project named projectName with a fixed comment
// and prints whether the call succeeded.
func createProjet(dh datahub.DataHub, projectName string) {
    err := dh.CreateProject(projectName, "project comment")
    if err == nil {
        fmt.Println("create successful")
        return
    }
    fmt.Println("create project failed")
    fmt.Println(err)
}

刪除Project

DeleteProject接口刪除project。

說明

DeleteProject(projectName string) error

  • 參數

    • projectName: project name

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// deleteProject deletes the project named projectName and prints whether
// the call succeeded.
func deleteProject(dh datahub.DataHub, projectName string) {
    // Delete the project the caller asked for rather than a hard-coded name.
    if err := dh.DeleteProject(projectName); err != nil {
        fmt.Println("delete project failed")
        fmt.Println(err)
        return
    }
    fmt.Println("delete project successful")
}

列出Project

ListProject 接口列出project。

說明

ListProject() (*ListProjectResult, error)

  • 參數

  • return

// ListProjectResult is the value returned by ListProject.
type ListProjectResult struct {
    // ProjectNames is read from the "ProjectNames" JSON key.
    ProjectNames []string `json:"ProjectNames"`
}
  • error

    • AuthorizationFailedError

    • DatahubClientError

  • 示例

// listProject fetches the project list and prints every project name, one
// per line. The projectName parameter is not used by this example.
func listProject(dh datahub.DataHub, projectName string) {
    result, err := dh.ListProject()
    if err != nil {
        fmt.Println("get project list failed")
        fmt.Println(err)
        return
    }

    fmt.Println("get project list successful")
    names := result.ProjectNames
    for i := range names {
        fmt.Println(names[i])
    }
}

查詢Project

GetProject查詢project

說明

GetProject(projectName string) (*GetProjectResult, error)

  • 參數

    • projectName: project name

  • return

// GetProjectResult is the value returned by GetProject.
type GetProjectResult struct {
    // CreateTime is read from the "CreateTime" JSON key.
    CreateTime int64 `json:"CreateTime"`
    // LastModifyTime is read from the "LastModifyTime" JSON key.
    LastModifyTime int64 `json:"LastModifyTime"`
    // Comment is read from the "Comment" JSON key. The tag needs the colon
    // after "json"; without it the tag is malformed and ignored.
    Comment string `json:"Comment"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// getProject looks up the project named projectName and prints the returned
// result, or the error when the lookup fails.
func getProject(dh datahub.DataHub, projectName string) {
    info, err := dh.GetProject(projectName)
    if err == nil {
        fmt.Println("get project message successful")
        fmt.Println(*info)
        return
    }
    fmt.Println("get project message failed")
    fmt.Println(err)
}

更新project

說明

UpdateProject(projectName, comment string) error

  • 參數

    • projectName: project name

    • comment: project comment

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

1. func updateProject(dh datahub.DataHub, projectName string) {
2.     if err := dh.UpdateProject(projectName, "new project comment"); err != nil {
3.         fmt.Println("update project comment failed")
4.         fmt.Println(err)
5.         return
6.     }
7.     fmt.Println("update project comment successful")
8. }

topic操作

Topic是 DataHub 訂閱和發布的最小單位,用戶可以用Topic來表示一類或者一種流數據。目前支持Tuple與Blob兩種類型。Tuple類型的Topic支持類似于數據庫的記錄的數據,每條記錄包含多個列。Blob類型的Topic僅支持寫入一塊二進制數據。

創建Topic

Tuple Topic

說明

CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) error

Tuple類型Topic寫入的數據是有格式的,需要指定Record Schema,目前支持以下幾種數據類型:

類型

含義

值域

BIGINT

8字節有符號整型

-9223372036854775807 ~ 9223372036854775807

DOUBLE

8字節雙精度浮點數

-1.0 * 10^308 ~ 1.0 * 10^308

BOOLEAN

布爾類型

True/False或true/false或0/1

TIMESTAMP

時間戳類型

表示到微秒的時間戳類型

STRING

字符串,只支持UTF-8編碼

單個STRING列最長允許2MB

DECIMAL

s

  • 參數

    • projectName: project name

    • topicName: topic name

    • comment: topic comment

    • lifeCycle: The expire time of the data (Unit: DAY). The data written before that time is not accessible.

    • recordSchema: The records schema of this topic.

  • return

  • error

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// Example_CreateTupleTopic builds a five-field record schema and creates a
// Tuple topic with it, passing 5 as shardCount and 7 as lifeCycle.
func Example_CreateTupleTopic(dh datahub.DataHub, projectName, topicName string) {
    recordSchema := datahub.NewRecordSchema()
    recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
        AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
        AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
        AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
        AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN})

    err := dh.CreateTupleTopic(projectName, topicName, "topic comment", 5, 7, recordSchema)
    if err == nil {
        fmt.Println("create topic successful")
        return
    }
    fmt.Println("create topic failed")
    fmt.Println(err)
}

Blob Topic

說明

CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • comment: topic comment

    • lifeCycle: The expire time of the data (Unit: DAY). The data written before that time is not accessible.

  • return

  • error

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// Example_CreateBlobTopic creates a Blob topic, passing 5 as shardCount and
// 7 as lifeCycle, and prints whether the call succeeded.
func Example_CreateBlobTopic(dh datahub.DataHub, projectName, topicName string) {
    err := dh.CreateBlobTopic(projectName, topicName, "topic comment", 5, 7)
    if err == nil {
        fmt.Println("create topic successful")
        return
    }
    fmt.Println("create topic failed")
    fmt.Println(err)
}

刪除Topic

說明

DeleteTopic(projectName, topicName string) error

  • 參數

    • projectName: project name

    • topicName: topic name

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// ExampleDataHub_DeleteTopic deletes the given topic and prints whether the
// call succeeded.
func ExampleDataHub_DeleteTopic(dh datahub.DataHub, projectName, topicName string) {
    err := dh.DeleteTopic(projectName, topicName)
    if err == nil {
        fmt.Println("delete successful")
        return
    }
    fmt.Println("delete failed")
    fmt.Println(err)
}

列出Topic

說明

ListTopic(projectName string) (*ListTopicResult, error)

  • 參數

    • projectName: project name

  • return

// ListTopicResult is the value returned by ListTopic.
type ListTopicResult struct {
    // TopicNames is read from the "TopicNames" JSON key.
    TopicNames []string `json:"TopicNames"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// ExampleDataHub_ListTopic lists the topics of projectName and prints the
// returned result. The topicName parameter is not used by this example.
func ExampleDataHub_ListTopic(dh datahub.DataHub, projectName, topicName string) {
    topics, err := dh.ListTopic(projectName)
    if err == nil {
        fmt.Println("get topic list successful")
        fmt.Println(topics)
        return
    }
    fmt.Println("get topic list failed")
    fmt.Println(err)
}

更新Topic

說明

UpdateTopic(projectName, topicName, comment string) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • comment: topic comment

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// ExampleDataHub_UpdateTopic replaces the comment of the given topic and
// prints whether the call succeeded.
func ExampleDataHub_UpdateTopic(dh datahub.DataHub, projectName, topicName string) {
    err := dh.UpdateTopic(projectName, topicName, "new topic comment")
    if err == nil {
        fmt.Println("update topic comment successful")
        return
    }
    fmt.Println("update topic comment failed")
    fmt.Println(err)
}

schema類型

schema是用來標明數據存儲的名稱和對應類型的,在創建tuple topic 和讀寫 record 的時候用到。因為網絡傳輸中,數據都是以字符串的形式發送,需要schema來轉換成對應的類型。schema就是一個Field對象的slice,Field包含三個參數,第一個參數是field的名稱,第二個是field的類型,第三個參數是bool值,True表示field的值允許為空, False表示field的值不能為空。

獲取schema

對于已創建的Tuple topic,可以使用GetTopic接口來獲取schema信息

  • 示例

// getSchema fetches the topic identified by projectName/topicName and prints
// its record schema, or the error when the lookup fails.
func getSchema(dh datahub.DataHub, projectName, topicName string) {
    // Query the topic the caller asked for rather than a hard-coded name.
    gt, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println(gt.RecordSchema)
}

定義schema

要創建新的tuple topic,需要自己定義schema,schema可以通過以下方式進行初始化。

  • 直接創建

// createSchema1 builds a record schema directly from a literal list of
// fields and prints it.
func createSchema1() {
    schema := datahub.RecordSchema{
        []datahub.Field{
            {"bigint_field", datahub.BIGINT, true},
            {"timestamp_field", datahub.TIMESTAMP, false},
            {"string_field", datahub.STRING, false},
            {"double_field", datahub.DOUBLE, false},
            {"boolean_field", datahub.BOOLEAN, true},
            {"decimal_field", datahub.DECIMAL, false},
        },
    }
    fmt.Println(schema)
}
  • 逐個對schema進行set

// createSchema2 builds a record schema by adding fields one at a time
// through chained AddField calls. The resulting schema is not used further
// in this example.
func createSchema2() {
    recordSchema := datahub.NewRecordSchema()
    // Fields that omit AllowNull use the Go zero value (false) for it.
    recordSchema.AddField(datahub.Field{Name: "bigint_field", Type: datahub.BIGINT, AllowNull: true}).
        AddField(datahub.Field{Name: "timestamp_field", Type: datahub.TIMESTAMP, AllowNull: false}).
        AddField(datahub.Field{Name: "string_field", Type: datahub.STRING}).
        AddField(datahub.Field{Name: "double_field", Type: datahub.DOUBLE}).
        AddField(datahub.Field{Name: "boolean_field", Type: datahub.BOOLEAN}).
        AddField(datahub.Field{Name: "decimal_field", Type: datahub.DECIMAL})
}
  • 通過JSON字符串定義schema

// createSchema3 builds a record schema from a JSON string and prints it.
// The string is left empty here as a placeholder for the caller's schema.
func createSchema3() {
    str := ""
    schema, err := datahub.NewRecordSchemaFromJson(str)
    if err == nil {
        fmt.Println("create recordSchema successful")
        fmt.Println(schema)
        return
    }
    fmt.Println("create recordSchema failed")
    fmt.Println(err)
}

JSON字符串的格式如下:"{\"fields\":[{\"type\":\"BIGINT\",\"name\":\"a\"},{\"type\":\"STRING\",\"name\":\"b\"}]}"

shard 操作

Shard表示對一個Topic進行數據傳輸的并發通道,每個Shard會有對應的ID。每個Shard會有多種狀態:Opening - 啟動中,Active - 啟動完成可服務。每個Shard啟用以后會占用一定的服務端資源,建議按需申請Shard數量。shard可以進行合并和分裂,當數據量增大時,可以采用分裂shard來增加數據通道,提高數據寫入的并發量,當數據量減小時,應該合并shard減少服務器資源浪費。例如淘寶在雙11期間,數據量驟增,這個時候每個shard的寫入壓力過大,便可以增加shard提高寫入效率,在雙11過后,數據量明顯降低,則需要合并shard。

列出shard

說明

ListShard(projectName, topicName string) (*ListShardResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

  • return

1. type SplitShardResult struct {
2.     NewShards []ShardEntry `json:"NewShards"`
3. }
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// ExampleDataHub_ListShard lists the shards of the topic and prints each
// entry on its own line.
func ExampleDataHub_ListShard() {
    result, err := dh.ListShard(projectName, topicName)
    if err != nil {
        fmt.Println("get shard list failed")
        fmt.Println(err)
        return
    }

    fmt.Println("get shard list successful")
    shards := result.Shards
    for i := range shards {
        fmt.Println(shards[i])
    }
}

分裂shard

只有處于ACTIVE狀態的shard才可以進行分裂,分裂成功后,會生成兩個新的shard,同時原shard狀態會變為CLOSED。分裂shard時,需要指定splitKey,可以調用第一個method,由系統自動生成splitKey;如果有特殊需求,則可以采用第二個method自己指定splitKey。splitKey規則可以參考基本概念中的Shard Hash Key Range。

說明

SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error) SplitShardWithSplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The shard which to split

    • splitKey: The split key which is used to split shard

  • return

// SplitShardResult is the value returned by SplitShard and
// SplitShardWithSplitKey.
type SplitShardResult struct {
    // NewShards is read from the "NewShards" JSON key.
    NewShards []ShardEntry `json:"NewShards"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// ExampleDataHub_SplitShard splits shard "0", prints the result, and then
// waits until all shards of the topic are ready.
func ExampleDataHub_SplitShard() {
    // ID of the shard to split.
    shardId := "0"

    result, err := dh.SplitShard(projectName, topicName, shardId)
    if err != nil {
        fmt.Println("split shard failed")
        fmt.Println(err)
        return
    }
    fmt.Println("split shard successful")
    fmt.Println(result)

    // Shards must all be ready again before further operations are performed
    // on the topic.
    dh.WaitAllShardsReady(projectName, topicName)
}

合并shard

合并兩個shard時,要求兩個shard必須是相鄰的,并且狀態都是ACTIVE。

說明

MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The shard which will be merged

    • adjacentShardId: The adjacent shard of the specified shard.

  • 示例

  • return

// MergeShardResult is the value returned by MergeShard.
type MergeShardResult struct {
    // ShardId is read from the "ShardId" JSON key.
    ShardId string `json:"ShardId"`
    // BeginHashKey is read from the "BeginHashKey" JSON key.
    BeginHashKey string `json:"BeginHashKey"`
    // EndHashKey is read from the "EndHashKey" JSON key.
    EndHashKey string `json:"EndHashKey"`
}
  • error

    • ResourceNotFoundError

    • InvalidOperationError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

    • ShardSealedError

// ExampleDataHub_MergeShard merges shard "3" with its adjacent shard "4",
// prints the result, and then waits until all shards of the topic are ready.
func ExampleDataHub_MergeShard() {
    const (
        shardId         = "3"
        adjacentShardId = "4"
    )

    result, err := dh.MergeShard(projectName, topicName, shardId, adjacentShardId)
    if err != nil {
        fmt.Println("merge shard failed")
        fmt.Println(err)
        return
    }
    fmt.Println("merge shard successful")
    fmt.Println(result)

    // After merging, shards must all be ready again before further operations
    // are performed on the topic.
    dh.WaitAllShardsReady(projectName, topicName)
}

數據發布/訂閱

處于ACTIVE和CLOSED狀態的shard都可以進行數據訂閱,但是只有處于ACTIVE狀態的shard可以進行數據發布,向CLOSED狀態的shard發布數據會直接返回ShardSealedError錯誤,處于CLOSED狀態的shard讀取數據到末尾時也會返回ShardSealedError錯誤,表示不會有新的數據。

發布數據

向某個topic下發布數據記錄時,每條數據記錄需要指定該topic下的一個shard,因此一般需要先通過 ListShard 接口查看當前topic下的shard列表。使用PutRecords接口時,注意檢查返回結果中是否存在數據發布失敗的情況。

說明

PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error

服務器2.12版本及之后版本開始支持PutRecordsByShard接口,低版本請使用PutRecords接口。

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId : id of shard

    • records: Records list to written.

  • return

// PutRecordsResult is the value returned by PutRecords.
type PutRecordsResult struct {
    // FailedRecordCount is read from the "FailedRecordCount" JSON key; the
    // examples subtract it from the number of records sent to report how
    // many were written successfully.
    FailedRecordCount int            `json:"FailedRecordCount"`
    // FailedRecords is read from the "FailedRecords" JSON key.
    FailedRecords     []FailedRecord `json:"FailedRecords"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// putTupleData writes three tuple records, one each to shards "0", "1" and
// "2", using PutRecords. A LimitExceededError is retried up to three times
// with a five-second pause; any other error aborts immediately.
func putTupleData() {
    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get topic successful")

    first := datahub.NewTupleRecord(topic.RecordSchema, 0)
    first.ShardId = "0"
    first.SetValueByName("field1", "TEST1")
    first.SetValueByName("field2", 1)
    // Extra attributes can be attached to a record before it is written.
    first.SetAttribute("attribute", "test attribute")

    second := datahub.NewTupleRecord(topic.RecordSchema, 0)
    second.ShardId = "1"
    second.SetValueByName("field1", datahub.String("TEST2"))
    second.SetValueByName("field2", datahub.Bigint(2))

    third := datahub.NewTupleRecord(topic.RecordSchema, 0)
    third.ShardId = "2"
    third.SetValueByName("field1", datahub.String("TEST3"))
    third.SetValueByName("field2", datahub.Bigint(3))

    records := []datahub.IRecord{first, second, third}

    const maxReTry = 3
    attempts := 0
    for attempts < maxReTry {
        result, err := dh.PutRecords(projectName, topicName, records)
        if err == nil {
            fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
            for _, failed := range result.FailedRecords {
                fmt.Println(failed)
            }
            return
        }
        if _, limited := err.(*datahub.LimitExceededError); !limited {
            fmt.Println("put record failed")
            fmt.Println(err)
            return
        }
        fmt.Println("maybe qps exceed limit,retry")
        attempts++
        time.Sleep(5 * time.Second)
    }
    // Reached only when every attempt hit the rate limit.
    fmt.Printf("put records failed ")
}
// putBlobData writes three blob records, one each to shards "0", "1" and
// "2", using PutRecords. A LimitExceededError is retried up to three times
// with a five-second pause; any other error aborts immediately.
func putBlobData() {
    first := datahub.NewBlobRecord([]byte("blob test1"), 0)
    first.ShardId = "0"

    second := datahub.NewBlobRecord([]byte("blob test2"), 0)
    second.ShardId = "1"
    second.SetAttribute("attribute", "test attribute")

    third := datahub.NewBlobRecord([]byte("blob test3"), 0)
    third.ShardId = "2"

    records := []datahub.IRecord{first, second, third}

    const maxReTry = 3
    attempts := 0
    for attempts < maxReTry {
        result, err := dh.PutRecords(projectName, blobTopicName, records)
        if err == nil {
            fmt.Printf("put successful num is %d, put records failed num is %d\n", len(records)-result.FailedRecordCount, result.FailedRecordCount)
            for _, failed := range result.FailedRecords {
                fmt.Println(failed)
            }
            return
        }
        if _, limited := err.(*datahub.LimitExceededError); !limited {
            fmt.Println("put record failed")
            fmt.Println(err)
            return
        }
        fmt.Println("maybe qps exceed limit,retry")
        attempts++
        time.Sleep(5 * time.Second)
    }
    // Reached only when every attempt hit the rate limit.
    fmt.Printf("put records failed ")
}
// putDataByShard writes three blob records to shard "0" using
// PutRecordsByShard. A LimitExceededError is retried up to three times with
// a five-second pause; any other error aborts immediately.
func putDataByShard() {
    shardId := "0"
    records := make([]datahub.IRecord, 3)
    record1 := datahub.NewBlobRecord([]byte("blob test1"), 0)
    records[0] = record1
    record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
    record2.SetAttribute("attribute", "test attribute")
    records[1] = record2
    record3 := datahub.NewBlobRecord([]byte("blob test3"), 0)
    records[2] = record3

    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        err := dh.PutRecordsByShard(projectName, blobTopicName, shardId, records)
        if err == nil {
            // Stop on success; without this exit the loop would keep
            // re-sending the same records forever, because retryNum only
            // grows on rate-limit errors.
            fmt.Println("put record successful")
            return
        }
        if _, ok := err.(*datahub.LimitExceededError); !ok {
            fmt.Println("put record failed")
            fmt.Println(err)
            return
        }
        fmt.Println("maybe qps exceed limit,retry")
        retryNum++
        time.Sleep(5 * time.Second)
    }
    // Reached only when every attempt hit the rate limit.
    fmt.Printf("put records failed ")
}

除了數據本身以外,在進行數據發布時,還可以添加和數據相關的額外信息,例如數據采集場景等。添加方式為

// Attach extra information to a tuple record before publishing it.
record1 := datahub.NewTupleRecord(topic.RecordSchema, 0)
record1.SetAttribute("attribute","test attribute")
// The same call works for a blob record.
record2 := datahub.NewBlobRecord([]byte("blob test2"), 0)
record2.SetAttribute("attribute","test attribute")

訂閱數據

訂閱一個topic下的數據,同樣需要指定對應的shard,同時需要指定讀取游標位置,通過 getCursor 接口獲取。

說明

GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The id of the shard.

    • ctype: Which type used to get cursor.可以通過四種方式獲取:OLDEST, LATEST, SEQUENCE, SYSTEM_TIME。

      • OLDEST: 表示獲取的cursor指向當前有效數據中時間最久遠的record

      • LATEST: 表示獲取的cursor指向當前最新的record

      • SEQUENCE: 表示獲取的cursor指向該序列的record

      • SYSTEM_TIME: 表示獲取的cursor指向該時間之后接收到的第一條record

    • param: Parameter used to get cursor.when use SEQUENCE and SYSTEM_TIME need to be set.

  • return

// GetCursorResult is the value returned by GetCursor.
type GetCursorResult struct {
    // Cursor is read from the "Cursor" JSON key; the examples pass it to
    // GetTupleRecords / GetBlobRecords as the read position.
    Cursor     string `json:"Cursor"`
    // RecordTime is read from the "RecordTime" JSON key.
    RecordTime int64  `json:"RecordTime"`
    // Sequence is read from the "Sequence" JSON key.
    Sequence   int64  `json:"Sequence"`
}
  • error

    • ResourceNotFoundError

    • SeekOutOfRangeError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

    • ShardSealedError

  • 示例

// cursor requests three cursors for shard "0" — OLDEST, LATEST and the one
// at sequence 10 — and prints each result or error.
func cursor(dh datahub.DataHub, projectName, topicName string) {
    shardId := "0"

    oldest, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err == nil {
        fmt.Println(oldest)
    } else {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }

    // For LATEST both the error and the result are printed unconditionally.
    latest, err := dh.GetCursor(projectName, topicName, shardId, datahub.LATEST)
    fmt.Println(err)
    fmt.Println(latest)

    seq := int64(10)
    bySeq, err := dh.GetCursor(projectName, topicName, shardId, datahub.SEQUENCE, seq)
    if err == nil {
        fmt.Println(bySeq)
    } else {
        fmt.Println("get cursor failed")
        fmt.Println(err)
    }
}

從指定shard讀取數據,需要指定從哪個cursor開始讀,并指定讀取的上限數據條數,如果從cursor到shard結尾少于Limit條數的數據,則返回實際的條數的數據。

Tuple topic data

說明

GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The id of the shard.

    • cursor: The start cursor used to read data.

    • limit:Max record size to read.

    • recordSchema: RecordSchema for the topic.

  • return

// GetRecordsResult is the value returned by GetTupleRecords.
type GetRecordsResult struct {
    // NextCursor is read from the "NextCursor" JSON key; the consume example
    // uses it as the cursor for the following read.
    NextCursor    string        `json:"NextCursor"`
    // RecordCount is read from the "RecordCount" JSON key.
    RecordCount   int           `json:"RecordCount"`
    // StartSequence is read from the "StartSeq" JSON key (note the shorter name).
    StartSequence int64         `json:"StartSeq"`
    // Records is read from the "Records" JSON key.
    Records       []IRecord     `json:"Records"`
    // RecordSchema is excluded from JSON encoding and decoding ("-").
    RecordSchema  *RecordSchema `json:"-"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// getTupleData reads up to 100 tuple records from shard "1", starting at the
// OLDEST cursor, and prints their values. A LimitExceededError is retried up
// to three times with a five-second pause; any other error aborts.
func getTupleData() {
    shardId := "1"

    topic, err := dh.GetTopic(projectName, topicName)
    if err != nil {
        fmt.Println("get topic failed")
        return
    }
    fmt.Println("get topic successful")

    cursor, err := dh.GetCursor(projectName, topicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")

    const (
        limitNum = 100
        maxReTry = 3
    )
    attempts := 0
    for attempts < maxReTry {
        gr, err := dh.GetTupleRecords(projectName, topicName, shardId, cursor.Cursor, limitNum, topic.RecordSchema)
        if err == nil {
            fmt.Println("get record successful")
            for _, record := range gr.Records {
                if data, ok := record.(*datahub.TupleRecord); ok {
                    fmt.Println(data.Values)
                } else {
                    fmt.Printf("record type is not TupleRecord, is %v\n", reflect.TypeOf(record))
                }
            }
            return
        }
        if _, limited := err.(*datahub.LimitExceededError); !limited {
            fmt.Println("get record failed")
            fmt.Println(err)
            return
        }
        fmt.Println("maybe qps exceed limit,retry")
        attempts++
        time.Sleep(5 * time.Second)
    }
    // Reached only when every attempt hit the rate limit.
    fmt.Printf("get records failed ")
}

Blob topic data

說明

GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The id of the shard.

    • cursor: The start cursor used to read data.

    • limit:Max record size to read.

  • return

// GetRecordsResult is the value returned by GetBlobRecords.
type GetRecordsResult struct {
    // NextCursor is read from the "NextCursor" JSON key.
    NextCursor    string        `json:"NextCursor"`
    // RecordCount is read from the "RecordCount" JSON key.
    RecordCount   int           `json:"RecordCount"`
    // StartSequence is read from the "StartSeq" JSON key (note the shorter name).
    StartSequence int64         `json:"StartSeq"`
    // Records is read from the "Records" JSON key.
    Records       []IRecord     `json:"Records"`
    // RecordSchema is excluded from JSON encoding and decoding ("-").
    RecordSchema  *RecordSchema `json:"-"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// getBlobData reads up to 100 blob records from shard "1", starting at the
// OLDEST cursor, and prints their stored data. A LimitExceededError is
// retried up to three times with a five-second pause; any other error aborts.
func getBlobData() {
    shardId := "1"
    cursor, err := dh.GetCursor(projectName, blobTopicName, shardId, datahub.OLDEST)
    if err != nil {
        fmt.Println("get cursor failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get cursor successful")

    limitNum := 100
    maxReTry := 3
    retryNum := 0
    for retryNum < maxReTry {
        gr, err := dh.GetBlobRecords(projectName, blobTopicName, shardId, cursor.Cursor, limitNum)
        if err != nil {
            if _, ok := err.(*datahub.LimitExceededError); !ok {
                fmt.Println("get record failed")
                fmt.Println(err)
                return
            }
            fmt.Println("maybe qps exceed limit,retry")
            retryNum++
            time.Sleep(5 * time.Second)
            continue
        }
        fmt.Println("get record successful")
        for _, record := range gr.Records {
            data, ok := record.(*datahub.BlobRecord)
            if !ok {
                // This reader expects BlobRecord, so the diagnostic names that type.
                fmt.Printf("record type is not BlobRecord, is %v\n", reflect.TypeOf(record))
                continue
            }
            fmt.Println(data.StoreData)
        }
        break
    }
    if retryNum >= maxReTry {
        fmt.Printf("get records failed ")
    }
}

meter操作

metering info是對shard的資源占用情況的統計信息,一小時更新一次。

說明

GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The id of the shard.

  • return

// GetMeterInfoResult is the value returned by GetMeterInfo.
type GetMeterInfoResult struct {
    // ActiveTime is read from the "ActiveTime" JSON key.
    ActiveTime int64 `json:"ActiveTime"`
    // Storage is read from the "Storage" JSON key.
    Storage    int64 `json:"Storage"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// meter fetches the metering information of shard "0" and prints it. On
// failure only a fixed message is printed.
func meter(dh datahub.DataHub, projectName, topicName string) {
    const shardId = "0"

    info, err := dh.GetMeterInfo(projectName, topicName, shardId)
    if err == nil {
        fmt.Println("get meter information successful")
        fmt.Println(info)
        return
    }
    fmt.Println("get meter information failed")
}

connector操作

DataHub Connector是把DataHub服務中的流式數據同步到其他云產品中的功能,目前支持將Topic中的數據實時/準實時同步到MaxCompute(原ODPS)、OSS(Object Storage Service,阿里云對象存儲服務)、ES(Elasticsearch)、ADS(AnalyticDB for MySQL,分析型數據庫MySQL版)、MYSQL、FC(Function Compute、函數計算)、OTS(Open Table Store、表格存儲)、DataHub中。用戶只需要向DataHub中寫入一次數據,并在DataHub服務中配置好同步功能,便可以在其他云產品中使用這份數據。這里所有的示例代碼均以MaxCompute為例。MaxCompute Config的配置信息可以參考同步數據到MaxCompute。datahub2.14.0版本之后將接口參數connectorType修改connectorId(createConnector除外),不過接口依舊兼容2.14.0之前版本,只需將參數connectorType轉為string作為參數即可。

  • 使用示例

1. gcr, err := dh.GetConnector(projectName, topicName, string(datahub.SinkOdps))

創建connector

說明

CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error) CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType, columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • cType: The type of connector which you want create.

    • columnFields: Which fields you want synchronize.

    • sinkStartTime: Start time for this job,

    • config: Detail config of specified connector type.

  • return

// CreateConnectorResult is the value returned by CreateConnector and
// CreateConnectorWithStartTime.
type CreateConnectorResult struct {
    // ConnectorId is decoded from the "ConnectorId" JSON key; the other
    // connector calls on this page take a connectorId string argument.
    ConnectorId string `json:"ConnectorId"`
 }
  • error

    • ResourceNotFoundError

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// createConnector creates a MaxCompute (ODPS) sink connector on the given
// topic that synchronizes the columns "field1" and "field2", then prints
// the creation result (or the error on failure).
//
// The endpoint, access id and access key are left empty on purpose and must
// be filled in before running the example.
func createConnector(dh datahub.DataHub, projectName, topicName string) {
    odpsEndpoint := ""
    odpsProject := "datahub_test"
    odpsTable := "datahub_go_example"
    odpsAccessId := ""
    odpsAccessKey := ""
    odpsTimeRange := 60
    odpsPartitionMode := datahub.SystemTimeMode
    connectorType := datahub.SinkOdps

    // Partition columns and their format patterns.
    odpsPartitionConfig := datahub.NewPartitionConfig()
    odpsPartitionConfig.AddConfig("ds", "%Y%m%d")
    odpsPartitionConfig.AddConfig("hh", "%H")
    odpsPartitionConfig.AddConfig("mm", "%M")

    sinkOdpsConfig := datahub.SinkOdpsConfig{
        Endpoint:        odpsEndpoint,
        Project:         odpsProject,
        Table:           odpsTable,
        AccessId:        odpsAccessId,
        AccessKey:       odpsAccessKey,
        TimeRange:       odpsTimeRange,
        PartitionMode:   odpsPartitionMode,
        PartitionConfig: *odpsPartitionConfig,
    }

    fields := []string{"field1", "field2"}
    // sinkOdpsConfig is a struct value, so it is passed as-is (it cannot be
    // dereferenced). CreateConnector returns a result and an error; both
    // must be received.
    ccr, err := dh.CreateConnector(projectName, topicName, connectorType, fields, sinkOdpsConfig)
    if err != nil {
        fmt.Println("create odps connector failed")
        fmt.Println(err)
        return
    }
    fmt.Println("create odps connector successful")
    // The result carries the ConnectorId needed by the other connector calls.
    fmt.Println(ccr)
}

列出connector

說明

ListConnector(projectName, topicName string) (*ListConnectorResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

  • return

// ListConnectorResult is the value returned (by pointer) from ListConnector.
type ListConnectorResult struct {
    // ConnectorIds is decoded from the "Connectors" JSON key; note that the
    // Go field name and the JSON key differ.
    ConnectorIds []string `json:"Connectors"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// listConnector asks the service for the connectors attached to the topic
// and prints the returned list, or the error if the request failed.
func listConnector(dh datahub.DataHub, projectName, topicName string) {
    connectors, listErr := dh.ListConnector(projectName, topicName)
    if listErr != nil {
        fmt.Println("get connector list failed")
        fmt.Println(listErr)
        return
    }

    fmt.Println("get connector list successful")
    fmt.Println(connectors)
}

查詢connector

說明

GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • connectorId: The id of the connector

  • return

// GetConnectorResult is the value returned (by pointer) from GetConnector.
// Each field is decoded from the JSON key named in its tag.
type GetConnectorResult struct {
    CreateTime     int64             `json:"CreateTime"`
    LastModifyTime int64             `json:"LastModifyTime"`
    ConnectorId    string            `json:"ConnectorId"`
    ClusterAddress string            `json:"ClusterAddress"`
    Type           ConnectorType     `json:"Type"`
    State          ConnectorState    `json:"State"`
    ColumnFields   []string          `json:"ColumnFields"`
    // ExtraConfig is decoded from the "ExtraInfo" JSON key; the Go field
    // name and the JSON key differ.
    ExtraConfig    map[string]string `json:"ExtraInfo"`
    Creator        string            `json:"Creator"`
    Owner          string            `json:"Owner"`
    // Config is untyped; the updateConnector example on this page
    // type-asserts it to datahub.SinkOdpsConfig for an ODPS connector.
    Config         interface{}       `json:"Config"`
}
  • error

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// getConnector looks up a connector by id and prints its description, or
// the error if the lookup failed.
func getConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    gcr, err := dh.GetConnector(projectName, topicName, connectorId)
    if err != nil {
        fmt.Println("get odps connector failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get odps connector successful")
    // GetConnector returns a pointer; print the pointed-to struct.
    fmt.Println(*gcr)
}

更新connector配置

說明

UpdateConnector(projectName, topicName, connectorId string, config interface{}) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • connectorId: The id of the connector.

    • config: Detail config of specified connector type.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// updateConnector reads the current configuration of an ODPS connector,
// sets its TimeRange to 200 and writes the configuration back.
// Every failure is printed and ends the function early.
func updateConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    current, getErr := dh.GetConnector(projectName, topicName, connectorId)
    if getErr != nil {
        fmt.Println("get odps connector failed")
        fmt.Println(getErr)
        return
    }

    // Config is an interface{}; only an ODPS sink config can be edited here.
    odpsConfig, isOdps := current.Config.(datahub.SinkOdpsConfig)
    if !isOdps {
        fmt.Println("convert config to SinkOdpsConfig failed")
        return
    }

    // modify the config
    odpsConfig.TimeRange = 200

    updateErr := dh.UpdateConnector(projectName, topicName, connectorId, odpsConfig)
    if updateErr != nil {
        fmt.Println("update odps config failed")
        fmt.Println(updateErr)
        return
    }
    fmt.Println("update odps config successful")
}

刪除connector

說明

DeleteConnector(projectName, topicName, connectorId string) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • connectorId: The id of the connector.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

 // deleteConnector removes the connector with the given id from the topic
 // and prints whether the deletion succeeded.
 func deleteConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
     delErr := dh.DeleteConnector(projectName, topicName, connectorId)
     if delErr != nil {
         fmt.Println("delete odps connector failed")
         fmt.Println(delErr)
         return
     }

     fmt.Println("delete odps connector successful")
 }

查詢connector shard狀態

可以獲取某個topic下所有shard的狀態信息,也可以獲取topic下指定shard的狀態信息。

說明

GetConnectorShardStatus(projectName, topicName, connectorId string) (GetConnectorShardStatusResult, error) GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (ConnectorShardStatusEntry, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The id of the shard.

    • connectorId: The id of the connector.

  • return

// GetConnectorShardStatusResult is the result type of GetConnectorShardStatus.
type GetConnectorShardStatusResult struct {
    // ShardStatus is decoded from the "ShardStatusInfos" JSON key. The example
    // on this page ranges over it and prints each key as the shard.
    ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}
// ConnectorShardStatusEntry is the result type of GetConnectorShardStatusByShard
// and the element type of GetConnectorShardStatusResult.ShardStatus.
// Each field is decoded from the JSON key named in its tag.
type ConnectorShardStatusEntry struct {
    StartSequence    int64               `json:"StartSequence"`
    EndSequence      int64               `json:"EndSequence"`
    CurrentSequence  int64               `json:"CurrentSequence"`
    CurrentTimestamp int64               `json:"CurrentTimestamp"`
    UpdateTime       int64               `json:"UpdateTime"`
    State            ConnectorShardState `json:"State"`
    LastErrorMessage string              `json:"LastErrorMessage"`
    DiscardCount     int64               `json:"DiscardCount"`
    DoneTime         int64               `json:"DoneTime"`
    WorkerAddress    string              `json:"WorkerAddress"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// getConnectorShardStatus first prints the state of every shard of the
// connector, then queries shard "0" on its own and prints its full status.
// The function stops at the first request that fails.
func getConnectorShardStatus(dh datahub.DataHub, projectName, topicName, connectorId string) {
    // Status of all shards at once.
    allShards, allErr := dh.GetConnectorShardStatus(projectName, topicName, connectorId)
    if allErr != nil {
        fmt.Println("get connector shard status failed")
        fmt.Println(allErr)
        return
    }
    fmt.Println("get connector shard status successful")
    for id, entry := range allShards.ShardStatus {
        fmt.Println(id, entry.State)
    }

    // Status of one specific shard.
    shardId := "0"
    oneShard, oneErr := dh.GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId)
    if oneErr != nil {
        fmt.Println("get connector shard status failed")
        fmt.Println(oneErr)
        return
    }
    fmt.Println("get connector shard status successful")
    fmt.Println(*oneShard)
}

重啟connector shard

可以重啟topic下的所有shard,也可以重啟topic下的指定shard。

說明

ReloadConnector(projectName, topicName, connectorId string) error ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • connectorId: The id of the connector.

    • shardId: The id of the shard.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// reloadConnector restarts every shard of the connector and afterwards
// restarts shard "2" individually, printing the outcome of each step.
func reloadConnector(dh datahub.DataHub, projectName, topicName, connectorId string) {
    // Reload all shards.
    allErr := dh.ReloadConnector(projectName, topicName, connectorId)
    if allErr != nil {
        fmt.Println("reload connector shard failed")
        fmt.Println(allErr)
        return
    }
    fmt.Println("reload connector shard successful")

    // Reload a single shard.
    shardId := "2"
    shardErr := dh.ReloadConnectorByShard(projectName, topicName, connectorId, shardId)
    if shardErr != nil {
        fmt.Println("reload connector shard failed")
        fmt.Println(shardErr)
        return
    }
    fmt.Println("reload connector shard successful")
}

添加新field

可以給connector添加指定列,但要求datahub的topic中和odps都存在對應的列。

說明

AppendConnectorField(projectName, topicName, connectorId, fieldName string) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • connectorId: The id of the connector.

    • fieldName: The name of the field.

  • return

  • error

    • ResourceNotFoundError

    • InvalidParameterError

  • 示例

// appendConnectorField adds the column "field2" to the set of columns the
// connector synchronizes and prints whether the request succeeded.
func appendConnectorField(dh datahub.DataHub, projectName, topicName, connectorId string) {
    if err := dh.AppendConnectorField(projectName, topicName, connectorId, "field2"); err != nil {
        fmt.Println("append field failed")
        fmt.Println(err)
        return
    }
    fmt.Println("append field successful")
}

更新connector狀態

connector狀態分兩種,CONNECTOR_PAUSED和CONNECTOR_RUNNING,分別表示停止和運行中。

說明

UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • connectorId: The id of the connector.

    • state: The state of the connector which you want to update.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// updateConnectorState switches the connector to the stopped state and then
// back to the running state, printing the outcome of each transition.
// A failed transition ends the function.
func updateConnectorState(dh datahub.DataHub, projectName, topicName, connectorId string) {
    stopErr := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped)
    if stopErr != nil {
        fmt.Println("update connector state failed")
        fmt.Println(stopErr)
        return
    }
    fmt.Println("update connector state successful")

    runErr := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning)
    if runErr != nil {
        fmt.Println("update connector state failed")
        fmt.Println(runErr)
        return
    }
    fmt.Println("update connector state successful")
}

更新connector點位信息

說明

UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • shardId: The id of the shard.

    • connectorId: The id of the connector.

    • offset: The connector offset.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// updateConnectorOffset moves the offset of shard "10" of the connector to
// a fixed timestamp/sequence pair.
//
// The connector is stopped before the offset is changed and set back to
// running when the function returns. Errors from both state changes are
// reported instead of being silently dropped.
func updateConnectorOffset(dh datahub.DataHub, projectName, topicName, connectorId string) {
    shardId := "10"
    offset := datahub.ConnectorOffset{
        Timestamp: 1565864139000,
        Sequence:  104,
    }

    if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorStopped); err != nil {
        fmt.Println("stop connector failed")
        fmt.Println(err)
        return
    }
    // Registered only after a successful stop, so the restart runs on every
    // later exit path.
    defer func() {
        if err := dh.UpdateConnectorState(projectName, topicName, connectorId, datahub.ConnectorRunning); err != nil {
            fmt.Println("restart connector failed")
            fmt.Println(err)
        }
    }()

    if err := dh.UpdateConnectorOffset(projectName, topicName, connectorId, shardId, offset); err != nil {
        fmt.Println("update connector offset failed")
        fmt.Println(err)
        return
    }
    fmt.Println("update connector offset successful")
}

查詢connector完成時間

只有MaxCompute可以查詢完成時間。

說明

GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • connectorId: The id of the connector.

  • return

// GetConnectorDoneTimeResult is the value returned (by pointer) from
// GetConnectorDoneTime.
type GetConnectorDoneTimeResult struct {
    // DoneTime is decoded from the "DoneTime" JSON key.
    DoneTime int64 `json:"DoneTime"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// doneTime queries the done time of the connector and prints it, or prints
// the error if the query failed.
func doneTime(dh datahub.DataHub, projectName, topicName, connectorId string) {
    result, queryErr := dh.GetConnectorDoneTime(projectName, topicName, connectorId)
    if queryErr != nil {
        fmt.Println("get connector done time failed")
        fmt.Println(queryErr)
        return
    }

    fmt.Println("get connector done time successful")
    fmt.Println(result.DoneTime)
}

subscription操作

訂閱服務提供了服務端保存用戶消費點位的功能,只需要通過簡單配置和處理,就可以實現高可用的點位存儲服務。

創建subscription

說明

CreateSubscription(projectName, topicName, comment string) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • comment: subscription comment

  • return

  • error

    • ResourceExistError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// createSubscription creates a subscription with the comment "sub comment"
// and prints the creation result, or the error on failure.
//
// It relies on dh, projectName and topicName being defined outside the
// function.
func createSubscription() {
    created, createErr := dh.CreateSubscription(projectName, topicName, "sub comment")
    if createErr != nil {
        fmt.Println("create subscription failed")
        fmt.Println(createErr)
        return
    }

    fmt.Println("create subscription successful")
    fmt.Println(*created)
}

刪除subscription

說明

DeleteSubscription(projectName, topicName, subId string) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// delSubscription deletes the subscription with a fixed sample id from the
// topic and prints whether the deletion succeeded.
func delSubscription(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565577384801DCN0O"
    if err := dh.DeleteSubscription(projectName, topicName, subId); err != nil {
        fmt.Println("delete subscription failed")
        // Report the cause too, as the other examples on this page do.
        fmt.Println(err)
        return
    }
    fmt.Println("delete subscription successful")
}

查詢subscription

說明

GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

  • return

 // GetSubscriptionResult is the value returned (by pointer) from
 // GetSubscription. It embeds SubscriptionEntry, so that type's fields are
 // promoted onto the result.
 type GetSubscriptionResult struct {
     SubscriptionEntry
 }
  • error

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// getSubscription looks up the subscription with a fixed sample id and
// prints it, or prints the error if the lookup failed.
func getSubscription(dh datahub.DataHub, projectName, topicName string) {
    const subId = "1565577384801DCN0O"

    sub, getErr := dh.GetSubscription(projectName, topicName, subId)
    if getErr != nil {
        fmt.Println("get subscription failed")
        fmt.Println(getErr)
        return
    }

    fmt.Println("get subscription successful")
    fmt.Println(sub)
}

列出subscription

通過pageIndex和pageSize獲取指定范圍的subscription信息,如pageIndex=1, pageSize=10,獲取1-10個subscription; pageIndex=2, pageSize=5則獲取6-10的subscription。

說明

ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • pageIndex: The page index used to list subscriptions.

    • pageSize: The page size used to list subscriptions.

  • return

// ListSubscriptionResult is the value returned (by pointer) from
// ListSubscription.
type ListSubscriptionResult struct {
    // TotalCount is decoded from the "TotalCount" JSON key.
    // NOTE(review): presumably the total across all pages, not just this page — confirm.
    TotalCount    int64               `json:"TotalCount"`
    // Subscriptions holds the entries decoded from the "Subscriptions" JSON key.
    Subscriptions []SubscriptionEntry `json:"Subscriptions"`
}
  • error

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// listSubscription requests page 1 (page size 5) of the topic's
// subscriptions and prints each returned entry on its own line.
func listSubscription(dh datahub.DataHub, projectName, topicName string) {
    const (
        pageIndex = 1
        pageSize  = 5
    )

    page, listErr := dh.ListSubscription(projectName, topicName, pageIndex, pageSize)
    if listErr != nil {
        fmt.Println("get subscription list failed")
        fmt.Println(listErr)
        return
    }

    fmt.Println("get subscription list successful")
    entries := page.Subscriptions
    for i := range entries {
        fmt.Println(entries[i])
    }
}

更新subscription

目前僅支持更新subscription comment

說明

UpdateSubscription(projectName, topicName, subId, comment string) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

    • comment: subscription comment

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// updateSubscription replaces the comment of the subscription with a fixed
// sample id by "new sub comment" and prints the outcome.
func updateSubscription(dh datahub.DataHub, projectName, topicName string) {
    const subId = "1565580329258VXSY8"

    updateErr := dh.UpdateSubscription(projectName, topicName, subId, "new sub comment")
    if updateErr != nil {
        fmt.Println("update subscription comment failed")
        fmt.Println(updateErr)
        return
    }

    fmt.Println("update subscription comment successful")
}

更新subscription狀態

subscription 有兩種狀態,SUB_OFFLINE 和 SUB_ONLINE,分別表示離線和在線。

說明

UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

    • state: The state you want to change.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// updateSubState sets the subscription with a fixed sample id to the
// SUB_OFFLINE state and prints the outcome.
func updateSubState(dh datahub.DataHub, projectName, topicName string) {
    const subId = "1565580329258VXSY8"

    stateErr := dh.UpdateSubscriptionState(projectName, topicName, subId, datahub.SUB_OFFLINE)
    if stateErr != nil {
        fmt.Println("update subscription state failed")
        fmt.Println(stateErr)
        return
    }

    fmt.Println("update subscription state successful")
}

offset操作

一個subscription創建后,初始狀態是未消費的,要使用subscription服務提供的點位存儲功能,需要進行一些offset操作。

初始化offset

初始化subscription是使用subscription進行點位操作的第一步。一個subscription不支持并行操作,如果需要在多個進程中消費同一份數據,則需要使用不同的subscription。調用OpenSubscriptionSession之后,獲取的點位信息中,SessionId會+1,并且之前的session失效,無法進行更新offset操作。

說明

OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

    • shardIds: The id list of the shards.

  • return

// OpenSubscriptionSessionResult is the value returned (by pointer) from
// OpenSubscriptionSession.
type OpenSubscriptionSessionResult struct {
    // Offsets is decoded from the "Offsets" JSON key; the examples on this
    // page index it with a shard id such as "0".
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}
// SubscriptionOffset is the per-shard offset used by the subscription
// offset calls (open session, get, commit, reset).
type SubscriptionOffset struct {
    Timestamp int64  `json:"Timestamp"`
    Sequence  int64  `json:"Sequence"`
    // VersionId is decoded from the "Version" JSON key; the Go field name
    // and the JSON key differ.
    VersionId int64  `json:"Version"`
    // SessionId is a pointer and can therefore be nil; this page states it is
    // nil for offsets obtained through GetSubscriptionOffset.
    SessionId *int64 `json:"SessionId"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// openOffset opens a subscription session for shards "0", "1" and "2" of a
// fixed sample subscription and prints the returned session offsets.
func openOffset(dh datahub.DataHub, projectName, topicName string) {
    const subId = "1565580329258VXSY8"
    shards := []string{"0", "1", "2"}

    session, openErr := dh.OpenSubscriptionSession(projectName, topicName, subId, shards)
    if openErr != nil {
        fmt.Println("open session failed")
        fmt.Println(openErr)
        return
    }

    fmt.Println("open session successful")
    fmt.Println(session)
}

獲取offset

獲取subscription的當前點位信息。與OpenSubscriptionSession不同的是,GetSubscriptionOffset獲取的點位信息中SubscriptionOffset的SessionId為nil,是無法進行commit點位操作的,因此GetSubscriptionOffset一般用來查看點位信息。

說明

GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

    • shardIds: The id list of the shards.

  • return

// GetSubscriptionOffsetResult is the value returned (by pointer) from
// GetSubscriptionOffset.
type GetSubscriptionOffsetResult struct {
    // Offsets is decoded from the "Offsets" JSON key.
    Offsets map[string]SubscriptionOffset `json:"Offsets"`
}

// SubscriptionOffset is the per-shard offset of a subscription.
type SubscriptionOffset struct {
    Timestamp int64 `json:"Timestamp"`
    Sequence  int64 `json:"Sequence"`
    // VersionId is decoded from the "Version" JSON key.
    VersionId int64 `json:"Version"`
    // SessionId is nil for offsets obtained through GetSubscriptionOffset,
    // so such offsets cannot be committed.
    SessionId *int64 `json:"SessionId"`
}
  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// getOffset reads the current offsets of shards "0", "1" and "2" of a fixed
// sample subscription and prints them.
//
// Unlike openOffset it opens no session; the returned offsets are meant for
// inspection only.
func getOffset(dh datahub.DataHub, projectName, topicName string) {
    subId := "1565580329258VXSY8"
    shardIds := []string{"0", "1", "2"}
    gss, err := dh.GetSubscriptionOffset(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("get subscription offset failed")
        fmt.Println(err)
        return
    }
    fmt.Println("get subscription offset successful")
    fmt.Println(gss)
}

更新offset

更新點位時會驗證versionId和sessionId,必須與當前session一致才會更新成功。更新點位時,需要同時設置Timestamp和Sequence,才會更新為有效點位,如果兩者不對應,則會更新點位到Timestamp對應的點位,建議更新點位時,選擇record中對應的Timestamp和Sequence進行點位更新。

說明

CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

    • offsets: The offset map of shards.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// updateOffset opens a subscription session for shards "0", "1" and "2",
// then commits a new offset (sequence 900 at a fixed timestamp) for shard
// "0" using the session and version information returned by the open call.
//
// It relies on dh, projectName, topicName and subId being defined outside
// the function.
func updateOffset() {
    shardIds := []string{"0", "1", "2"}
    oss, err := dh.OpenSubscriptionSession(projectName, topicName, subId, shardIds)
    if err != nil {
        fmt.Println("open session failed")
        fmt.Println(err)
        // Without a session there is nothing to commit, and oss must not
        // be dereferenced after a failed call.
        return
    }
    fmt.Println("open session successful")
    fmt.Println(oss)

    // Start from the offset returned by the session so that its version
    // and session ids are carried into the commit.
    offset := oss.Offsets["0"]
    // set offset message
    offset.Sequence = 900
    offset.Timestamp = 1565593166690
    offsetMap := map[string]datahub.SubscriptionOffset{
        "0": offset,
    }

    if err := dh.CommitSubscriptionOffset(projectName, topicName, subId, offsetMap); err != nil {
        switch err.(type) {
        case *datahub.SubscriptionOfflineError:
            fmt.Println("the subscription has offline")
        case *datahub.SubscriptionSessionInvalidError:
            fmt.Println("the subscription is open elsewhere")
        case *datahub.SubscriptionOffsetResetError:
            fmt.Println("the subscription is reset elsewhere")
        default:
            fmt.Println(err)
        }
        fmt.Println("update offset failed")
        return
    }
    fmt.Println("update offset successful")
}

重置offset

重置offset可以將offset重置到某個時間點上。重置之后,獲取的offset信息中VersionId會+1,并且之前的session失效,無法進行更新點位操作。

說明

ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

  • 參數

    • projectName: project name

    • topicName: topic name

    • subId: The id of the subscription.

    • offsets: The offset map of shards.

  • return

  • error

    • ResourceNotFoundError

    • AuthorizationFailedError

    • DatahubClientError

    • InvalidParameterError

  • 示例

// resetOffset resets the offset of shard "1" of a fixed sample subscription
// to a fixed timestamp and prints the outcome.
func resetOffset(dh datahub.DataHub, projectName, topicName string) {
    const subId = "1565580329258VXSY8"

    // Only the timestamp is set; every other field keeps its zero value.
    target := map[string]datahub.SubscriptionOffset{
        "1": {Timestamp: 1565593166690},
    }

    resetErr := dh.ResetSubscriptionOffset(projectName, topicName, subId, target)
    if resetErr != nil {
        fmt.Println("reset offset failed")
        fmt.Println(resetErr)
        return
    }
    fmt.Println("reset offset successful")
}

error類型

GO SDK對datahub的錯誤類型進行了整理,用戶可以使用類型斷言進行錯誤類型的判斷,然后根據錯誤的類型進行相應的處理。其中錯誤類型中,DatahubClientError、LimitExceededError、ServiceTemporaryUnavailableError 屬于可重試錯誤,除此之外,其余error屬于不可重試錯誤,而DatahubClientError中包含部分可重試錯誤,例如server busy,server unavailable等,因此建議遇到可重試error時,可以在代碼邏輯中添加重試邏輯,但應嚴格限制重試次數。

類名

錯誤碼

描述

InvalidParameterError

InvalidParameter, InvalidCursor

非法參數

ResourceNotFoundError

ResourceNotFound, NoSuchProject, NoSuchTopic, NoSuchShard, NoSuchSubscription, NoSuchConnector, NoSuchMeteringInfo

訪問的資源不存在(注:進行Split/Merge操作后,立即發送其他請求,有可能會拋出該異常 )。

ResourceExistError

ResourceAlreadyExist, ProjectAlreadyExist, TopicAlreadyExist, ConnectorAlreadyExist

資源已存在(創建時如果資源已存在,就會拋出這個異常)。

SeekOutOfRangeError

SeekOutOfRange

getCursor時,給的sequence不在有效范圍內(通常數據已過期),或給的timestamp大于當前時間。

AuthorizationFailedError

Unauthorized

Authorization 簽名解析異常,檢查AK是否填寫正確。

NoPermissionError

NoPermission, OperationDenied

沒有權限,通常是RAM配置不正確,或沒有正確授權子賬號。

NewShardSealedError

InvalidShardOperation

shard 處于CLOSED狀態可讀不可寫,繼續往CLOSED的shard 寫數據,或讀到最后一條數據后繼續讀取,會拋出該異常。

LimitExceededError

LimitExceeded

接口使用超限,參考限制描述

SubscriptionOfflineError

SubscriptionOffline

訂閱處于下線狀態不可用。

SubscriptionSessionInvalidError

OffsetSessionChanged, OffsetSessionClosed

訂閱會話異常,使用訂閱時會建立一個session,用于提交點位,如果有其他客戶端使用該訂閱,會得到該異常。

SubscriptionOffsetResetError

OffsetReseted

訂閱點位被重置。

MalformedRecordError

MalformedRecord、ShardNotReady

非法的 Record 格式,可能的情況有:schema 不正確、包含非utf-8字符、客戶端使用pb而服務端不支持等。

ServiceTemporaryUnavailableError

-

一般是網絡問題,例如連接異常斷開,通常重試即可。

DatahubClientError

其他所有,并且是所有異常的基類

如排除以上異常情況,通常重試即可,但應限制重試次數。

DatahubClientError

datahub的基礎錯誤類型,所有的error都繼承了這個錯誤類型。datahub的錯誤類型除了已經定義的錯誤類型,其余錯誤均屬于DatahubClientError,其中包括服務器busy、服務器unavailable等可重試錯誤,用戶可以在自己的代碼邏輯中添加一些重試機制。

// DatahubClientError is the base DataHub error type. Errors that have no
// dedicated type on this page are reported as DatahubClientError, including
// retryable conditions such as server busy or server unavailable.
type DatahubClientError struct {
    StatusCode int    `json:"StatusCode"`   // Http status code
    RequestId  string `json:"RequestId"`    // Request-id to trace the request
    Code       string `json:"ErrorCode"`    // Datahub error code
    Message    string `json:"ErrorMessage"` // Error msg of the error code
}

error使用示例:

// example_error demonstrates branching on the concrete DataHub error type.
// It tries to create a project and, depending on the kind of failure,
// prints a hint. For a LimitExceededError it retries up to maxRetry times
// with a five-second pause before each attempt.
func example_error() {
    accessId := ""
    accessKey := ""
    endpoint := ""
    projectName := "datahub_go_test"
    maxRetry := 3

    dh := datahub.New(accessId, accessKey, endpoint)

    createErr := dh.CreateProject(projectName, "project comment")
    if createErr == nil {
        fmt.Println("create project successful")
        return
    }

    switch createErr.(type) {
    case *datahub.InvalidParameterError:
        fmt.Println("invalid parameter,please check your input parameter")
    case *datahub.ResourceExistError:
        fmt.Println("project already exists")
    case *datahub.AuthorizationFailedError:
        fmt.Println("accessId or accessKey err,please check your accessId and accessKey")
    case *datahub.LimitExceededError:
        fmt.Println("limit exceed, so retry")
        for attempt := 0; attempt < maxRetry; attempt++ {
            // wait 5 seconds
            time.Sleep(5 * time.Second)
            retryErr := dh.CreateProject(projectName, "project comment")
            if retryErr == nil {
                fmt.Println("create project successful")
                break
            }
            fmt.Println("create project failed")
            fmt.Println(retryErr)
        }
    default:
        fmt.Println("unknown error")
        fmt.Println(createErr)
    }
}