場景示例
更新時間:
本文介紹運行任務并查看結果、重試失敗文件的完整流程。
場景一:運行任務
以下示例用于創建通道、創建代理、創建數據地址、創建遷移任務,運行任務并查看運行結果。
說明
在不使用代理的情況下,請忽略下面代碼中創建通道和代理的部分。使用代理前需要先部署代理,詳情請參見 代理管理。
package sample;
import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;
import com.google.gson.Gson;
import java.util.LinkedList;
import java.util.List;
public class Demo {
/** 強(qiáng)烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導(dǎo)致AccessKey泄露,威脅您賬號下所有資源的安全。*/
static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
/** 填寫主賬號ID。*/
static String userId = "11470***876***55";
private static final String AVAILABLE = "available";
private static final String JOBFINISH = "IMPORT_JOB_FINISHED";
public static void main(String[] args) {
// 填寫數(shù)據(jù)地址名稱。
String addressName = "exampleaddress";
try {
/** 步驟1:初始化 */
com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
// 這里以北京區(qū)域?yàn)槔? config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
config.setRegionId(accessKeyId);
config.setAccessKeyId(accessKeyId);
config.setAccessKeySecret(accessKeySecret);
Client client = new Client(config);
//* 創(chuàng)建通道和代理,代理可以創(chuàng)建多個關(guān)聯(lián)一個通道。 */
CreateTunnelRequest createTunnelRequest = new CreateTunnelRequest();
CreateTunnelInfo createTunnelInfo = new CreateTunnelInfo();
TunnelQos qos = new TunnelQos();
// MaxBandwidth, MaxQps 默認(rèn)為0, 表示沒有限制, MaxBandWidth的單位是bit,請按照實(shí)際需求填寫。
qos.setMaxBandwidth(1073741824L);
qos.setMaxQps(1000);
createTunnelInfo.setTunnelQos(qos);
createTunnelRequest.setImportTunnel(createTunnelInfo);
CreateTunnelResponse createTunnelResponse = client.createTunnel(userId, createTunnelRequest);
String tunnelId = createTunnelResponse.headers.get("x-oss-import-tunnel-id");
// 填寫代理名稱。
String agentName = "exampleagent";
// 使用網(wǎng)絡(luò),公網(wǎng)請?zhí)顚憄ublic, 專線或者VPN請?zhí)顚憊pc。
String agentEndpoint = "public";
// 部署方式,目前僅支持填寫default。
String deployMethod = "default";
CreateAgentRequest createAgentRequest = new CreateAgentRequest();
CreateAgentInfo createAgentInfo = new CreateAgentInfo();
createAgentInfo.setName(agentName);
createAgentInfo.setTunnelId(tunnelId);
createAgentInfo.setAgentEndpoint(agentEndpoint);
createAgentInfo.setDeployMethod(deployMethod);
createAgentRequest.setImportAgent(createAgentInfo);
client.createAgent(userId, createAgentRequest);
/** 步驟2:創(chuàng)建源地址和目的地址并驗(yàn)證可用性,這里以創(chuàng)建源為oss,目的為oss的地址為例,其它源示例請參見數(shù)據(jù)地址篇創(chuàng)建數(shù)據(jù)地址小節(jié)。 */
String srcAddress = "examplesrcaddress";
String destAddress = "exampledestaddress";
AddressDetail addrssdetail = new AddressDetail();
addrssdetail.addressType = "oss";
// 以下參數(shù)請根據(jù)實(shí)際值填寫。
addrssdetail.bucket = "examplebucket";
addrssdetail.prefix = "***/";
addrssdetail.regionId = "oss-cn-beijing";
addrssdetail.role = "rolename_xxxxx";
CreateAddressRequest createAddressRequest = new CreateAddressRequest();
CreateAddressInfo createAddressInfo = new CreateAddressInfo();
createAddressInfo.name = addressName;
createAddressInfo.setAddressDetail(addrssdetail);
createAddressRequest.setImportAddress(createAddressInfo);
client.createAddress(userId, createAddressRequest);
// 驗(yàn)證源端數(shù)據(jù)地址可用性。
VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, srcAddress);
if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
return;
}
addrssdetail = new AddressDetail();
addrssdetail.addressType = "oss";
// 以下參數(shù)請根據(jù)實(shí)際值填寫。
addrssdetail.bucket = "examplebucket";
addrssdetail.prefix = "***/";
addrssdetail.regionId = "oss-cn-beijing";
addrssdetail.role = "rolename_xxxxx";
createAddressRequest = new CreateAddressRequest();
createAddressInfo = new CreateAddressInfo();
createAddressInfo.name = addressName;
createAddressInfo.setAddressDetail(addrssdetail);
createAddressRequest.setImportAddress(createAddressInfo);
client.createAddress(userId, createAddressRequest);
verifyAddressResponse = client.verifyAddress(userId, destAddress);
if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
System.out.printf("job status is %s", verifyAddressResponse.body.verifyAddressResponse.status);
return;
}
/** 步驟3:創(chuàng)建并且啟動任務(wù) */
// 填寫任務(wù)名稱。
String jobName = "examplejob";
CreateJobInfo createJobInfo = new CreateJobInfo();
createJobInfo.name = jobName;
createJobInfo.srcAddress = srcAddress;
createJobInfo.destAddress = destAddress;
/*
overwriteMode和transferMode需要組合使用,具體組合含義如下
always,all 全覆蓋
always,lastmodified 根據(jù)文件最后修改時(shí)間覆蓋
never,all 不覆蓋
*/
/* 文件覆蓋方式, 可能得值為 1.never 2.always */
createJobInfo.overwriteMode = "always";
/* 文件傳輸?shù)姆绞? 可能得值為 1.changed 2.all 3.lastmodified */
createJobInfo.transferMode = "lastmodified";
// 如果maxImportTaskQps值為0或者不設(shè)置,會被設(shè)置為默認(rèn)值;MaxBandWidth值為0或者不設(shè)置,會被設(shè)置為默認(rèn)值,maxBandWidth值單位為bits。
ImportQos importQos = new ImportQos();
// maxBandWidth,maxImportTaskQps 根據(jù)實(shí)際需求值填寫。
importQos.maxBandWidth = 1073741824L;
importQos.maxImportTaskQps = 1000L;
createJobInfo.setImportQos(importQos);
// 配置調(diào)度規(guī)則,具體參數(shù)含義請參看API文檔。
ScheduleRule scheduleRule = new ScheduleRule();
// maxScheduleCount,startCronExpression,suspendCronExpression 根據(jù)實(shí)際需求值填寫。
scheduleRule.maxScheduleCount = 2L;
scheduleRule.startCronExpression = "0 0 10 * * ?";
scheduleRule.suspendCronExpression = "0 0 14 * * ?";
createJobInfo.setScheduleRule(scheduleRule);
// 配置過濾規(guī)則,具體參數(shù)含義請參看API文檔。
FilterRule filterRule = new FilterRule();
// 文件過濾器,根據(jù)實(shí)際需求值填寫。
KeyFilters keyFilters = new KeyFilters();
KeyFilterItem includekeyFilterItem = new KeyFilterItem();
List<String> includeRegex = new LinkedList<>();
includeRegex.add(".*.jpg");
includeRegex.add(".*.jpeg");
includekeyFilterItem.setRegex(includeRegex);
KeyFilterItem excludekeyFilterItem = new KeyFilterItem();
List<String> excludeRegex = new LinkedList<>();
excludeRegex.add(".*.txt");
excludeRegex.add(".*.js");
excludekeyFilterItem.setRegex(excludeRegex);
keyFilters.setIncludes(includekeyFilterItem);
keyFilters.setExcludes(excludekeyFilterItem);
filterRule.setKeyFilters(keyFilters);
// 時(shí)間過濾器, 時(shí)間格式遵循UTC時(shí)間格式,根據(jù)實(shí)際需求值填寫。
LastModifiedFilters lastModifiedFilters = new LastModifiedFilters();
LastModifyFilterItem includeLastModifyFilterItem = new LastModifyFilterItem();
List<TimeFilter> includeRegexs = new LinkedList<>();
TimeFilter includeTimeFilter = new TimeFilter();
includeTimeFilter.startTime = "2006-01-01T00:00:00Z";
includeTimeFilter.endTime = "2006-12-31T23:59:59Z";
includeRegexs.add(includeTimeFilter);
includeLastModifyFilterItem.setTimeFilter(includeRegexs);
LastModifyFilterItem excludeLastModifyFilterItem = new LastModifyFilterItem();
List<TimeFilter> excludeRegexs = new LinkedList<>();
TimeFilter excludeTimeFilter = new TimeFilter();
excludeTimeFilter.startTime = "2008-01-01T00:00:00Z";
excludeTimeFilter.endTime = "2008-12-31T23:59:59Z";
excludeRegexs.add(excludeTimeFilter);
excludeLastModifyFilterItem.setTimeFilter(excludeRegexs);
lastModifiedFilters.setIncludes(includeLastModifyFilterItem);
lastModifiedFilters.setExcludes(excludeLastModifyFilterItem);
filterRule.setLastModifiedFilters(lastModifiedFilters);
createJobInfo.setFilterRule(filterRule);
CreateJobRequest createJobRequest = new CreateJobRequest();
createJobRequest.setImportJob(createJobInfo);
client.createJob(userId, createJobRequest);
UpdateJobRequest updateJobRequest = new UpdateJobRequest();
UpdateJobInfo updateJobInfo = new UpdateJobInfo();
updateJobInfo.setStatus("IMPORT_JOB_LAUNCHING");
updateJobRequest.setImportJob(updateJobInfo);
client.updateJob(userId, jobName, updateJobRequest);
/** 步驟4:循環(huán)查看當(dāng)前任務(wù)狀態(tài) */
for(;;) {
GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
System.out.println("job is interrupted");
return;
}
if ("IMPORT_JOB_FINISHED".equals(response.getBody().importJob.status)) {
System.out.println("job is finished");
break;
}
Thread.sleep(6000);
}
/** 步驟5:任務(wù)結(jié)束后查看結(jié)果。 */
ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
ListJobHistoryResponse listJobHistoryResp = client.listJobHistory(userId, jobName, listJobHistoryRequest);
System.out.println(new Gson().toJson(listJobHistoryResp.body.jobHistoryList.jobHistory))
} catch (Exception e) {
e.printStackTrace();
}
}
}
場景二:重試失敗文件
遷移任務運行完成后,可能有失敗文件,遷移服務會為這些失敗文件構造一份失敗文件列表。以下示例先獲取失敗文件列表的詳情,然后據此創建一個新的數據地址,再創建一個重試子任務。通過這種方式,您能夠重新遷移這些失敗的文件。
package sample;
import com.aliyun.hcs_mgw20240626.Client;
import com.aliyun.hcs_mgw20240626.models.*;
/**
 * Scenario 2: after a migration job has ended, check whether it left failed files and, if so,
 * create a new data address from the failed-file list and start a retry sub-job for them.
 */
