高速數據導入API
高速數據導入API服務端為gRPC服務器,使用gRPC協議(protobuf)來定義客戶端接口及其消息交換格式,您可以提交請求列出云原生數據倉庫 AnalyticDB PostgreSQL 版數據庫中的表,或向特定的表寫入數據。
使用限制
云原生數據倉庫 AnalyticDB PostgreSQL 版6.0實例需為v6.6.0及以上版本。云原生數據倉庫 AnalyticDB PostgreSQL 版7.0實例需為v7.0.3及以上版本。AnalyticDB PostgreSQL版Serverless模式實例暫不支持。
高速數據導入API目前僅支持INSERT、MERGE(UPSERT)、UPDATE三種語法,暫不支持DELETE與READ。
使用MERGE(UPSERT)或UPDATE時,需要云原生數據倉庫 AnalyticDB PostgreSQL 版表有主鍵索引。
請勿在多個并發連接中使用高速數據導入API更新相同主鍵的數據,否則會觸發全局死鎖錯誤,造成部分數據更新失敗。
開啟實時數據服務
在控制臺左上角,選擇實例所在地域。
找到目標實例,單擊實例ID。
在控制臺左側導航欄單擊實時數據消費,單擊左上角開啟實時數據服務。
在彈出的對話框中填寫名稱及服務描述并單擊確定。開通完成后,可在控制臺看到服務狀態和連接信息。
說明:服務規格當前不可選,默認為8CU。
使用高速數據導入API
在開通實時數據服務功能后,系統會提供一個用于接收數據的gRPC Server,您可以登錄云原生數據倉庫AnalyticDB PostgreSQL版控制臺,在實時數據消費頁面查看Server端的連接信息。
高速數據導入API使用protobuf,數據結構(消息)和支持的操作(服務)通過.proto文件定義。.proto文件為一個普通的文本文件,有關此數據序列化框架的詳細信息,請參見protocol buffer語言指南。
高速數據導入API的.proto文件定義了客戶端可調用的方法,可以從云原生數據倉庫 AnalyticDB PostgreSQL 版數據庫端獲取元數據信息,并向其寫入數據。
高速數據導入API使用gRPC協議,即以Protocol Buffer格式進行數據導入。由于云原生數據倉庫 AnalyticDB PostgreSQL 版支持的數據類型比Protobuf更多,因此在導入數據時需要做數據類型映射。數據映射關系,請參見附錄:數據類型映射。
將高速數據導入API服務的定義復制并粘貼到一個名為adbpg.proto的文件中,使用protoc命令編譯生成對應語言的客戶端代碼即可。具體使用方式,請參見protocol buffer使用指南。
syntax = "proto3";

package api;

import "google/protobuf/empty.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";

