日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

并發導出數據

當使用場景中不關心整個結果集的順序時,您可以使用并發導出數據功能以更快的速度將命中的數據全部返回。

重要

表格存儲Java SDK5.6.0版本開始支持并發導出數據功能。使用并發導出數據功能時,請確保獲取了正確的Java SDK版本。關于Java SDK歷史迭代版本的更多信息,請參見Java SDK歷史迭代版本

前提條件

參數

參數

說明

tableName

數據表名稱。

indexName

多元索引名稱。

scanQuery

query

多元索引的查詢語句。支持精確查詢、模糊查詢、范圍查詢、地理位置查詢、嵌套查詢等,功能和Search接口一致。

limit

掃描數據時一次能返回的數據行數。

maxParallel

最大并發數。請求支持的最大并發數由用戶數據量決定。數據量越大,支持的并發數越多,每次任務前可以通過ComputeSplits API進行獲取。

currentParallelId

當前并發ID。取值范圍為[0, maxParallel)。

token

用于翻頁功能。ParallelScan請求結果中有下一次進行翻頁的token,使用該token可以接著上一次的結果繼續讀取數據。

aliveTime

ParallelScan的當前任務有效時間,也是token的有效時間。默認值為60,建議使用默認值,單位為秒。如果在有效時間內沒有發起下一次請求,則不能繼續讀取數據。持續發起請求會刷新token有效時間。

說明

動態修改schema中的切換索引、服務端單臺機器故障、服務端負載均衡等均會導致Session提前過期,此時需要重新創建Session。

columnsToGet

指定分組結果中需要返回的列名,可以通過將列名加入Columns來實現。

如果需要返回多元索引中的所有列,則可以使用更簡潔的ReturnAllFromIndex實現。

重要

此處不能使用ReturnAll。

sessionId

本次并發掃描數據任務的sessionId。創建Session可以通過ComputeSplits API來創建,同時獲得本次任務支持的最大并發數。

示例

請根據實際選擇單并發掃描數據和多線程并發掃描數據。

單并發掃描數據

相對于多并發掃描數據,單并發掃描數據的代碼更簡單,單并發代碼無需關心currentParallelIdmaxParallel參數。單并發使用方式的整體吞吐比Search接口方式高,但是比多線程多并發使用方式的吞吐低。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ParallelScanResponse;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.SearchRequest.ColumnsToGet;
import com.alicloud.openservices.tablestore.model.search.query.MatchAllQuery;
import com.alicloud.openservices.tablestore.model.search.query.Query;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

public class Test {

    public static List<Row> scanQuery(final SyncClient client) {
        String tableName = "<TableName>";
        String indexName = "<IndexName>";
        //獲取sessionId和本次請求支持的最大并發數。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        byte[] sessionId = computeSplitsResponse.getSessionId();
        int splitsSize = computeSplitsResponse.getSplitsSize();
        /*
         *  創建并發掃描數據請求。
         */
        ParallelScanRequest parallelScanRequest = new ParallelScanRequest();
        parallelScanRequest.setTableName(tableName);
        parallelScanRequest.setIndexName(indexName);
        ScanQuery scanQuery = new ScanQuery();
        //該query決定了掃描出的數據范圍,可用于構建嵌套的復雜的query。
        Query query = new MatchAllQuery();
        scanQuery.setQuery(query);

        //設置單次請求返回的數據行數。
        scanQuery.setLimit(2000);
        parallelScanRequest.setScanQuery(scanQuery);
        ColumnsToGet columnsToGet = new ColumnsToGet();
        columnsToGet.setColumns(Arrays.asList("col_1", "col_2"));
        parallelScanRequest.setColumnsToGet(columnsToGet);
        parallelScanRequest.setSessionId(sessionId);

        /*
         * 使用builder模式創建并發掃描數據請求,功能與前面一致。
         */
        ParallelScanRequest parallelScanRequestByBuilder = ParallelScanRequest.newBuilder()
            .tableName(tableName)
            .indexName(indexName)
            .scanQuery(ScanQuery.newBuilder()
                .query(QueryBuilders.matchAll())
                .limit(2000)
                .build())
            .addColumnsToGet("col_1", "col_2")
            .sessionId(sessionId)
            .build();
        List<Row> result = new ArrayList<>();

        /*
         * 使用原生的API掃描數據。
         */
        {
            ParallelScanResponse parallelScanResponse = client.parallelScan(parallelScanRequest);
            //下次請求的ScanQuery的token。
            byte[] nextToken = parallelScanResponse.getNextToken();
            //獲取數據。
            List<Row> rows = parallelScanResponse.getRows();
            result.addAll(rows);
            while (nextToken != null) {
                //設置token。
                parallelScanRequest.getScanQuery().setToken(nextToken);
                //繼續掃描數據。
                parallelScanResponse = client.parallelScan(parallelScanRequest);
                //獲取數據。
                rows = parallelScanResponse.getRows();
                result.addAll(rows);
                nextToken = parallelScanResponse.getNextToken();
            }
        }

        /*
         *  推薦方式。
         *  使用iterator方式掃描所有匹配數據。使用方式上更簡單,速度和前面方法一致。
         */
        {
            RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
            while (iterator.hasNext()) {
                Row row = iterator.next();
                result.add(row);
                //獲取具體的值。
                String col_1 = row.getLatestColumn("col_1").getValue().asString();
                long col_2 = row.getLatestColumn("col_2").getValue().asLong();
            }
        }

        /*
         * 關于失敗重試的問題,如果本函數的外部調用者有重試機制或者不需要考慮失敗重試問題,可以忽略此部分內容。
         * 為了保證可用性,遇到任何異常均推薦進行任務級別的重試,重新開始一個新的ParallelScan任務。
         * 異常分為如下兩種:
         * 1、服務端Session異常OTSSessionExpired。
         * 2、調用者客戶端網絡等異常。
         */
        try {
            //正常處理邏輯。
            {
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    //處理row,內存足夠大時可直接放到list中。
                    result.add(row);
                }
            }
        } catch (Exception ex) {
            //重試。
            {
                result.clear();
                RowIterator iterator = client.createParallelScanIterator(parallelScanRequestByBuilder);
                while (iterator.hasNext()) {
                    Row row = iterator.next();
                    //處理row,內存足夠大時可直接放到list中。
                    result.add(row);
                }
            }
        }
        return result;
    }
}