public class Demo {
    /**
     * It is strongly recommended NOT to keep the AccessKey ID and AccessKey Secret in project
     * code: doing so may leak the AccessKey and endanger every resource under the account.
     * Both values are read from environment variables instead.
     */
    static String accessKeyId = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
    static String accessKeySecret = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
    /** Fill in the ID of the main account. */
    static String userId = "11470***876***55";
    private static final String AVAILABLE = "available";
    private static final String JOBFINISH = "IMPORT_JOB_FINISHED";
    // Not referenced by this sample; kept unchanged.
    private static final String INTERRUPT = "Interrupt";
    private static final String NONEED = "NoNeed";
    private static final String READY = "Ready";
    private static final String SYSTEM = "SYSTEM";
    private static final String LAUNCHING = "IMPORT_JOB_LAUNCHING";

    /**
     * Waits for the job to end, checks for failed files, waits until the retry data is ready and
     * then calls {@link #retry}. Any failure is printed to stderr; the method never rethrows.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            /* Step 1: initialization. */
            com.aliyun.teaopenapi.models.Config config = new com.aliyun.teaopenapi.models.Config();
            // The Beijing region is used as the example here.
            // NOTE(review): confirm the endpoint host against the service's endpoint list.
            config.setEndpoint("mgw.cn-beijing.aliyuncs.com");
            // The region ID must be a region name, not the AccessKey ID.
            config.setRegionId("cn-beijing");
            config.setAccessKeyId(accessKeyId);
            config.setAccessKeySecret(accessKeySecret);
            Client client = new Client(config);