// Generate one Java source file per top-level message, enum and service.
option java_multiple_files = true;
// Go import path of the generated code. Without it protoc-gen-go cannot
// determine where the package lives; the value matches the "liss_client/pb"
// import used by the Go sample client. Adjust it to your own module path.
option go_package = "liss_client/pb";
// Request message of the Connect RPC.
message ConnectRequest {
  // Host address of the ADBPG master; it must be reachable from the gpss
  // server system.
  string Host = 1;
  // Port of the ADBPG master.
  int32 Port = 2;
  // User or role name that gpss uses to access ADBPG.
  string Username = 3;
  // Password of that user.
  string Password = 4;
  // Name of the database to connect to.
  string DB = 5;
  // Whether to use SSL. Ignored: SSL is configured through the gpss config
  // file instead.
  bool UseSSL = 6;
}
// Response message of the Connect RPC; passed back in every later request.
message Session {
  // Id of the client connection to gpss.
  string ID = 1;
}
// Operation modes.
enum Operation {
  // Insert all data into the table; how duplicate keys or data are handled
  // depends on the constraints of the target table.
  Insert = 0;
  // Insert and update.
  Merge = 1;
  // Update the value of "UpdateColumns" if "MatchColumns" match.
  Update = 2;
  // Not supported.
  Read = 3;
}
// Parameters of the Insert operation.
// Note: field number 3 is not used by this message.
message InsertOption {
  // Names of the target table columns the insert writes; used in
  // 'INSERT INTO', useful for partial loading.
  repeated string InsertColumns = 1;
  // Truncate the table before loading?
  bool TruncateTable = 2;
  // Error limit count; used by the external table.
  int64 ErrorLimitCount = 4;
  // Error limit percentage; used by the external table.
  int32 ErrorLimitPercentage = 5;
}
// Parameters of the Update operation.
message UpdateOption {
  // Names of the target table columns compared to decide whether a row is
  // updated.
  repeated string MatchColumns = 1;
  // Names of the target table columns to update when MatchColumns match.
  repeated string UpdateColumns = 2;
  // Optional additional match condition; SQL syntax, used after the 'WHERE'
  // clause.
  string Condition = 3;
  // Error limit count; used by the external table.
  int64 ErrorLimitCount = 4;
  // Error limit percentage; used by the external table.
  int32 ErrorLimitPercentage = 5;
}
// Parameters of the Merge operation.
// A Merge creates a session-level temp table in StagingSchema.
// The fields carry the same names as their counterparts in InsertOption and
// UpdateOption; presumably they have the same meaning (TODO confirm).
message MergeOption {
  // Columns written when a row is inserted.
  repeated string InsertColumns = 1;
  // Columns compared to decide between insert and update.
  repeated string MatchColumns = 2;
  // Columns updated when the match columns match.
  repeated string UpdateColumns = 3;
  // Additional match condition.
  string Condition = 4;
  // Error limit count.
  int64 ErrorLimitCount = 5;
  // Error limit percentage.
  int32 ErrorLimitPercentage = 6;
}
// Option message for the Read operation, which this API marks as
// "Not supported". It is not a member of OpenRequest's Option oneof.
message ReadOption {
  // Meaning not documented in this file.
  string MagicCode = 1;
}
// Request message of the Open RPC.
message OpenRequest {
  // Session returned by Connect.
  Session Session = 1;
  // Name of the ADBPG database schema.
  string SchemaName = 2;
  // Name of the ADBPG database table.
  string TableName = 3;
  // SQL to execute before gpss loads the data.
  string PreSQL = 4;
  // SQL to execute after gpss loads the data.
  string PostSQL = 5;
  // Time to wait before aborting the operation, in seconds; not supported.
  int32 Timeout = 6;
  // Encoding of text data; not supported.
  string Encoding = 7;
  // Schema in which gpss creates external and temp tables; by default they
  // are created in the same schema as the target table.
  string StagingSchema = 8;

  // Identifies the type of write operation to perform.
  oneof Option {
    InsertOption InsertOption = 100;
    UpdateOption UpdateOption = 101;
    MergeOption MergeOption = 102;
  }
}
// One column value of a Row; exactly one member of DBType is set.
// Field numbers 3, 4 and 9 are not used by this message.
message DBValue {
  oneof DBType {
    int32 Int32Value = 1;
    int64 Int64Value = 2;
    float Float32Value = 5;
    double Float64Value = 6;
    // Also used for types whose values are presented as strings although
    // they are not real string types in ADBPG, for example macaddr,
    // time with time zone, box, etc.
    string StringValue = 7;
    bytes BytesValue = 8;
    // Time without time zone.
    google.protobuf.Timestamp TimeStampValue = 10;
    google.protobuf.NullValue NullValue = 11;
  }
}
// A table row expressed as a list of column values.
message Row {
  repeated DBValue Columns = 1;
}
// Wire wrapper around one serialized Row.
message RowData {
  // A single protobuf-encoded Row.
  bytes Data = 1;
}
// Request message of the Write RPC.
message WriteRequest {
  // Session returned by Connect.
  Session Session = 1;
  // The data to load into the target table.
  repeated RowData Rows = 2;
}
// Request message of the Read RPC, which the service marks as
// "Not supported".
message ReadRequest {
  // Meaning not documented in this file.
  string MagicCode = 1;
}
// Response message of the Read RPC, which the service marks as
// "Not supported".
message ReadResponse {
  // Payload; format not documented in this file.
  string Data = 1;
}
// Response message of the Close RPC: status of the data load operation.
message TransferStats {
  // Number of rows successfully loaded.
  int64 SuccessCount = 1;
  // Number of error lines if the error limit is not reached.
  int64 ErrorCount = 2;
  // Rows with incorrectly-formatted data (a list of strings, not a count);
  // not supported.
  repeated string ErrorRows = 3;
}
// Request message of the Close RPC.
message CloseRequest {
  // Session returned by Connect. The field name is lower-case here, unlike
  // the Session fields of the other request messages.
  Session session = 1;
  // Limit on returned error rows: -1 returns all, 0 returns nothing,
  // >0 is the maximum number of rows.
  int32 MaxErrorRows = 2;
  // Abort flag. Semantics are not documented in this file; presumably it
  // aborts the current operation (TODO confirm).
  bool Abort = 3;
}
// Request message of the ListSchema RPC.
message ListSchemaRequest {
  // Session returned by Connect.
  Session Session = 1;
}
// One schema entry of a Schemas response.
message Schema {
  // Schema name.
  string Name = 1;
  // Schema owner.
  string Owner = 2;
}
// Response message of the ListSchema RPC.
message Schemas {
  repeated Schema Schemas = 1;
}
// Request message of the ListTable RPC.
message ListTableRequest {
  // Session returned by Connect.
  Session Session = 1;
  // Schema to list; 'public' is the default if no Schema is provided.
  string Schema = 2;
}
// Request message of the DescribeTable RPC.
message DescribeTableRequest {
  // Session returned by Connect.
  Session Session = 1;
  // Schema that contains the table.
  string SchemaName = 2;
  // Table to describe.
  string TableName = 3;
}
// Kind of relation reported in TableInfo.Type.
enum RelationType {
  Table = 0;
  View = 1;
  Index = 2;
  Sequence = 3;
  Special = 4;
  Other = 255;
}
// One entry of a Tables response.
message TableInfo {
  // Relation name.
  string Name = 1;
  // Relation kind.
  RelationType Type = 2;
}
// Response message of the ListTable RPC.
message Tables {
  repeated TableInfo Tables = 1;
}
// Response message of the DescribeTable RPC.
message Columns {
  repeated ColumnInfo Columns = 1;
}
// Metadata of one table column, as returned by DescribeTable.
message ColumnInfo {
  // Column name.
  string Name = 1;
  // ADBPG data type.
  string DatabaseType = 2;
  // Contains length information?
  bool HasLength = 3;
  // Length, if HasLength is true.
  int64 Length = 4;
  // Contains precision or scale information?
  bool HasPrecisionScale = 5;
  int64 Precision = 6;
  int64 Scale = 7;
  // Contains a Nullable constraint?
  bool HasNullable = 8;
  bool Nullable = 9;
}
// gRPC service exposed by the high-speed data import server.
service Gpss {
  // Establish a connection to the ADBPG database; returns a Session object.
  rpc Connect(ConnectRequest) returns (Session);

  // Disconnect, freeing all resources allocated for a session.
  rpc Disconnect(Session) returns (google.protobuf.Empty);

  // Prepare and open a table for write.
  rpc Open(OpenRequest) returns (google.protobuf.Empty);

  // Write data to the table.
  rpc Write(WriteRequest) returns (google.protobuf.Empty);

  // Close a write operation.
  rpc Close(CloseRequest) returns (TransferStats);

  // List all available schemas in a database.
  rpc ListSchema(ListSchemaRequest) returns (Schemas);

  // List all tables and views in a schema.
  rpc ListTable(ListTableRequest) returns (Tables);

  // Describe table metadata (column name and column type).
  rpc DescribeTable(DescribeTableRequest) returns (Columns);

  // Not supported.
  rpc Read(ReadRequest) returns (ReadResponse);
}
高速導入API使用樣例
下文提供了使用Java和Go語言,利用高速數據導入API獲取云原生數據倉庫 AnalyticDB PostgreSQL 版元數據并向其寫入數據的示例。
Java
package client;
import api.*;
import com.google.longrunning.GetOperationRequest;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.protobuf.TimestampProto;
import io.grpc.*;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Sample client of the high-speed data import API (gRPC service {@code Gpss}).
 *
 * <p>{@link #main} connects to the server, lists schemas and tables, describes the target
 * table, opens it for INSERT, writes 20 generated rows, closes the operation and disconnects.
 * Replace every {@code <...>} placeholder below before running.
 */
public class GrpcClient {
    /** using for adbpgss environment start */
    final static String lissHost = "<LISS_HOST>"; // Connection address of the gRPC server.
    // Ports are ints, so the placeholder text is parsed instead of being assigned directly
    // (assigning a String literal to an int does not compile). Replace the placeholder with
    // the numeric port, e.g. "5432"; an un-replaced placeholder fails at class initialization.
    final static int lissPort = Integer.parseInt("<LISS_PORT>"); // Port of the gRPC server.
    final static String schemaName = "<TARGET_SCHEMA_NAME>"; // Schema of the table to write to.
    final static String tableName = "<TARGET_TABLE_NAME>"; // Name of the table to write to.
    final static String adbpgMasterHost = "<ADBPG_HOSTNAME>";
    final static int adbpgMasterPort = Integer.parseInt("<ADBPG_PORT>");
    final static String adbpgUserName = "<ADBPG_USERNAME>";
    final static String adbpgPasswd = "<ADBPG_PASSWORD>";
    final static String dbname = "<TARGET_DB>";
    /** using for adbpgss environment end */

    /**
     * Appends {@code ROWNUM} encoded rows to {@code rows}. Each row has ten columns (a..j):
     * integer, serial, bigint, bigserial, real, double, text, bytea and two timestamps.
     *
     * @param rows      list that receives the generated {@code RowData} entries
     * @param ROWNUM    number of rows to generate
     * @param initcount unused; kept so the signature stays unchanged
     * @param col7      value of the text column, also stored (UTF-8 encoded) in the bytea column
     */
    public static void buildRowDataAllTypes(ArrayList<RowData> rows, final int ROWNUM, int initcount, String col7) {
        for (int row = 0; row < ROWNUM; row++) {
            // Read the clock once so that seconds and nanos describe the same instant.
            long nowMillis = System.currentTimeMillis();
            Timestamp now = Timestamp.newBuilder()
                    .setSeconds(nowMillis / 1000)
                    .setNanos((int) ((nowMillis % 1000) * 1000000))
                    .build();

            // create a row builder and add the column values in table order
            api.Row.Builder builder = api.Row.newBuilder();
            builder.addColumns(api.DBValue.newBuilder().setInt32Value(row).build());     // a integer
            builder.addColumns(api.DBValue.newBuilder().setInt32Value(row).build());     // b serial
            builder.addColumns(api.DBValue.newBuilder().setInt64Value(1000).build());    // c bigint
            builder.addColumns(api.DBValue.newBuilder().setInt64Value(row).build());     // d bigserial
            builder.addColumns(api.DBValue.newBuilder().setFloat32Value(row).build());   // e real
            builder.addColumns(api.DBValue.newBuilder().setFloat64Value(row).build());   // f double
            builder.addColumns(api.DBValue.newBuilder().setStringValue(col7).build());   // g text
            // Explicit charset: the platform default would make the bytes machine-dependent.
            builder.addColumns(api.DBValue.newBuilder()
                    .setBytesValue(ByteString.copyFrom(col7.getBytes(java.nio.charset.StandardCharsets.UTF_8)))
                    .build());                                                           // h bytea
            builder.addColumns(api.DBValue.newBuilder().setTimeStampValue(now).build()); // i timestamp without time zone
            builder.addColumns(api.DBValue.newBuilder().setTimeStampValue(now).build()); // j timestamp with time zone

            // row -> rowdata: serialize the row and wrap it
            rows.add(RowData.newBuilder().setData(builder.build().toByteString()).build());
        }
    }

    /** Prints {@code message} to stdout, prefixed with the current time. */
    public static void logMessage(String message) {
        SimpleDateFormat sdf = new SimpleDateFormat();
        sdf.applyPattern("yyyy-MM-dd HH:mm:ss a");
        System.out.println(sdf.format(new Date()) + " " + message);
    }

    /** Calls ListSchema and logs the name of every returned schema. */
    public static void listSchema(GpssGrpc.GpssBlockingStub bStub, Session mSession) {
        ListSchemaRequest lReq = ListSchemaRequest.newBuilder()
                .setSession(mSession)
                .build();
        Schemas schemas = bStub.listSchema(lReq);
        for (Schema schema : schemas.getSchemasList()) {
            logMessage("Got schemas:" + schema.getName());
        }
    }

    /**
     * Calls Connect with the given database coordinates (SSL disabled) and returns the session.
     */
    public static Session buildSession(GpssGrpc.GpssBlockingStub bStub, String gpMasterHost, int gpMasterPort,
                                       String gpRoleName, String gpPasswd, String dbname) {
        /** create a connect request builder */
        logMessage("Starting create session...");
        ConnectRequest connReq = ConnectRequest.newBuilder()
                .setHost(gpMasterHost)
                .setPort(gpMasterPort)
                .setUsername(gpRoleName)
                .setPassword(gpPasswd)
                .setDB(dbname)
                .setUseSSL(false)
                .build();
        assert bStub != null;
        return bStub.connect(connReq);
    }

    /** Calls ListTable for schema "public" and logs every returned relation. */
    public static void listTable(GpssGrpc.GpssBlockingStub bStub, Session mSession) {
        ListTableRequest ltReq = ListTableRequest.newBuilder()
                .setSession(mSession)
                .setSchema("public")
                .build();
        Tables tables = bStub.listTable(ltReq);
        logMessage("Got tables, size:" + tables.getTablesList().size());
        for (TableInfo table : tables.getTablesList()) {
            logMessage("Got table:" + table.getName() + " type:" + table.getType());
        }
    }

    /** Opens the table for INSERT into columns a..j, truncating it first. */
    public static void openForInsert(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
        long errLimit = 25;
        int errPct = 25;
        // create an insert option
        InsertOption iOpt = InsertOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .setTruncateTable(true)
                .addInsertColumns("a")
                .addInsertColumns("b")
                .addInsertColumns("c")
                .addInsertColumns("d")
                .addInsertColumns("e")
                .addInsertColumns("f")
                .addInsertColumns("g")
                .addInsertColumns("h")
                .addInsertColumns("i")
                .addInsertColumns("j")
                .build();
        // create an open request
        OpenRequest oReq = OpenRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                //.setPreSQL("")
                //.setPostSQL("")
                //.setEncoding("")
                //.setTimeout(5)
                //.setStagingSchema("")
                .setInsertOption(iOpt)
                .build();
        // use the blocking stub to call the Open service; it returns nothing
        bStub.open(oReq);
    }

    /** Opens the table for MERGE: match on a and b, update g and h, insert a..j. */
    public static void openForMerge(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
        long errLimit = 25;
        int errPct = 25;
        // create a merge option
        MergeOption mOpt = MergeOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .addInsertColumns("a")
                .addInsertColumns("b")
                .addInsertColumns("c")
                .addInsertColumns("d")
                .addInsertColumns("e")
                .addInsertColumns("f")
                .addInsertColumns("g")
                .addInsertColumns("h")
                .addInsertColumns("i")
                .addInsertColumns("j")
                .addMatchColumns("a") // match columns
                .addMatchColumns("b")
                .addUpdateColumns("g") // update columns
                .addUpdateColumns("h")
                // .setCondition("into_table.a>5")
                .build();
        // create an open request
        OpenRequest oReq = OpenRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                //.setPreSQL("")
                //.setPostSQL("")
                //.setEncoding("")
                .setTimeout(5)
                //.setStagingSchema("")
                .setMergeOption(mOpt)
                .build();
        // use the blocking stub to call the Open service; it returns nothing
        bStub.open(oReq);
    }

    /** Opens the table for UPDATE: match on a, update b, c and g where into_table.a>5. */
    public static void openForUpdate(GpssGrpc.GpssBlockingStub bStub, Session mSession, String schemaName, String tableName) {
        long errLimit = 25;
        int errPct = 25;
        // create an update option
        UpdateOption uOpt = UpdateOption.newBuilder()
                .setErrorLimitCount(errLimit)
                .setErrorLimitPercentage(errPct)
                .addMatchColumns("a") // match columns
                .addUpdateColumns("b") // update columns
                .addUpdateColumns("c")
                .addUpdateColumns("g")
                .setCondition("into_table.a>5")
                .build();
        // create an open request
        OpenRequest oReq = OpenRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                //.setPreSQL("")
                //.setPostSQL("")
                //.setEncoding("")
                .setTimeout(5)
                //.setStagingSchema("")
                .setUpdateOption(uOpt)
                .build();
        // use the blocking stub to call the Open service; it returns nothing
        bStub.open(oReq);
    }

    /** Calls DescribeTable for the configured schema/table and logs every column's metadata. */
    public static void describeTable(GpssGrpc.GpssBlockingStub bStub, Session mSession) {
        DescribeTableRequest dReq = DescribeTableRequest.newBuilder()
                .setSession(mSession)
                .setSchemaName(schemaName)
                .setTableName(tableName)
                .build();
        Columns columns = bStub.describeTable(dReq);
        for (ColumnInfo columnInfo : columns.getColumnsList()) {
            logMessage("Column:" + columnInfo.getName() +
                    " DatabaseType:" + columnInfo.getDatabaseType() +
                    " HasLength:" + columnInfo.getHasLength() +
                    " Length:" + columnInfo.getLength() +
                    " HasPrecisionScale:" + columnInfo.getHasPrecisionScale() +
                    " Precision:" + columnInfo.getPrecision() +
                    " Scale:" + columnInfo.getScale() +
                    " HasNullable:" + columnInfo.getHasNullable() +
                    " Nullable:" + columnInfo.getNullable()
            );
        }
    }

    /**
     * Runs the sample end to end. Any exception from the gRPC calls is printed; the session and
     * the channel are released in all cases.
     */
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = null;
        GpssGrpc.GpssBlockingStub bStub = null;
        Session mSession = null;
        try {
            // connect to GPSS gRPC service instance; create a channel and a blocking stub
            channel = ManagedChannelBuilder.forAddress(lissHost, lissPort).usePlaintext().build();
            bStub = GpssGrpc.newBlockingStub(channel);
            // use the blocking stub to call the Connect service
            mSession = buildSession(bStub, adbpgMasterHost, adbpgMasterPort, adbpgUserName, adbpgPasswd, dbname);
            logMessage("Got session id:" + mSession.getID());
            /** list schema */
            listSchema(bStub, mSession);
            /** list tables */
            listTable(bStub, mSession);
            /** describe the target table */
            describeTable(bStub, mSession);
            /** open a table for write */
            openForInsert(bStub, mSession, schemaName, tableName);
            // openForMerge(bStub, mSession, schemaName, tableName);
            // openForUpdate(bStub, mSession, schemaName, tableName);
            /** do the write stuff */
            ArrayList<RowData> rows = new ArrayList<>();
            buildRowDataAllTypes(rows, 20, 0, "upsert");
            // create a write request
            WriteRequest wReq = WriteRequest.newBuilder()
                    .setSession(mSession)
                    .addAllRows(rows)
                    .build();
            // use the blocking stub to call the Write service; it returns nothing
            logMessage("Starting write row data...");
            bStub.write(wReq);
            // Thread.sleep(60 * 60 * 1000);
            /** create a close request */
            // NOTE(review): Abort is set to true here while the Go sample leaves it unset;
            // confirm that aborting on close is really intended for a successful load.
            CloseRequest cReq = CloseRequest.newBuilder()
                    .setSession(mSession)
                    //.setMaxErrorRows(15)
                    .setAbort(true)
                    .build();
            /** use the blocking stub to call the Close service */
            TransferStats tStats = bStub.close(cReq);
            /** display the result to stdout */
            logMessage("CloseRequest tStats: " + tStats.toString());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Either reference may still be null if channel creation or Connect failed.
            if (bStub != null && mSession != null) {
                try {
                    /** use the blocking stub to call the Disconnect service */
                    bStub.disconnect(mSession);
                } catch (RuntimeException e) {
                    // Keep going so the channel is still shut down.
                    e.printStackTrace();
                }
            }
            if (channel != null) {
                // shutdown the channel
                channel.shutdown().awaitTermination(7, TimeUnit.SECONDS);
            }
        }
    }
}
Go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
"liss_client/pb"
)
// Connection settings of the sample; replace every placeholder before running.
const (
	lisssshost = "<liss_server_hostname>" // host name of the gRPC server
	// lissport stays a string because it is concatenated into lissTargetAddress.
	lissport          = "<liss_server_port>"
	lissTargetAddress = lisssshost + ":" + lissport
	adbpghost         = "<adbpg_hostname>"
	// adbpgport is assigned to ConnectRequest.Port (an int32 field), so it has
	// to be a numeric constant; a quoted placeholder does not compile.
	// Replace 5432 with your <adbpg_port>.
	adbpgport       = 5432
	adbpgusername   = "<USERNAME>"
	adbpgpassword   = "<PASSWORD>"
	adbpgDB         = "<TARGET_DATABASE>"
	adbpgSchemaName = "<TARGET_SCHEMA_NAME>"
	adbpgTableName  = "<TARGET_TABLE_NAME>"
)
// checkErr aborts the program by panicking with err; a nil err is a no-op.
func checkErr(err error) {
	if err == nil {
		return
	}
	panic(err)
}
// main runs the sample end to end: connect, list schemas and tables, describe
// the target table, then run an insert, a merge and an update load, each
// followed by Close. Every failed call panics through checkErr; the session is
// disconnected on the way out in all cases.
func main() {
	// Prepare the new session
	conn, err := grpc.Dial(lissTargetAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
	checkErr(err)
	defer conn.Close()
	client := pb.NewGpssClient(conn)
	session, err := client.Connect(context.Background(), &pb.ConnectRequest{
		Host:     adbpghost,
		Port:     adbpgport,
		Username: adbpgusername,
		Password: adbpgpassword,
		DB:       adbpgDB,
	})
	checkErr(err)
	// Deferred so the server-side session is released even when a later step
	// panics; it runs before conn.Close because defers run in reverse order.
	defer func() {
		// disconnect after all operations are done
		fmt.Println("=====================================")
		if _, derr := client.Disconnect(context.Background(), session); derr != nil {
			fmt.Println("Disconnect failed: ", derr)
		}
	}()

	fmt.Println("=====================================")
	// List all the schemas of the specified database
	schema, err := client.ListSchema(context.Background(), &pb.ListSchemaRequest{Session: session})
	// Without this check a failed call leaves schema nil and the loop below
	// dereferences it.
	checkErr(err)
	for schemaSeq, schemaName := range schema.Schemas {
		fmt.Println("schemaSeq: ", schemaSeq, ", schemaName: ", schemaName.Name, ", schemaOwner: ", schemaName.Owner)
	}

	// List all the tables of the specified schema
	fmt.Println("=====================================")
	tables, err := client.ListTable(context.Background(), &pb.ListTableRequest{Session: session, Schema: adbpgSchemaName})
	checkErr(err)
	for _, table := range tables.Tables {
		fmt.Println("TableName: ", table.Name, ", TableType: ", table.Type)
	}

	// describe a table
	fmt.Println("=====================================")
	tableName := adbpgTableName
	columns, err := client.DescribeTable(context.Background(), &pb.DescribeTableRequest{
		Session:    session,
		SchemaName: adbpgSchemaName,
		TableName:  tableName,
	})
	checkErr(err)
	for _, column := range columns.Columns {
		fmt.Println("ColumnName: ", column.Name,
			", DatabaseType: ", column.DatabaseType,
			", HasLength: ", column.HasLength,
			", Length: ", column.Length,
			", HasPrecisionScale: ", column.HasPrecisionScale,
			", Precision: ", column.Precision,
			", Scale: ", column.Scale,
			", HasNullable: ", column.GetHasNullable(),
			", Nullable: ", column.GetNullable())
	}

	// insert data
	fmt.Println("=====================================")
	openForInsert(client, session, adbpgSchemaName, tableName)
	rowsData := buildRows(10, "Insert operation")
	writeRequest := pb.WriteRequest{Session: session, Rows: rowsData}
	_, err = client.Write(context.Background(), &writeRequest)
	checkErr(err)
	transferStats, err := client.Close(context.Background(), &pb.CloseRequest{Session: session})
	checkErr(err)
	fmt.Println("TransferStats: ", transferStats.String())

	// merge data
	fmt.Println("=====================================")
	openForMerge(client, session, adbpgSchemaName, tableName)
	rowsData = buildRows(20, "Merge operation")
	writeRequest = pb.WriteRequest{Session: session, Rows: rowsData}
	_, err = client.Write(context.Background(), &writeRequest)
	checkErr(err)
	transferStats, err = client.Close(context.Background(), &pb.CloseRequest{Session: session})
	checkErr(err)
	fmt.Println("TransferStats: ", transferStats.String())

	// update data
	fmt.Println("=====================================")
	openForUpdate(client, session, adbpgSchemaName, tableName)
	rowsData = buildRows(20, "Update operation")
	writeRequest = pb.WriteRequest{Session: session, Rows: rowsData}
	_, err = client.Write(context.Background(), &writeRequest)
	checkErr(err)
	transferStats, err = client.Close(context.Background(), &pb.CloseRequest{Session: session})
	checkErr(err)
	fmt.Println("TransferStats: ", transferStats.String())
}
// openForInsert opens mSchemaName.mTableName for an INSERT load into columns
// a..j, truncating the table first. The staging schema is the target schema.
// It panics (via checkErr) if the Open call fails.
func openForInsert(client pb.GpssClient, msession *pb.Session, mSchemaName string, mTableName string) {
	var errLimit int64 = 25
	var errPct int32 = 25
	insertCols := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	iOption := pb.InsertOption{
		ErrorLimitCount:      errLimit,
		ErrorLimitPercentage: errPct,
		TruncateTable:        true, // true: truncate data before insert operation
		InsertColumns:        insertCols,
	}
	// Use the schema/table passed by the caller rather than the package-level
	// constants, so the parameters actually select the target.
	oRequest := pb.OpenRequest{
		Session:       msession,
		Option:        &pb.OpenRequest_InsertOption{InsertOption: &iOption},
		SchemaName:    mSchemaName,
		TableName:     mTableName,
		StagingSchema: mSchemaName,
	}
	_, err := client.Open(context.Background(), &oRequest)
	checkErr(err)
}
// openForMerge opens mSchemaName.mTableName for a MERGE load: rows are matched
// on columns a and b, column g is updated on a match, and columns a..j are
// used for inserts. The staging schema is the target schema.
// It panics (via checkErr) if the Open call fails.
func openForMerge(client pb.GpssClient, msession *pb.Session, mSchemaName string, mTableName string) {
	var errLimit int64 = 25
	var errPct int32 = 25
	insertCols := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
	matchCols := []string{"a", "b"}
	updateCols := []string{"g"}
	mOption := pb.MergeOption{
		ErrorLimitCount:      errLimit,
		ErrorLimitPercentage: errPct,
		InsertColumns:        insertCols,
		MatchColumns:         matchCols,
		UpdateColumns:        updateCols,
	}
	// Use the schema/table passed by the caller rather than the package-level
	// constants, so the parameters actually select the target.
	oRequest := pb.OpenRequest{
		Session:       msession,
		Option:        &pb.OpenRequest_MergeOption{MergeOption: &mOption},
		SchemaName:    mSchemaName,
		TableName:     mTableName,
		StagingSchema: mSchemaName,
	}
	_, err := client.Open(context.Background(), &oRequest)
	checkErr(err)
}
// openForUpdate opens mSchemaName.mTableName for an UPDATE load: rows are
// matched on columns a and b and column g is updated, restricted by the
// condition "into_table.a>15". The staging schema is the target schema.
// It panics (via checkErr) if the Open call fails.
func openForUpdate(client pb.GpssClient, msession *pb.Session, mSchemaName string, mTableName string) {
	var errLimit int64 = 25
	var errPct int32 = 25
	matchCols := []string{"a", "b"}
	updateCols := []string{"g"}
	uOption := pb.UpdateOption{
		ErrorLimitCount:      errLimit,
		ErrorLimitPercentage: errPct,
		MatchColumns:         matchCols,
		UpdateColumns:        updateCols,
		Condition:            "into_table.a>15", // into_table is the alias of mTableName
	}
	// Use the schema/table passed by the caller rather than the package-level
	// constants, so the parameters actually select the target.
	oRequest := pb.OpenRequest{
		Session:       msession,
		Option:        &pb.OpenRequest_UpdateOption{UpdateOption: &uOption},
		SchemaName:    mSchemaName,
		TableName:     mTableName,
		StagingSchema: mSchemaName,
	}
	_, err := client.Open(context.Background(), &oRequest)
	checkErr(err)
}
// buildRows generates rownum rows of ten columns each and returns them as
// protobuf-encoded RowData. The columns are, in order: two int32 and two int64
// values derived from the row index, the index as float32 and float64, desc as
// a string and as bytes, and two timestamps taken at generation time.
// It panics (via checkErr) if a row cannot be serialized.
func buildRows(rownum int, desc string) []*pb.RowData {
	var rowsData []*pb.RowData
	// create rownum rows
	for i := 0; i < rownum; i++ {
		columns := []*pb.DBValue{
			{DBType: &pb.DBValue_Int32Value{Int32Value: int32(i)}},
			{DBType: &pb.DBValue_Int32Value{Int32Value: int32(i)}},
			{DBType: &pb.DBValue_Int64Value{Int64Value: int64(i)}},
			{DBType: &pb.DBValue_Int64Value{Int64Value: int64(i)}},
			{DBType: &pb.DBValue_Float32Value{Float32Value: float32(i)}},
			{DBType: &pb.DBValue_Float64Value{Float64Value: float64(i)}},
			{DBType: &pb.DBValue_StringValue{StringValue: desc}},
			{DBType: &pb.DBValue_BytesValue{BytesValue: []byte(desc)}},
			{DBType: &pb.DBValue_TimeStampValue{TimeStampValue: timestamppb.Now()}},
			{DBType: &pb.DBValue_TimeStampValue{TimeStampValue: timestamppb.Now()}},
		}
		row := pb.Row{Columns: columns}
		// convert the row to []byte; a serialization failure must not be
		// silently turned into an empty row
		data, err := proto.Marshal(&row)
		checkErr(err)
		rowsData = append(rowsData, &pb.RowData{Data: data})
	}
	return rowsData
}
附錄:數據類型映射
| gRPC數據類型 | AnalyticDB PostgreSQL數據類型 |
| --- | --- |
| Int32Value | Integer或Serial |
| Int64Value | Bigint或Bigserial |
| Float32Value | Real |
| Float64Value | Double precision |
| StringValue | Text (any kind of data) |
| BytesValue | Bytea |
| TimeStampValue | Time或Timestamp (without time zone) |