日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

并發導出數據

當使用場景中不關心整個結果集的順序時,您可以使用并發導出數據功能以更快的速度將命中的數據全部返回。

前提條件

參數

參數

說明

table_name

數據表名稱。

index_name

多元索引名稱。

scan_query

query

多元索引的查詢語句。支持精確查詢、模糊查詢、范圍查詢、地理位置查詢、嵌套查詢等,功能和Search接口一致。

limit

掃描數據時一次能返回的數據行數。

max_parallel

最大并發數。請求支持的最大并發數由用戶數據量決定。數據量越大,支持的并發數越多,每次任務前可以通過ComputeSplits API進行獲取。

current_parallel_id

當前并發ID。取值范圍為[0, max_parallel)。

token

用于翻頁功能。ParallelScan請求結果中有下一次進行翻頁的token,使用該token可以接著上一次的結果繼續讀取數據。

alive_time

ParallelScan的當前任務有效時間,也是token的有效時間。默認值為60,建議使用默認值,單位為秒。如果在有效時間內沒有發起下一次請求,則不能繼續讀取數據。持續發起請求會刷新token有效時間。

說明

動態修改schema中的切換索引、服務端單臺機器故障、服務端負載均衡等均會導致Session提前過期,此時需要重新創建Session。

columns_to_get

指定分組結果中需要返回的列名,可以通過將列名加入columns來實現。

如果需要返回多元索引中的所有列,則可以通過設置return_typeRETURN_ALL_FROM_INDEX實現。

session_id

本次并發掃描數據任務的sessionId。創建Session可以通過ComputeSplits API來創建,同時獲得本次任務支持的最大并發數。

示例

以下示例用于并發導出數據。

def fetch_rows_per_thread(query, session_id, current_thread_id, max_thread_num):
    token = None

    while True:
        try:
            scan_query = ScanQuery(query, limit = 20, next_token = token, current_parallel_id = current_thread_id,
                                   max_parallel = max_thread_num, alive_time = 30)

            response = client.parallel_scan(
                table_name, index_name, scan_query, session_id,
                columns_to_get = ColumnsToGet(return_type=ColumnReturnType.ALL_FROM_INDEX))

            for row in response.rows:
                print("%s:%s" % (threading.currentThread().name, str(row)))

            if len(response.next_token) == 0:
                break
            else:
                token = response.next_token
        except OTSServiceError as e:
            print (e)
        except OTSClientError as e:
            print (e)

def parallel_scan(table_name, index_name):
    response = client.compute_splits(table_name, index_name)

    query = TermQuery('d', 0.1)

    params = []
    for i in range(response.splits_size):
        params.append((([query, response.session_id, i, response.splits_size], None)))

    pool = threadpool.ThreadPool(response.splits_size)
    requests = threadpool.makeRequests(fetch_rows_per_thread, params)
    [pool.putRequest(req) for req in requests]
    pool.wait()