本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
使用SDK進行數據投遞前,您需要了解使用數據湖投遞功能的注意事項、接口等信息。創建投遞任務后,表格存儲數據表中的數據會自動投遞到OSS Bucket中存儲。
注意事項
目前支持使用數據湖投遞功能的地域有華東1(杭州)、華東2(上海)、華北2(北京)和華北3(張家口)。
數據湖投遞不支持同步刪除操作,表格存儲中的刪除操作在數據投遞時會被忽略,已投遞到OSS中的數據不會被刪除。
新建數據投遞任務時存在最多1分鐘的初始化時間。
數據同步存在延遲,寫入速率穩定時,延遲在3分鐘內。數據同步的P99延遲在10分鐘內。
說明P99延遲表示過去10秒內最慢的1%的請求的平均延遲。
前提條件
在對象存儲服務側已完成如下操作:
已開通OSS服務且在表格存儲實例所在地域創建Bucket,詳情請參見開通OSS服務。
說明數據湖投遞支持投遞到和表格存儲相同地域的任意OSS Bucket中。如需投遞到其他數倉存儲(例如MaxCompute),請提交工單申請。
在表格存儲服務側已完成如下操作:
在訪問控制RAM服務側完成如下操作:
已創建RAM用戶并為RAM用戶授予管理表格存儲權限(AliyunOTSFullAccess)。具體操作,請參見創建RAM用戶和為RAM用戶授權。
警告阿里云賬號AccessKey泄露會威脅您所有資源的安全。建議您使用RAM用戶AccessKey進行操作,可以有效降低AccessKey泄露的風險。
已為RAM用戶創建AccessKey。具體操作,請參見創建AccessKey。
已配置訪問憑證。具體操作,請參見配置訪問憑證。
接口
接口 | 說明 |
CreateDeliveryTask | 創建一個投遞任務。 |
ListDeliveryTask | 列出一個數據表所有的投遞任務信息。 |
DescribeDeliveryTask | 查詢投遞任務描述信息。 |
DeleteDeliveryTask | 刪除一個投遞任務。 |
參數
參數 | 說明 |
tableName | 數據表名稱。 |
taskName | 投遞任務名稱。 名稱只能包含英文小寫字母(a~z)、數字和短橫線(-),開頭和結尾必須為英文小寫字母或數字,且長度為3~16字符。 |
taskConfig | 投遞任務配置,包括如下選項:
|
taskType | 投遞任務的類型,包括如下選項:
|
使用
您可以通過Java SDK、Go SDK實現數據湖投遞功能。此處以Java SDK為例介紹數據投遞湖的操作。
以下示例用于為數據表創建投遞任務。
import com.alicloud.openservices.tablestore.ClientException;
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.TableStoreException;
import com.alicloud.openservices.tablestore.model.delivery.*;
public class DeliveryTask {
public static void main(String[] args) {
final String endPoint = "https://yourinstancename.cn-hangzhou.ots.aliyuncs.com";
final String accessKeyId = System.getenv("OTS_AK_ENV");
final String accessKeySecret = System.getenv("OTS_SK_ENV");
final String instanceName = "yourinstancename";
SyncClient client = new SyncClient(endPoint, accessKeyId, accessKeySecret, instanceName);
try {
createDeliveryTask(client);
System.out.println("end");
} catch (TableStoreException e) {
System.err.println("操作失敗,詳情:" + e.getMessage() + e.getErrorCode() + e.toString());
System.err.println("Request ID:" + e.getRequestId());
} catch (ClientException e) {
System.err.println("請求失敗,詳情:" + e.getMessage());
} finally {
client.shutdown();
}
}
private static void createDeliveryTask(SyncClient client){
String tableName = "sampleTable";
String taskName = "sampledeliverytask";
OSSTaskConfig taskConfig = new OSSTaskConfig();
taskConfig.setOssPrefix("sampledeliverytask/year=$yyyy/month=$MM");
taskConfig.setOssBucket("datadeliverytest");
taskConfig.setOssEndpoint("oss-cn-hangzhou.aliyuncs.com");
taskConfig.setOssStsRole("acs:ram::17************45:role/aliyunserviceroleforotsdatadelivery");
//eventColumn為可選配置,指定按某一列數據的時間進行分區。如果不設置此參數,則按數據寫入表格存儲的時間進行分區。
EventColumn eventColumn = new EventColumn("Col1", EventTimeFormat.RFC1123);
taskConfig.setEventTimeColumn(eventColumn);
taskConfig.addParquetSchema(new ParquetSchema("PK1", "PK1", DataType.UTF8));
taskConfig.addParquetSchema(new ParquetSchema("PK2", "PK2", DataType.BOOL));
taskConfig.addParquetSchema(new ParquetSchema("Col1", "Col1", DataType.UTF8));
CreateDeliveryTaskRequest request = new CreateDeliveryTaskRequest();
request.setTableName(tableName);
request.setTaskName(taskName);
request.setTaskConfig(taskConfig);
request.setTaskType(DeliveryTaskType.BASE_INC);
CreateDeliveryTaskResponse response = client.createDeliveryTask(request);
System.out.println("resquestID: "+ response.getRequestId());
System.out.println("traceID: " + response.getTraceId());
System.out.println("create delivery task success");
}
}