并發(fā)導(dǎo)出數(shù)據(jù)
當(dāng)使用場(chǎng)景中不關(guān)心整個(gè)結(jié)果集的順序時(shí),您可以使用并發(fā)導(dǎo)出數(shù)據(jù)功能以更快的速度將命中的數(shù)據(jù)全部返回。
前提條件
已初始化OTSClient。具體操作,請(qǐng)參見(jiàn)初始化OTSClient。
已創(chuàng)建數(shù)據(jù)表并寫(xiě)入數(shù)據(jù)。具體操作,請(qǐng)參見(jiàn)創(chuàng)建數(shù)據(jù)表和寫(xiě)入數(shù)據(jù)。
已在數(shù)據(jù)表上創(chuàng)建多元索引。具體操作,請(qǐng)參見(jiàn)創(chuàng)建多元索引。
參數(shù)
參數(shù) | 說(shuō)明 | |
TableName | 數(shù)據(jù)表名稱(chēng)。 | |
IndexName | 多元索引名稱(chēng)。 | |
ScanQuery | Query | 多元索引的查詢(xún)語(yǔ)句。支持精確查詢(xún)、模糊查詢(xún)、范圍查詢(xún)、地理位置查詢(xún)、嵌套查詢(xún)等,功能和Search接口一致。 |
Limit | 掃描數(shù)據(jù)時(shí)一次能返回的數(shù)據(jù)行數(shù)。 | |
MaxParallel | 最大并發(fā)數(shù)。請(qǐng)求支持的最大并發(fā)數(shù)由用戶(hù)數(shù)據(jù)量決定。數(shù)據(jù)量越大,支持的并發(fā)數(shù)越多,每次任務(wù)前可以通過(guò)ComputeSplits API進(jìn)行獲取。 | |
CurrentParallelId | 當(dāng)前并發(fā)ID。取值范圍為[0, MaxParallel)。 | |
Token | 用于翻頁(yè)功能。ParallelScan請(qǐng)求結(jié)果中有下一次進(jìn)行翻頁(yè)的Token,使用該Token可以接著上一次的結(jié)果繼續(xù)讀取數(shù)據(jù)。 | |
AliveTime | ParallelScan的當(dāng)前任務(wù)有效時(shí)間,也是Token的有效時(shí)間。默認(rèn)值為60,建議使用默認(rèn)值,單位為秒。如果在有效時(shí)間內(nèi)沒(méi)有發(fā)起下一次請(qǐng)求,則不能繼續(xù)讀取數(shù)據(jù)。持續(xù)發(fā)起請(qǐng)求會(huì)刷新Token有效時(shí)間。 說(shuō)明 動(dòng)態(tài)修改schema中的切換索引、服務(wù)端單臺(tái)機(jī)器故障、服務(wù)端負(fù)載均衡等均會(huì)導(dǎo)致Session提前過(guò)期,此時(shí)需要重新創(chuàng)建Session。 | |
ColumnsToGet | 指定分組結(jié)果中需要返回的列名,可以通過(guò)將列名加入Columns來(lái)實(shí)現(xiàn)。 如果需要返回多元索引中的所有列,則可以使用更簡(jiǎn)潔的ReturnAllFromIndex實(shí)現(xiàn)。 重要 此處不能使用ReturnAll。 | |
SessionId | 本次并發(fā)掃描數(shù)據(jù)任務(wù)的SessionId。創(chuàng)建Session可以通過(guò)ComputeSplits API來(lái)創(chuàng)建,同時(shí)獲得本次任務(wù)支持的最大并發(fā)數(shù)。 |
示例
以下示例用于單并發(fā)掃描數(shù)據(jù)。
/// <summary>
/// ParallelScan單并發(fā)掃描數(shù)據(jù)。
/// </summary>
public class ParallelScan
{
public static void ParallelScanwithSingleThread(OTSClient otsClient)
{
SearchIndexSplitsOptions options = new SearchIndexSplitsOptions
{
IndexName = IndexName
};
ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest
{
TableName = TableName,
SplitOptions = options
};
ComputeSplitsResponse computeSplitsResponse = otsClient.ComputeSplits(computeSplitsRequest);
MatchAllQuery matchAllQuery = new MatchAllQuery();
ScanQuery scanQuery = new ScanQuery();
scanQuery.AliveTime = 60;
scanQuery.Query = matchAllQuery;
scanQuery.MaxParallel = computeSplitsResponse.SplitsSize;
scanQuery.Limit = 10;
ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
parallelScanRequest.TableName = TableName;
parallelScanRequest.IndexName = IndexName;
parallelScanRequest.ScanQuery = scanQuery;
parallelScanRequest.ColumnToGet = new ColumnsToGet { ReturnAllFromIndex = true };
parallelScanRequest.SessionId = computeSplitsResponse.SessionId;
int total = 0;
List<Row> result = new List<Row>();
ParallelScanResponse parallelScanResponse = otsClient.ParallelScan(parallelScanRequest);
while (parallelScanResponse.NextToken != null)
{
List<Row> rows = new List<Row>(parallelScanResponse.Rows);
total += rows.Count;
result.AddRange(rows);
parallelScanRequest.ScanQuery.Token = parallelScanResponse.NextToken;
parallelScanResponse = otsClient.ParallelScan(parallelScanRequest);
}
foreach (Row row in result)
{
Console.WriteLine(JsonConvert.SerializeObject(row));
}
Console.WriteLine("Total Row Count: {0}", total);
}
}