            // Fill in the following values according to the actual requirement.
            int runtimeId = 1;
            String jobName = "examplejob";
            String srcAddress = "examplesrcaddress";
            String destAddress = "exampledestaddress";

            /* Step 2: poll the job status until the job is finished or interrupted. */
            for (;;) {
                GetJobResponse response = client.getJob(userId, jobName, new GetJobRequest());
                if ("IMPORT_JOB_INTERRUPTED".equals(response.getBody().importJob.status)) {
                    System.out.println("job is interrupted");
                    return;
                }
                if (JOBFINISH.equals(response.getBody().importJob.status)) {
                    System.out.println("job is finished");
                    break;
                }
                Thread.sleep(6000);
            }

            /* Step 3: ListJobHistory, to see whether there are failed files. */
            ListJobHistoryRequest listJobHistoryRequest = new ListJobHistoryRequest();
            listJobHistoryRequest.runtimeId = runtimeId;
            ListJobHistoryResponse listJobHistoryResponse = client.listJobHistory(userId, jobName, listJobHistoryRequest);
            if (listJobHistoryResponse.body.jobHistoryList.jobHistory.isEmpty()) {
                System.out.println("list job history length less than 1");
                return;
            }
            if (listJobHistoryResponse.body.jobHistoryList.jobHistory.get(0).failedCount <= 0) {
                System.out.println("job no need retry");
                return;
            }

            /* Step 4: poll GetJobResult until readyRetry is "Ready" (or "NoNeed"). */
            for (;;) {
                GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
                getJobResultRequest.setRuntimeId(runtimeId);
                GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
                if (NONEED.equals(getJobResultResponse.body.importJobResult.readyRetry)) {
                    System.out.println("job no need retry");
                    return;
                }
                if (READY.equals(getJobResultResponse.body.importJobResult.readyRetry)) {
                    break;
                }
                Thread.sleep(6000);
            }

