表格存儲提供了單行讀取、批量讀取、范圍讀取、迭代讀取和并行讀取的查詢方式用于讀取數據表中數據。數據寫入到數據表后,您可以選擇所需數據查詢方式進行數據讀取。
如果需要了解表格存儲各場景的應用案例,請參見快速玩轉Tablestore入門與實戰。
查詢方式
表格存儲提供的數據讀取接口包括GetRow、BatchGetRow和GetRange。讀取數據時,請根據實際查詢場景使用相應查詢方式讀取數據。
當要讀取帶有自增主鍵列的表數據時,請確保已獲取到包含自增主鍵列值在內的完整主鍵。更多信息,請參見主鍵列自增。如果未記錄自增主鍵列的值,您可以使用范圍讀取數據按照第一個主鍵列確定范圍讀取數據。
查詢方式 | 說明 | 適用場景 |
調用GetRow接口讀取一行數據。 | 適用于能確定完整主鍵且要讀取行數較少的場景。 | |
調用BatchGetRow接口一次請求讀取多行數據或者一次對多張表進行讀取。 BatchGetRow操作由多個GetRow子操作組成,構造子操作的過程與使用GetRow接口時相同。 | 適用于能確定完整主鍵,且要讀取行數較多或者要讀取多個表中數據的場景。 | |
調用GetRange接口讀取一個范圍內的數據。 GetRange操作支持按照確定范圍進行正序讀取和逆序讀取,可以設置要讀取的行數。如果范圍較大,已掃描的行數或者數據量超過一定限制,會停止掃描,并返回已獲取的行和下一個主鍵信息。您可以根據返回的下一個主鍵信息,繼續發起請求,獲取范圍內剩余的行。 | 適用于能確定完整主鍵范圍或者主鍵前綴的場景。 重要 如果不能確定主鍵前綴,您也可以通過設置完整主鍵范圍均為虛擬點INF_MIN和INF_MAX進行全表數據掃描,但是執行此操作會消耗較多計算資源,請謹慎使用。 | |
通過createRangeIterator接口迭代讀取數據。 | 適用于能確定完整主鍵范圍或者主鍵前綴,且需要迭代讀取的場景。 | |
TableStoreReader是表格存儲Java SDK提供的工具類,封裝了BatchGetRow接口,可以實現并發查詢表中數據。同時支持多表查詢、查詢狀態統計、行級別回調和自定義配置功能。 | 適用于能確定完整主鍵,且要讀取行數較多或者要讀取多個表中數據的場景。 |
前提條件
已初始化OTSClient。具體操作,請參見初始化OTSClient。
已創建數據表并寫入數據。
讀取單行數據
調用GetRow接口讀取一行數據。讀取的結果可能有如下兩種:
如果該行存在,則返回該行的各主鍵列以及屬性列。
如果該行不存在,則返回中不包含行,并且不會報錯。
參數
參數 | 說明 |
tableName | 數據表名稱。 |
primaryKey | 行的主鍵。主鍵包括主鍵列名、主鍵類型和主鍵值。 重要 設置的主鍵個數和類型必須和表的主鍵個數和類型一致。 |
columnsToGet | 讀取的列集合,列名可以是主鍵列或屬性列。
說明
|
maxVersions | 最多讀取的版本數。 重要 maxVersions與timeRange必須至少設置一個。
|
timeRange | 讀取版本號范圍或特定版本號的數據。更多信息,請參見TimeRange。 重要 maxVersions與timeRange必須至少設置一個。
timestamp和 時間戳的單位為毫秒,最小值為0,最大值為 |
filter | 使用過濾器,在服務端對讀取結果再進行一次過濾,只返回符合過濾器中條件的數據行。更多信息,請參見過濾器。 說明 當columnsToGet和filter同時使用時,執行順序是先獲取columnsToGet指定的列,再在返回的列中進行條件過濾。 |
示例
讀取數據時,您可以指定要讀取的數據版本、要讀取的列、過濾器、正則過濾等。
讀取最新版本數據和指定列
以下示例用于讀取數據表中的一行數據,設置讀取最新版本的數據和讀取指定的列。
private static void getRow(SyncClient client, String pkValue) {
//構造主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
//讀取一行數據,設置數據表名稱。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
//設置讀取最新版本。
criteria.setMaxVersions(1);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為: ");
System.out.println(row);
//設置讀取某些列。
criteria.addColumnsToGet("Col0");
getRowResponse = client.getRow(new GetRowRequest(criteria));
row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為:");
System.out.println(row);
}
讀取數據時使用過濾器
以下示例用于讀取數據表中的一行數據,設置讀取最新版本的數據以及根據Col0列的值過濾數據。
private static void getRow(SyncClient client, String pkValue) {
//構造主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue));
PrimaryKey primaryKey = primaryKeyBuilder.build();
//讀取一行數據,設置數據表名稱。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>", primaryKey);
//設置讀取最新版本。
criteria.setMaxVersions(1);
//設置過濾器,當Col0列的值為0時,返回該行。
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
//如果Col0列不存在,則不返回該行。
singleColumnValueFilter.setPassIfMissing(false);
criteria.setFilter(singleColumnValueFilter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為: ");
System.out.println(row);
}
讀取數據時使用正則過濾
以下示例用于讀取數據表一行中Col1列的數據,并對該列的數據執行正則過濾。
private static void getRow(SyncClient client, String pkValue) {
//設置數據表名稱。
SingleRowQueryCriteria criteria = new SingleRowQueryCriteria("<TABLE_NAME>");
//構造主鍵。
PrimaryKey primaryKey = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(pkValue))
.build();
criteria.setPrimaryKey(primaryKey);
// 設置讀取最新版本。
criteria.setMaxVersions(1);
// 設置過濾器,當cast<int>(regex(Col1)) > 100時,返回該行。
RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
SingleColumnValueRegexFilter filter = new SingleColumnValueRegexFilter("Col1",
regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN, ColumnValue.fromLong(100));
criteria.setFilter(filter);
GetRowResponse getRowResponse = client.getRow(new GetRowRequest(criteria));
Row row = getRowResponse.getRow();
System.out.println("讀取完畢,結果為: ");
System.out.println(row);
}
批量讀取數據
調用BatchGetRow接口一次請求讀取多行數據,也支持一次對多張表進行讀取。BatchGetRow由多個GetRow子操作組成。構造子操作的過程與使用GetRow接口時相同。
BatchGetRow的各個子操作獨立執行,表格存儲會分別返回各個子操作的執行結果。
注意事項
批量讀取的所有行采用相同的參數條件,例如
ColumnsToGet=[colA]
,則要讀取的所有行都只讀取colA列。由于批量讀取可能存在部分行失敗的情況,失敗行的錯誤信息在返回的BatchGetRowResponse中,但并不拋出異常。因此調用BatchGetRow接口時,需要檢查返回值,可通過BatchGetRowResponse的isAllSucceed方法判斷是否所有行都獲取成功;通過BatchGetRowResponse的getFailedRows方法獲取失敗行的信息。
BatchGetRow操作單次支持讀取的最大行數為100行。
參數
更多信息,請參見讀取單行數據參數。
示例
以下示例用于讀取10行,設置版本條件、要讀取的列、過濾器等。
private static void batchGetRow(SyncClient client) {
//設置數據表名稱。
MultiRowQueryCriteria multiRowQueryCriteria = new MultiRowQueryCriteria("<TABLE_NAME>");
//加入10個要讀取的行。
for (int i = 0; i < 10; i++) {
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("pk" + i));
PrimaryKey primaryKey = primaryKeyBuilder.build();
multiRowQueryCriteria.addRow(primaryKey);
}
//添加條件。
multiRowQueryCriteria.setMaxVersions(1);
multiRowQueryCriteria.addColumnsToGet("Col0");
multiRowQueryCriteria.addColumnsToGet("Col1");
SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("Col0",
SingleColumnValueFilter.CompareOperator.EQUAL, ColumnValue.fromLong(0));
singleColumnValueFilter.setPassIfMissing(false);
multiRowQueryCriteria.setFilter(singleColumnValueFilter);
BatchGetRowRequest batchGetRowRequest = new BatchGetRowRequest();
//BatchGetRow支持讀取多個表的數據,一個multiRowQueryCriteria對應一個表的查詢條件,可以添加多個multiRowQueryCriteria。
batchGetRowRequest.addMultiRowQueryCriteria(multiRowQueryCriteria);
BatchGetRowResponse batchGetRowResponse = client.batchGetRow(batchGetRowRequest);
System.out.println("是否全部成功:" + batchGetRowResponse.isAllSucceed());
System.out.println("讀取完畢,結果為: ");
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getSucceedRows()) {
System.out.println(rowResult.getRow());
}
if (!batchGetRowResponse.isAllSucceed()) {
for (BatchGetRowResponse.RowResult rowResult : batchGetRowResponse.getFailedRows()) {
System.out.println("失敗的行:" + batchGetRowRequest.getPrimaryKey(rowResult.getTableName(), rowResult.getIndex()));
System.out.println("失敗原因:" + rowResult.getError());
}
/**
* 可以通過createRequestForRetry方法再構造一個請求對失敗的行進行重試。此處只給出構造重試請求的部分。
* 推薦的重試方法是使用SDK的自定義重試策略功能,支持對batch操作的部分行錯誤進行重試。設置重試策略后,調用接口處無需增加重試代碼。
*/
BatchGetRowRequest retryRequest = batchGetRowRequest.createRequestForRetry(batchGetRowResponse.getFailedRows());
}
}
詳細代碼請參見BatchGetRow@GitHub。
范圍讀取數據
調用GetRange接口讀取一個范圍內的數據。
GetRange操作支持按照確定范圍進行正序讀取和逆序讀取,可以設置要讀取的行數。如果范圍較大,已掃描的行數或者數據量超過一定限制,會停止掃描,并返回已獲取的行和下一個主鍵信息。您可以根據返回的下一個主鍵信息,繼續發起請求,獲取范圍內剩余的行。
表格存儲表中的行都是按照主鍵排序的,而主鍵是由全部主鍵列按照順序組成的,所以不能理解為表格存儲會按照某列主鍵排序,這是常見的誤區。
注意事項
GetRange操作遵循最左匹配原則,讀取數據時,依次比較第一主鍵列到第四主鍵列。例如表的主鍵包括PK1、PK2、PK3三個主鍵列,讀取數據時,優先比較PK1是否在開始主鍵與結束主鍵的范圍內,如果PK1在設置的主鍵范圍內,則不會再比較其他的主鍵,返回在PK1主鍵范圍內的數據;如果PK1在設置的主鍵邊界上,則繼續比較PK2是否在開始主鍵與結束主鍵的范圍內,以此類推。關于范圍查詢原理的更多信息,請參見GetRange范圍查詢詳解。
GetRange操作可能在如下情況停止執行并返回數據。
掃描的行數據大小之和達到4 MB。
掃描的行數等于5000。
返回的行數等于最大返回行數。
當前剩余的預留讀吞吐量已全部使用,余量不足以讀取下一條數據。
當使用GetRange掃描的數據量較大時,表格存儲每次請求僅會掃描一次(行數大于5000或者大小大于4 MB停止掃描),超過限制的數據不會繼續返回,需要通過翻頁繼續獲取后面的數據。
參數
參數 | 說明 |
tableName | 數據表名稱。 |
direction | 讀取方向。
假設同一表中有兩個主鍵A和B,A小于B,如果正序讀取 |
inclusiveStartPrimaryKey | 本次范圍讀取的起始主鍵和結束主鍵,起始主鍵和結束主鍵需要是有效的主鍵或者是由INF_MIN和INF_MAX類型組成的虛擬點,虛擬點的列數必須與主鍵相同。 其中INF_MIN表示無限小,任何類型的值都比它大;INF_MAX表示無限大,任何類型的值都比它小。
數據表中的行按主鍵從小到大排序,讀取范圍是一個左閉右開的區間,正序讀取時,返回的是大于等于起始主鍵且小于結束主鍵的所有的行。 |
exclusiveEndPrimaryKey | |
limit | 數據的最大返回行數,此值必須大于 0。 表格存儲按照正序或者逆序返回指定的最大返回行數后即結束該操作的執行,即使該區間內仍有未返回的數據。此時可以通過返回結果中的nextStartPrimaryKey記錄本次讀取到的位置,用于下一次讀取。 |
columnsToGet | 讀取的列集合,列名可以是主鍵列或屬性列。
說明
|
maxVersions | 最多讀取的版本數。 重要 maxVersions與timeRange必須至少設置一個。
|
timeRange | 讀取版本號范圍或特定版本號的數據。更多信息,請參見TimeRange。 重要 maxVersions與timeRange必須至少設置一個。
timestamp和 時間戳的單位為毫秒,最小值為0,最大值為 |
filter | 使用過濾器,在服務端對讀取結果再進行一次過濾,只返回符合過濾器中條件的數據行。更多信息,請參見過濾器。 說明 當columnsToGet和filter同時使用時,執行順序是先獲取columnsToGet指定的列,再在返回的列中進行條件過濾。 |
nextStartPrimaryKey | 根據返回結果中的nextStartPrimaryKey判斷數據是否全部讀取。
|
示例
按照確定范圍讀取數據
以下示例用于按照確定范圍進行正序讀取,判斷nextStartPrimaryKey是否為空,讀取完范圍內的全部數據。
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
//設置數據表名稱。
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//設置起始主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//設置結束主鍵。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange的結果為:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
//如果NextStartPrimaryKey不為null,則繼續讀取。
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
按照第一個主鍵列確定范圍讀取數據
以下示例用于按照第一個主鍵列確定范圍、第二主鍵列從最小值(INF_MIN)到最大值(INF_MAX)進行正序讀取,判斷nextStartPrimaryKey是否為null,讀取完范圍內的全部數據。
private static void getRange(SyncClient client, String startPkValue, String endPkValue) {
//設置數據表名稱。
RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//設置起始主鍵,以兩個主鍵為例。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(startPkValue));//確定值。
primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MIN);//最小值。
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//設置結束主鍵。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromString(endPkValue));//確定值。
primaryKeyBuilder.addPrimaryKeyColumn("pk2", PrimaryKeyValue.INF_MAX);//最大值。
rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeRowQueryCriteria.setMaxVersions(1);
System.out.println("GetRange的結果為:");
while (true) {
GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
for (Row row : getRangeResponse.getRows()) {
System.out.println(row);
}
//如果nextStartPrimaryKey不為null,則繼續讀取。
if (getRangeResponse.getNextStartPrimaryKey() != null) {
rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
} else {
break;
}
}
}
按照確定范圍讀取數據并對指定列使用正則過濾
以下示例用于讀取主鍵范圍為["pk:2020-01-01.log", "pk:2021-01-01.log")
時Col1列的數據,并對該列的數據執行正則過濾。
private static void getRange(SyncClient client) {
//設置數據表名稱。
RangeRowQueryCriteria criteria = new RangeRowQueryCriteria("<TABLE_NAME>");
//設置主鍵范圍為["pk:2020-01-01.log", "pk:2021-01-01.log"),讀取范圍為左閉右開的區間。
PrimaryKey pk0 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2020-01-01.log"))
.build();
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString("2021-01-01.log"))
.build();
criteria.setInclusiveStartPrimaryKey(pk0);
criteria.setExclusiveEndPrimaryKey(pk1);
//設置讀取最新版本。
criteria.setMaxVersions(1);
//設置過濾器,當cast<int>(regex(Col1)) > 100時,返回該行。
RegexRule regexRule = new RegexRule("t1:([0-9]+),", RegexRule.CastType.VT_INTEGER);
SingleColumnValueRegexFilter filter = new SingleColumnValueRegexFilter("Col1",
regexRule,SingleColumnValueRegexFilter.CompareOperator.GREATER_THAN,ColumnValue.fromLong(100));
criteria.setFilter(filter);
while (true) {
GetRangeResponse resp = client.getRange(new GetRangeRequest(criteria));
for (Row row : resp.getRows()) {
// do something
System.out.println(row);
}
if (resp.getNextStartPrimaryKey() != null) {
criteria.setInclusiveStartPrimaryKey(resp.getNextStartPrimaryKey());
} else {
break;
}
}
}
詳細代碼請參見GetRange@GitHub。
迭代讀取數據
以下示例用于通過createRangeIterator接口迭代讀取exampletable表中的數據。
private static void getRangeByIterator(SyncClient client, String startPkValue, String endPkValue) {
//設置數據表名稱。
RangeIteratorParameter rangeIteratorParameter = new RangeIteratorParameter("<TABLE_NAME>");
//設置起始主鍵。
PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(startPkValue));
rangeIteratorParameter.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());
//設置結束主鍵。
primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
primaryKeyBuilder.addPrimaryKeyColumn("pk", PrimaryKeyValue.fromString(endPkValue));
rangeIteratorParameter.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
rangeIteratorParameter.setMaxVersions(1);
Iterator<Row> iterator = client.createRangeIterator(rangeIteratorParameter);
System.out.println("使用Iterator進行GetRange的結果為:");
while (iterator.hasNext()) {
Row row = iterator.next();
System.out.println(row);
}
}
詳細代碼請參見GetRangeByIterator@GitHub。
并發讀取數據
TableStoreReader是表格存儲Java SDK提供的工具類,封裝了BatchGetRow接口,可以實現并發查詢表中數據。同時支持多表查詢、查詢狀態統計、行級別回調和自定義配置功能。
表格存儲從Java SDK 5.16.1版本開始支持此功能,請確保使用了正確的SDK版本。關于Java SDK歷史迭代版本的更多信息,請參見Java SDK歷史迭代版本。
快速上手
構造TableStoreReader。
String endpoint = "<ENDPOINT>"; String accessKeyId = System.getenv("OTS_AK_ENV"); String accessKeySecret = System.getenv("OTS_SK_ENV"); String instanceName = "<INSTANCE_NAME>"; AsyncClientInterface client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName); TableStoreReaderConfig config = new TableStoreReaderConfig(); ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(1024)) TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, null);
構造查詢請求。
將要查詢的數據緩存到內存中,支持一次添加一條或多條數據。
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder() .addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0)) .addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0)) .build(); //新增一列查詢表中,pk1的屬性列。 Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME1>", pk1); //也可以使用List一次添加多列。 List<PrimaryKey> primaryKeyList = new ArrayList<PrimaryKey>(); Future<ReaderResult> readerResult = reader.addPrimaryKeysWithFuture("<TABLE_NAME2>", primaryKeyList);
查詢數據。
將內存中緩存的查詢發送出去。您可以通過同步阻塞方式或者異步方式查詢數據。
同步阻塞方式
reader.flush();
異步方式
reader.send();
獲取查詢結果。
//打印成功查詢和失敗查詢的信息。 for (RowReadResult success : readerResult.get().getSucceedRows()) { System.out.println(success.getRowResult()); } for (RowReadResult fail : readerResult.get().getFailedRows()) { System.out.println(fail.getRowResult()); }
關閉TableStoreReader。
reader.close(); //根據實際需要關閉client和executor。 client.shutdown(); executor.shutdown();
配置項
您可以通過修改TableStoreReaderConfig
自定義配置TableStoreReader。
參數 | 說明 |
checkTableMeta | 是否在添加查詢行時檢查表的結構。默認值為true。 如果在添加查詢行時不需要檢查表結構,請設置此參數為false。 |
bucketCount | Reader內存中的緩存分桶數。默認值為4。 |
bufferSize | 每個分桶的RingBuffer緩沖區大小,默認值為1024。 |
concurrency | 調用batchGetRow的最大并發度。默認值為10。 |
maxBatchRowsCount | 每次batchGetRow的最大請求行數。默認值為100,最大值為100。 |
defaultMaxVersions | 默認情況下,getRow查詢的最大版本數。默認值為1。 |
flushInterval | 自動flush緩存中查詢請求的間隔。默認值為10000。單位為毫秒。 |
logInterval | 自動打印任務狀態的間隔。默認值為10000。單位為毫秒 |
指定查詢條件
您可以指定表級別的查詢參數,例如查詢的最大版本數,要查詢的列、時間范圍等。
//查詢表的col1列過去60秒的最多10個版本。
//設置數據表名稱。
RowQueryCriteria criteria = new RowQueryCriteria("<TABLE_NAME>");
//設置要返回的列。
criteria.addColumnsToGet("col1");
//設置返回的最大版本數。
criteria.setMaxVersions(10);
criteria.setTimeRange(new TimeRange(System.currentTimeMillis() - 60 * 1000, System.currentTimeMillis()));
reader.setRowQueryCriteria(criteria);
完整示例
public class TableStoreReaderDemo {
private static final String endpoint = "<ENDPOINT>";
private static final String accessKeyId = System.getenv("OTS_AK_ENV");
private static final String accessKeySecret = System.getenv("OTS_SK_ENV");
private static final String instanceName = "<INSTANCE_NAME>";
private static AsyncClientInterface client;
private static ExecutorService executor;
private static AtomicLong succeedRows = new AtomicLong();
private static AtomicLong failedRows = new AtomicLong();
public static void main(String[] args) throws ExecutionException, InterruptedException {
/**
* 步驟一:構造TableStoreReader。
*/
//構造AsyncClient。
client = new AsyncClient(endpoint, accessKeyId, accessKeySecret, instanceName);
//構造reader配置類。
TableStoreReaderConfig config = new TableStoreReaderConfig();
{
//以下參數均有默認值,可不配置。
//向reader添加要查詢的數據前,會先檢查表的結構。
config.setCheckTableMeta(true);
//一次請求的最大請求行數,上限為100。
config.setMaxBatchRowsCount(100);
//默認情況下,獲取的columns最大版本數。
config.setDefaultMaxVersions(1);
//發送請求的總并發數。
config.setConcurrency(16);
//內存分桶數。
config.setBucketCount(4);
//將緩存數據全部發送的時間間隔。
config.setFlushInterval(10000);
//日志記錄reader狀態的時間間隔。
config.setLogInterval(10000);
}
//構造用于發送請求的executor。
ThreadFactory threadFactory = new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "reader-" + counter.getAndIncrement());
}
};
executor = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(1024), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
//構造reader的回調函數。
TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback = new TableStoreCallback<PrimaryKeyWithTable, RowReadResult>() {
@Override
public void onCompleted(PrimaryKeyWithTable req, RowReadResult res) {
succeedRows.incrementAndGet();
}
@Override
public void onFailed(PrimaryKeyWithTable req, Exception ex) {
failedRows.incrementAndGet();
}
};
TableStoreReader reader = new DefaultTableStoreReader(client, config, executor, callback);
/**
* 步驟二:構造查詢請求。
*/
//向內存添加一列要查詢的數據。
PrimaryKey pk1 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
.build();
reader.addPrimaryKey("<TABLE_NAME1>", pk1);
//向內存添加一列要查詢的數據,并獲取查詢結果Future。
PrimaryKey pk2 = PrimaryKeyBuilder.createPrimaryKeyBuilder()
.addPrimaryKeyColumn("pk1", PrimaryKeyValue.fromLong(0))
.addPrimaryKeyColumn("pk2", PrimaryKeyValue.fromLong(0))
.build();
Future<ReaderResult> readerResult = reader.addPrimaryKeyWithFuture("<TABLE_NAME2>", pk2);
/**
* 步驟三:查詢數據。
*/
//異步將內存中的數據發送出去。
reader.send();
/**
* 步驟四:獲取查詢結果。
*/
//打印成功查詢和失敗查詢的信息。
for (RowReadResult success : readerResult.get().getSucceedRows()) {
System.out.println(success.getRowResult());
}
for (RowReadResult fail : readerResult.get().getFailedRows()) {
System.out.println(fail.getRowResult());
}
/**
* 步驟五:關閉TableStoreReader。
*/
reader.close();
client.shutdown();
executor.shutdown();
}
}