Shard操作
Shard操作分為Shard水平擴展和Shard分裂合并兩種模式,應用場景如下
Shard水平擴展不允許合并Shard,分裂合并方式則允許
使用kafka方式消費Topic必須開啟Shard水平擴展
開啟Shard水平擴展后,key range無法使用, 所有Shard的BeginHashKey與EndHashKey是一樣的,無法按HashKey和PartitionKey方式寫入數據,需要自定義在應用層hash取模,并且需要注意擴容導致的寫入shard發生變化
Shard水平擴展模式
DataHub支持Topic Shard水平擴展,創建Topic時開啟Shard擴展模式即可
步驟一
開啟Shard擴展模式
步驟二
點擊圖標(如下圖所示),修改Shard數量
步驟三
查看水平擴展后的Shard
shard分裂合并
DataHub 支持為Topic動態擴容/縮容,通過SplitShard/MergeShard來實現。
使用場景
DataHub具有服務彈性伸縮功能,用戶可根據實時的流量調整Shard數量,來應對突發性的流量增長或達到節約資源的目的。例如在雙11大促期間,大部分Topic數據流量會激增,平時的Shard數量可能完全無法滿足這樣的流量增長,此時可以對其中一些Shard進行Split操作,一變二,二變四,最大可擴容至256個Shard,按目前的流控限制足以達到1280MB/s的流量。在雙11大促后,流量下降,多余的Shard會占用沒有必要的quota,因此可以進行Merge操作,每兩個Shard合并為一個,直到合適為止。
Shard屬性
可以通過ListShard接口獲取所有Shard的信息,每個Shard擁有如下屬性, 樣例:
{
"ShardId": "string",
"State": "string",
"ClosedTime": uint64,
"BeginHashKey": "string",
"EndHashKey": "string",
"ParentShardIds": [string,string,],
"LeftShardId": "string",
"RightShardId": "string"
}
SplitShard
指定一個128 bit的HashKey以及一個ShardID,通過SDK或者Console進行操作.SplitShard操作會將指定的Shard分裂為兩個ChildShard,并且返回ChildShard的ID以及Key信息,同時Parent Shard會被置為CLOSED狀態。例如,Split之前存在如下一個Shard:
ShardId:0 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
通過SDK進行Split操作:
String shardId = "0";
SplitShardRequest req = new SplitShardRequest(projectName, topicName, shardId, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
SplitShardResult resp = client.splitShard(req);
最終將會變成如下3個Shard:
ShardId:0 Status:CLOSED BeginHashKey:00000000000000000000000000000000
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
ShardId:1 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
ShardId:2 Status:ACTIVE BeginHashKey:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
MergeShard
指定兩個相鄰的ShardID,通過SDK或者Console進行操作.MergeShard操作會將指定的兩個Shard合并為一個新的Shard,并且返回新Shard的ID以及Key信息,同時兩個ParentShard會被置為CLOSED狀態。例如,Merge之前存在如下兩個Shard:
ShardId:0 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
ShardId:1 Status:ACTIVE BeginHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
通過SDK進行Merge操作:
String shardId = "0";
String adjacentShardId = "1";
MergeShardRequest req = new MergeShardRequest(projectName, topicName, shardId, adjacentShardId);
MergeShardResult resp = client.mergeShard(req);
最終將會變成如下3個Shard:
ShardId:0 Status:CLOSED BeginHashKey:00000000000000000000000000000000
EndHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
ShardId:1 Status:CLOSED BeginHashKey:7FFFFFFFFFFFFFFF7FFFFFFFFFFFFFFF
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
ShardId:2 Status:ACTIVE BeginHashKey:00000000000000000000000000000000
EndHashKey:FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
注意事項
當Shard進行Merge/Split后會被置為CLOSED狀態,該狀態可以繼續消費讀取數據,但是不可寫入,也不可再次進行Merge/Split操作,當到達Topic的lifecycle后該Shard會被回收。如果配置了Connector,對應任務會在復制完該Shard數據后自動掛起,待該Shard回收后會自動刪除任務。Topic在進行Merge/Split后新的Shard需要等待變為ACTIVE狀態后方可正常使用,通常不會超過5秒。