            retry(client, jobName, srcAddress, destAddress, runtimeId);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a retry data address from the failed-file inventory of the given run, verifies it,
     * then creates and launches a retry sub-job that copies the parent job's settings.
     *
     * @param client initialized SDK client
     * @param jobName name of the parent job
     * @param srcAddress name of the parent job's source data address
     * @param destAddress name of the destination data address, reused by the retry job
     * @param runtimeId run of the parent job whose failed files are retried
     * @throws IllegalStateException if the newly created retry address is not available
     * @throws Exception if a service call fails
     */
    static void retry(Client client, String jobName, String srcAddress, String destAddress, int runtimeId) throws Exception {
        /* Step 5: create a new data address from the failed-file inventory and verify it. */
        GetJobResultRequest getJobResultRequest = new GetJobResultRequest();
        getJobResultRequest.runtimeId = runtimeId;
        GetJobResultResponse getJobResultResponse = client.getJobResult(userId, jobName, getJobResultRequest);
        GetJobResponse getJobResponse = client.getJob(userId, jobName, new GetJobRequest());
        GetAddressResponse getAddressResponse = client.getAddress(userId, srcAddress);

        // One timestamp is shared by the address name and the job name of this retry.
        long time = System.currentTimeMillis();
        CreateAddressInfo info = new CreateAddressInfo();
        info.name = srcAddress + "_" + time + "_retry";

        AddressDetail detail = new AddressDetail();
        // Inventory (failed-file list) location, taken from the job result.
        detail.addressType = getJobResultResponse.body.importJobResult.addressType;
        detail.invLocation = getJobResultResponse.body.importJobResult.invLocation;
        detail.invBucket = getJobResultResponse.body.importJobResult.invBucket;
        detail.invAccessId = SYSTEM;
        detail.invAccessSecret = SYSTEM;
        detail.invPath = getJobResultResponse.body.importJobResult.invPath;
        detail.invRegionId = getJobResultResponse.body.importJobResult.invRegionId;
        // Everything else is copied from the original source address.
        detail.domain = getAddressResponse.body.importAddress.addressDetail.domain;
        detail.regionId = getAddressResponse.body.importAddress.addressDetail.regionId;
        detail.accessId = getAddressResponse.body.importAddress.addressDetail.accessId;
        detail.accessSecret = getAddressResponse.body.importAddress.addressDetail.accessSecret;
        detail.agentList = getAddressResponse.body.importAddress.addressDetail.agentList;
        detail.role = getAddressResponse.body.importAddress.addressDetail.role;
        detail.prefix = getAddressResponse.body.importAddress.addressDetail.prefix;
        detail.invRole = getAddressResponse.body.importAddress.addressDetail.invRole;
        detail.bucket = getAddressResponse.body.importAddress.addressDetail.bucket;
        info.setAddressDetail(detail);

        CreateAddressRequest request = new CreateAddressRequest();
        request.setImportAddress(info);
        client.createAddress(userId, request);

        VerifyAddressResponse verifyAddressResponse = client.verifyAddress(userId, info.name);
        // Constant-first comparison so a missing status does not cause a NullPointerException.
        if (!AVAILABLE.equals(verifyAddressResponse.body.verifyAddressResponse.status)) {
            throw new IllegalStateException("retry srcaddress unavailable: " + info.name);
        }

        /* Step 6: create the retry sub-job and start it. */
        CreateJobInfo createJobInfo = new CreateJobInfo();
        createJobInfo.name = jobName + "_" + time + "_retry";
        createJobInfo.transferMode = getJobResponse.body.importJob.transferMode;
        createJobInfo.overwriteMode = getJobResponse.body.importJob.overwriteMode;
        createJobInfo.srcAddress = info.name;
        createJobInfo.destAddress = destAddress;
        createJobInfo.parentVersion = getJobResultResponse.body.importJobResult.version;
        createJobInfo.filterRule = getJobResponse.body.importJob.filterRule;
        createJobInfo.audit = getJobResponse.body.importJob.audit;
        createJobInfo.createReport = getJobResponse.body.importJob.createReport;
        CreateJobRequest createJobRequest = new CreateJobRequest();
        createJobRequest.setImportJob(createJobInfo);
        client.createJob(userId, createJobRequest);

        // Start the retry job by updating its status.
        UpdateJobInfo updateJobInfo = new UpdateJobInfo();
        updateJobInfo.status = LAUNCHING;
        UpdateJobRequest updateJobRequest = new UpdateJobRequest();
        updateJobRequest.setImportJob(updateJobInfo);
        client.updateJob(userId, createJobInfo.name, updateJobRequest);
    }
}
文檔內容是否對您有幫助?