當您有批量操作、提高命令執行性能等需求時,您可以使用Redis管道傳輸(Pipeline,后面稱為Pipeline)機制。Pipeline可以將多個命令同時發給服務端,減少網絡延遲,并提高性能。云數據庫Tair(兼容 Redis)支持原生Redis Pipeline。
Pipeline簡介
通常情況下,客戶端與Redis服務端通信時采用的是Ping-pong網絡交互模式,Ping-pong模式是指客戶端(Client)發送一個命令后會等待命令的執行結果,在客戶端收到服務器端(Server)返回的結果后,再發送下一個命令,以此類推。
Redis也支持Pipeline模式,不同于Ping-pong模式,Pipeline模式類似流水線的工作模式:客戶端發送一個命令后無需等待執行結果,會繼續發送其他命令;在全部請求發送完畢后,客戶端關閉請求,開始接收響應,收到執行結果后再與之前發送的命令按順序進行一一匹配。在Pipeline模式的具體實現中,大部分Redis客戶端采用批處理的方式,即一次發送多個命令,在接收完所有命令執行結果后再返回給上層業務。
下圖為Ping-pong模式與Pipeline模式的網絡通信示意圖。
使用Pipeline可通過降低網絡往返時延(Round-trip time,簡稱RTT),減少read()
和write()
的系統調用以及進程上下文切換次數,以提升程序的執行效率與性能。
Pipeline在某些場景下非常有效,例如有多個操作命令需要被迅速提交至服務器端,但用戶并不依賴每個操作返回的響應結果,對結果響應也無需立即獲得,那么Pipeline就可以用來作為優化性能的批處理工具。
使用Pipeline時客戶端將獨占與服務器端的連接,此期間將不能進行其他“非Pipeline”類型操作,直至Pipeline被關閉;如果要同時執行其他操作,可以為Pipeline操作單獨建立一個連接,將其與常規操作分開。
更多信息,請參見Redis pipeline。
注意事項
Pipeline不能保證原子性。
Pipeline模式只是將客戶端發送命令的方式改為發送批量命令,而服務端在處理批量命令的數據流時,仍然是解析出多個單命令并按順序執行,各個命令相互獨立,即服務端仍有可能在該過程中執行其他客戶端的命令。如需保證原子性,請使用事務或Lua腳本。
若Pipeline執行過程中發生錯誤,不支持回滾。
Pipeline沒有事務的特性,如待執行命令的前后存在依賴關系,請勿使用Pipeline。
說明某些客戶端(例如redis-py)在實現Pipeline時使用事務命令MULTI、EXEC進行偽裝,請您在使用過程中關注Pipeline與事務的區別,否則可能會產生報錯,關于事務的限制請參見Redis transactions。
由于服務端以及部分客戶端存在緩存區限制,建議單次Pipeline中不要使用過多的命令。
Pipeline的本質為客戶端與服務端的交互模式,與服務端的架構無關,因此集群架構代理模式、集群架構直連模式以及讀寫分離架構實例均支持Pipeline。
說明由于集群架構本身具有一定限制,例如不支持在單個命令中訪問跨Slot的Key、當訪問到不屬于本節點的數據時會產生
-MOVED
錯誤等,請在集群架構中使用Pipeline時確保Pipeline內部的命令符合集群架構的可執行條件,具體限制請參見集群架構與讀寫分離架構實例的命令限制。
代碼示例
性能對比
如下代碼將演示使用Pipeline與不使用Pipeline的性能對比。
package pipeline.kvstore.aliyun.com;
import java.util.Date;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
public class RedisPipelinePerformanceTest {
static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
//ApsaraDB for Redis的實例密碼
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
//連續執行多次命令操作
final int COUNT=5000;
String key = "KVStore-Tanghan";
// 1 ---不使用pipeline操作---
jedis.del(key);//初始化key
Date ts1 = new Date();
for (int i = 0; i < COUNT; i++) {
//發送一個請求,并接收一個響應(Send Request and Receive Response)
jedis.incr(key);
}
Date ts2 = new Date();
System.out.println("不用Pipeline > value為:"+jedis.get(key)+" > 操作用時:" + (ts2.getTime() - ts1.getTime())+ "ms");
//2 ----對比使用pipeline操作---
jedis.del(key);//初始化key
Pipeline p1 = jedis.pipelined();
Date ts3 = new Date();
for (int i = 0; i < COUNT; i++) {
//發出請求 Send Request
p1.incr(key);
}
//接收響應 Receive Response
p1.sync();
Date ts4 = new Date();
System.out.println("使用Pipeline > value為:"+jedis.get(key)+" > 操作用時:" + (ts4.getTime() - ts3.getTime())+ "ms");
jedis.close();
}
}
在輸入了正確的云數據庫Tair(兼容 Redis)實例訪問地址和密碼之后,運行以上Java程序,輸出結果如下。從中可以看出使用pipeline的性能要快的多。
不用Pipeline > value為:5000 > 操作用時:5844ms
使用Pipeline > value為:5000 > 操作用時:78ms
響應數據(Response)的處理方式
在Jedis中使用Pipeline時,對于響應數據(Response)的處理有兩種方式,詳情請參見以下代碼示例。
package pipeline.kvstore.aliyun.com;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
public class PipelineClientTest {
static final String host = "xxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
static final int port = 6379;
static final String password = "password";
public static void main(String[] args) {
Jedis jedis = new Jedis(host, port);
// 實例密碼
String authString = jedis.auth(password);// password
if (!authString.equals("OK")) {
System.err.println("AUTH Failed: " + authString);
jedis.close();
return;
}
String key = "KVStore-Test1";
jedis.del(key);//初始化
// -------- 方法1
Pipeline p1 = jedis.pipelined();
System.out.println("-----方法1-----");
for (int i = 0; i < 5; i++) {
p1.incr(key);
System.out.println("Pipeline發送請求");
}
// 發送請求完成,開始接收響應
System.out.println("發送請求完成,開始接收響應");
List<Object> responses = p1.syncAndReturnAll();
if (responses == null || responses.isEmpty()) {
jedis.close();
throw new RuntimeException("Pipeline error: 沒有接收到響應");
}
for (Object resp : responses) {
System.out.println("Pipeline接收響應Response: " + resp.toString());
}
System.out.println();
//-------- 方法2
System.out.println("-----方法2-----");
jedis.del(key);//初始化
Pipeline p2 = jedis.pipelined();
//需要先聲明Response
Response<Long> r1 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r2 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r3 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r4 = p2.incr(key);
System.out.println("Pipeline發送請求");
Response<Long> r5 = p2.incr(key);
System.out.println("Pipeline發送請求");
try{
r1.get(); //此時還未開始接收響應,所以此操作會出錯
}catch(Exception e){
System.out.println(" <<< Pipeline error:還未開始接收響應 >>> ");
}
// 發送請求完成,開始接收響應
System.out.println("發送請求完成,開始接收響應");
p2.sync();
System.out.println("Pipeline接收響應Response: " + r1.get());
System.out.println("Pipeline接收響應Response: " + r2.get());
System.out.println("Pipeline接收響應Response: " + r3.get());
System.out.println("Pipeline接收響應Response: " + r4.get());
System.out.println("Pipeline接收響應Response: " + r5.get());
jedis.close();
}
}
在輸入了正確的云數據庫Tair(兼容 Redis)實例訪問地址和密碼之后,運行以上Java程序,輸出結果如下:
-----方法1-----
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
發送請求完成,開始接收響應
Pipeline接收響應Response: 1
Pipeline接收響應Response: 2
Pipeline接收響應Response: 3
Pipeline接收響應Response: 4
Pipeline接收響應Response: 5
-----方法2-----
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
Pipeline發送請求
<<< Pipeline error:還未開始接收響應 >>>
發送請求完成,開始接收響應
Pipeline接收響應Response: 1
Pipeline接收響應Response: 2
Pipeline接收響應Response: 3
Pipeline接收響應Response: 4
Pipeline接收響應Response: 5