多線程并發掃描數據

import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.ComputeSplitsRequest;
import com.alicloud.openservices.tablestore.model.ComputeSplitsResponse;
import com.alicloud.openservices.tablestore.model.Row;
import com.alicloud.openservices.tablestore.model.SearchIndexSplitsOptions;
import com.alicloud.openservices.tablestore.model.iterator.RowIterator;
import com.alicloud.openservices.tablestore.model.search.ParallelScanRequest;
import com.alicloud.openservices.tablestore.model.search.ScanQuery;
import com.alicloud.openservices.tablestore.model.search.query.QueryBuilders;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;

public class Test {

    public static void scanQueryWithMultiThread(final SyncClient client, String tableName, String indexName) throws InterruptedException {
        // 獲取機器的CPU數量。
        final int cpuProcessors = Runtime.getRuntime().availableProcessors();
        // 指定客戶端多線程的并發數量。建議和客戶端的CPU核數一致,避免客戶端壓力太大,影響查詢性能。
        final Semaphore semaphore = new Semaphore(cpuProcessors);

        // 獲取sessionId和本次請求支持的最大并發數。
        ComputeSplitsRequest computeSplitsRequest = new ComputeSplitsRequest();
        computeSplitsRequest.setTableName(tableName);
        computeSplitsRequest.setSplitsOptions(new SearchIndexSplitsOptions(indexName));
        ComputeSplitsResponse computeSplitsResponse = client.computeSplits(computeSplitsRequest);
        final byte[] sessionId = computeSplitsResponse.getSessionId();
        final int maxParallel = computeSplitsResponse.getSplitsSize();

        // 業務統計行數使用。
        AtomicLong rowCount = new AtomicLong(0);
        /*
         *  為了使用一個函數實現多線程功能,此處構建一個內部類繼承Thread來使用多線程。
         *  您也可以構建一個正常的外部類,使代碼更有條理。
         */
        final class ThreadForScanQuery extends Thread {
            private final int currentParallelId;

            private ThreadForScanQuery(int currentParallelId) {
                this.currentParallelId = currentParallelId;
                this.setName("ThreadForScanQuery:" + maxParallel + "-" + currentParallelId);  // 設置線程名稱。
            }

            @Override
            public void run() {
                System.out.println("start thread:" + this.getName());
                try {
                    // 正常處理邏輯。
                    {
                        ParallelScanRequest parallelScanRequest = ParallelScanRequest.newBuilder()
                                .tableName(tableName)
                                .indexName(indexName)
                                .scanQuery(ScanQuery.newBuilder()
                                        .query(QueryBuilders.range("col_long").lessThan(10_0000)) // 此處的query決定了獲取什么數據。
                                        .limit(2000)
                                        .currentParallelId(currentParallelId)
                                        .maxParallel(maxParallel)
                                        .build())
                                .addColumnsToGet("col_long", "col_keyword", "col_bool")  // 設置要返回的多元索引中的部分字段,或者使用下行注釋的內容獲取多元索引中全部數據。
                                //.returnAllColumnsFromIndex(true)
                                .sessionId(sessionId)
                                .build();
                        // 使用Iterator形式獲取所有數據。
                        RowIterator ltr = client.createParallelScanIterator(parallelScanRequest);
                        long count = 0;
                        while (ltr.hasNext()) {
                            Row row = ltr.next();
                            // 增加自定義的處理邏輯,此處代碼以統計行數為例介紹。
                            count++;
                        }
                        rowCount.addAndGet(count);
                        System.out.println("thread[" + this.getName() + "] finished. this thread get rows:" + count);
                    }
                } catch (Exception ex) {
                    // 如果有異常,此處需要考慮重試正常處理邏輯。
                } finally {
                    semaphore.release();
                }
            }
        }

        // 多個線程同時執行,currentParallelId取值范圍為[0, maxParallel)。
        List<ThreadForScanQuery> threadList = new ArrayList<ThreadForScanQuery>();
        for (int currentParallelId = 0; currentParallelId < maxParallel; currentParallelId++) {
            ThreadForScanQuery thread = new ThreadForScanQuery(currentParallelId);
            threadList.add(thread);
        }

        // 同時啟動。
        for (ThreadForScanQuery thread : threadList) {
            // 利用semaphore限制同時啟動的線程數量,避免客戶端瓶頸。
            semaphore.acquire();
            thread.start();
        }

        // 主線程阻塞等待所有線程完成任務。
        for (ThreadForScanQuery thread : threadList) {
            thread.join();
        }
        System.out.println("all thread finished! total rows:" + rowCount.get());
    }
}