本文為您介紹流式數據通道SDK接口的使用方法。
使用說明
您可以基于MaxCompute Studio通過Java SDK使用MaxCompute流式數據通道服務。
您可以使用以下配置在MaxCompute Studio上添加指定版本的pom依賴。
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-sdk-core</artifactId>
<version>0.36.4-public</version>
</dependency>
接口介紹
不同版本的SDK在使用上有差別,詳情請參見SDK Java Doc。
接口 | 描述 |
TableTunnel | 訪問MaxCompute Tunnel服務的入口類。您可以通過外網或阿里云內網環境對MaxCompute及其Tunnel進行訪問。 |
TableTunnel.StreamUploadSession | 上傳數據會話。 |
TableTunnel.StreamRecordPack | 數據緩存會話。 |
StreamUploadSession
public interface StreamUploadSession {
/**
* 設置P2P模式。
* @param mode
*/
public void setP2pMode(boolean mode);
/**
* 獲取Session ID。
* @return Session ID
*/
public String getId();
/**
* 獲取表結構。
*/
public TableSchema getSchema();
/**
* 創建一個無壓縮{@Link StreamRecordPack}對象。
* @return StreamRecordPack對象。
*/
public StreamRecordPack newRecordPack() throws IOException;
/**
* 創建一個壓縮{@Link StreamRecordPack}對象。
* @param compressOption 數據傳輸壓縮選項。
* @return StreamRecordPack對象。
*/
public StreamRecordPack newRecordPack(CompressOption compressOption) throws IOException, TunnelException;
/**
* 創建一個{@Link Record}對象。
* @return Record對象。
*/
public Record newRecord();
}
StreamRecordPack
public interface StreamRecordPack {
/**
* 追加一條記錄。
* @param record
*/
public void append(Record record) throws IOException;
/**
* @return 返回當前pack存儲的記錄數。
*/
public long getRecordCount();
/**
* 注意:由于在寫到內存緩沖區前,數據會經過多層緩沖區。
* 因此這個值的變化并不是連續的,有可能出現追加數據后,getDataSize不變的場景。
* @return 返回當前pack存儲數據的大小。
*/
public long getDataSize();
/**
* 數據發送到服務端。
* pack對象在flush成功以后可以復用。
* @return traceId。
* @throws IOException。
*/
public String flush() throws IOException;
/**
* 數據發送到服務端。
* pack對象在flush成功以后可以復用。
* @param flushOption 設置write參數 {@link FlushOption}。
* @return flush result。
* @throws IOException。
*/
public FlushResult flush(FlushOption flushOption) throws IOException;
}
使用建議
在使用上述接口時,您可以借鑒如下實踐結果實現性能最大化:
未開啟壓縮場景下,控制每個RecordPack的數據量超過4 MB可以達到最佳性能。
開啟壓縮場景下,控制每個RecordPack的數據量超過1 MB可以達到最佳性能。
文檔內容是否對您有幫助?