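This article uses a worked example to show how to carry out data development through the DataWorks OpenAPI.
Background
Consider a simple scenario: a developer wants to synchronize data from an RDS database into a MaxCompute partitioned table, and then display the analyzed report data on a page in a self-built system. The entire pipeline can be implemented with the DataWorks OpenAPI, following the steps below.
Prerequisites
The DataWorks OpenAPI SDK is installed. For details, see Install the Java SDK.
In addition to Java, SDKs are available for Python, PHP, C#, Go, and other languages. Install the one that matches your development environment.
Usage notes
By default, you do not need to specify the DataWorks OpenAPI endpoint explicitly. However, if your aliyun-java-sdk-core version is too old, the SDK may fail to find the DataWorks OpenAPI endpoint. In that case, you can send requests with the following code without upgrading the SDK.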
// Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set. See http://bestwisewords.com/document_detail/378657.html
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultProfile.addEndpoint("cn-shanghai","dataworks-public", "dataworks.cn-shanghai.aliyuncs.com");
IAcsClient client = new DefaultAcsClient(profile);
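The code above explicitly specifies the DataWorks OpenAPI endpoint. Domain names of the form dataworks.${regionId}.aliyuncs.com are reachable over the public network. If you need to call the OpenAPI from within a VPC, change the domain name from dataworks.${regionId}.aliyuncs.com to dataworks-vpc.${regionId}.aliyuncs.com, so that requests reach the DataWorks OpenAPI even when the VPC has no public network access. For details about regionId (the region ID), see Regions and zones.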
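The two domain patterns above can be captured in a small helper. The following is a minimal sketch only: the class name DataWorksEndpoints and the method endpointFor are hypothetical and are not part of the SDK. It simply fills in the region ID and picks the public or VPC prefix.

```java
// Hypothetical helper (not part of the Alibaba Cloud SDK): builds the
// DataWorks OpenAPI endpoint host name for a region, following the
// dataworks.${regionId}.aliyuncs.com and dataworks-vpc.${regionId}.aliyuncs.com
// patterns described in the text.
final class DataWorksEndpoints {
    private DataWorksEndpoints() {}

    static String endpointFor(String regionId, boolean vpc) {
        if (regionId == null || regionId.trim().isEmpty()) {
            throw new IllegalArgumentException("regionId must not be empty");
        }
        // Use the VPC prefix when the caller cannot reach the public network.
        String prefix = vpc ? "dataworks-vpc" : "dataworks";
        return prefix + "." + regionId.trim() + ".aliyuncs.com";
    }

    public static void main(String[] args) {
        System.out.println(endpointFor("cn-shanghai", false));
        System.out.println(endpointFor("cn-shanghai", true));
    }
}
```

The returned host name would be passed as the third argument of DefaultProfile.addEndpoint, in place of the hard-coded "dataworks.cn-shanghai.aliyuncs.com" shown earlier.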
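Step 1: Create an RDS data source
By integrating the tenant APIs, you can create compute engines, create data sources, and view workspace information. In this scenario, the MaxCompute partitioned table lives in a MaxCompute engine. After you create a MaxCompute workspace in the DataWorks console, a data source for the MaxCompute engine is created automatically, so you only need to call CreateConnection to create the RDS data source.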
CreateConnectionRequest createRequest = new CreateConnectionRequest();
createRequest.setProjectId(-1L);
createRequest.setName("TEST_CONNECTION");
createRequest.setConnectionType("MYSQL");
createRequest.setEnvType(1);
createRequest.setContent("{\"password\":\"12345\"}");
Long connectionId;
try {
CreateConnectionResponse createResponse = client.getAcsResponse(createRequest);
Assert.assertNotNull(createResponse.getData());
connectionId = createResponse.getData();
UpdateConnectionRequest updateRequest = new UpdateConnectionRequest();
updateRequest.setConnectionId(connectionId);
updateRequest.setDescription("1");
UpdateConnectionResponse acsResponse = client.getAcsResponse(updateRequest);
Assert.assertTrue(acsResponse.getData());
DeleteConnectionRequest deleteRequest = new DeleteConnectionRequest();
deleteRequest.setConnectionId(connectionId);
DeleteConnectionResponse deleteResponse = client.getAcsResponse(deleteRequest);
Assert.assertTrue(deleteResponse.getData());
} catch (ClientException e) {
e.printStackTrace();
Assert.fail();
}
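UpdateConnection and DeleteConnection modify and delete a data source, respectively, as the sample above also demonstrates.
The APIs for managing workspace members are CreateProjectMember, DeleteProjectMember, RemoveProjectMemberFromRole, and ListProjectMembers.
Step 2: Develop tasks and publish them for scheduling
By integrating the data development APIs, you can manage files. After a file is submitted and published, it becomes a recurring task that the scheduler runs on schedule. The FileType field determines which type of file is created. Many file types are supported; call the Operation Center API ListProgramTypeCount to get all supported node types.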
// Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set. See http://bestwisewords.com/document_detail/378657.html
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
IAcsClient client = new DefaultAcsClient(profile);
CreateFileRequest createFileRequest = new CreateFileRequest();
createFileRequest.setFileType(DefaultNodeType.ODPS_SQL.getCode());
createFileRequest.setInputList(projectIdentifier+"_root");
createFileRequest.setContent(content);
createFileRequest.setFileName("create_file_" + caseId);
createFileRequest.setFileFolderPath("業務流程/POP接口測試/MaxCompute/test_folder_3");
createFileRequest.setFileDescription("create file " + caseId);
createFileRequest.setRerunMode("ALL_ALLOWED");
CreateFileResponse createFileResponse = getAcsResponse(createFileRequest);
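The content field stores the code of SQL scripts, Shell scripts, or Data Integration scripts. For the Data Integration script format, see Configure a batch synchronization task in script mode. After you create a script with CreateFile, you can manage it with UpdateFile and DeleteFile. As in the console, after you finish developing a file you must submit and publish it before recurring instances are generated. Note that SubmitFile returns a DeploymentId that you must poll: the deployment has succeeded only when GetDeployment reports a finished status (status.finished()).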
// Make sure the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set. See http://bestwisewords.com/document_detail/378657.html
IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
IAcsClient client = new DefaultAcsClient(profile);
SubmitFileRequest submitFileRequest = new SubmitFileRequest();
submitFileRequest.setFileId(fileId);
submitFileRequest.setComment("submit file");
SubmitFileResponse submitFileResponse = getAcsResponse(submitFileRequest);
// Check the submit result by polling the deployment status
DeploymentStatus status = null;
GetDeploymentResponse.Data.Deployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(submitFileResponse.getData());
GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
Assert.assertNotNull(getDeploymentResponse.getData());
deployment = getDeploymentResponse.getData().getDeployment();
Assert.assertNotNull(deployment);
Assert.assertTrue(deployment.getName().equalsIgnoreCase(baseId));
Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
Assert.assertTrue(deployment.getHandlerId().equalsIgnoreCase(baseId));
Assert.assertEquals((int) deployment.getFromEnvironment(), DatastudioEnvironment.LOCAL.value());
Assert.assertEquals((int) deployment.getToEnvironment(), DatastudioEnvironment.DEV.value());
Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
status = Enums.find(DeploymentStatus.class, deployment.getStatus());
Assert.assertNotNull(status);
if (status.finished()) {
LOGGER.info("Deployment finished - FinalStatus[{}]", status);
break;
}
LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
retryTimes++;
SleepUtils.seconds(10L);
}
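The retry loop above follows a generic "poll until finished" pattern. The sketch below isolates that pattern using only the JDK, so it can be run without the SDK. The class Poller, the method pollUntil, and the status strings "RUNNING" and "DONE" are hypothetical names chosen for illustration; in real code the supplier would wrap a GetDeployment call and the predicate would wrap the finished-status check.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper (not part of the SDK): polls a value until it is finished.
final class Poller {
    private Poller() {}

    /**
     * Calls fetch up to maxAttempts times, sleeping sleepMillis between attempts,
     * and returns the first value accepted by isFinished.
     * Throws IllegalStateException if no attempt yields a finished value
     * or if the waiting thread is interrupted.
     */
    static <T> T pollUntil(Supplier<T> fetch, Predicate<T> isFinished,
                           int maxAttempts, long sleepMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T value = fetch.get();
            if (isFinished.test(value)) {
                return value;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt flag
                    throw new IllegalStateException("Interrupted while polling", e);
                }
            }
        }
        throw new IllegalStateException("Not finished after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // Simulated status sequence standing in for repeated GetDeployment calls.
        Iterator<String> statuses = Arrays.asList("RUNNING", "RUNNING", "DONE").iterator();
        String last = Poller.<String>pollUntil(statuses::next, "DONE"::equals, 6, 10L);
        System.out.println(last);
    }
}
```

Unlike the loop above, this sketch fails loudly when the retry budget runs out, which avoids treating an unfinished deployment as a success.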
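In a standard-mode workspace, after the submit completes you must also publish the file before it is committed to the scheduler as a recurring task. Use DeployFile to publish the file; as with submitting, poll the deployment status with GetDeployment.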
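DeployFileRequest deployFileRequest = new DeployFileRequest();
deployFileRequest.setFileId(fileId);
deployFileRequest.setComment("deploy file");
DeployFileResponse deployFileResponse = getAcsResponse(deployFileRequest);
// DeployFile returns the ID of the deployment to poll
Long deploymentId = deployFileResponse.getData();
// Check the deployment result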
DeploymentStatus status = null;
GetDeploymentResponse.Data.Deployment deployment = null;
int retryTimes = 0;
while (retryTimes < 6) {
GetDeploymentRequest getDeploymentRequest = getDeploymentRequest(deploymentId);
GetDeploymentResponse getDeploymentResponse = getAcsResponse(getDeploymentRequest);
LOGGER.info("Deployment status got - RequestId[{}]", getDeploymentResponse.getRequestId());
Assert.assertNotNull(getDeploymentResponse.getData());
deployment = getDeploymentResponse.getData().getDeployment();
Assert.assertNotNull(deployment);
LOGGER.info("Deployment information got - DeploymentId[{}] - DeploymentDetail[{}]",
deploymentId, new Gson().toJson(deployment));
Assert.assertTrue(deployment.getCreatorId().equalsIgnoreCase(baseId));
Assert.assertTrue(StringUtils.isBlank(deployment.getErrorMessage()));
status = Enums.find(DeploymentStatus.class, deployment.getStatus());
Assert.assertNotNull(status);
if (status.finished()) {
LOGGER.info("Deployment finished - FinalStatus[{}]", status);
break;
}
LOGGER.info("Deployment not finished. Sleep for a while. - CurrentStatus[{}]", status);
retryTimes++;
SleepUtils.seconds(10L);
}
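Besides files, the data development APIs can also manage folders, resources, and functions.
Step 3: Configure O&M monitoring
After the recurring task is produced through the API, DataWorks generates scheduled instances every day and runs them on schedule. With the Operation Center APIs you can perform O&M on recurring tasks and recurring instances: call GetNode, GetInstance, ListInstances, and related APIs to view tasks and instances and to monitor how instances are running. For example: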
GetInstanceRequest request = new GetInstanceRequest();
request.setInstanceId(INSTANCE_ID);
request.setProjectEnv(PROJECT_ENV);
try {
GetInstanceResponse response = client.getAcsResponse(request);
Object data = ReturnModelParser.parse("getInstanceSuccess", gson.toJson(response));
BizInstanceDto bizInstanceDto = GsonUtils.jsonToBean(data.toString(), BizInstanceDto.class);
Assert.assertEquals("NOT_RUN", bizInstanceDto.getStatus().toString());
Assert.assertEquals(1590416703313L, bizInstanceDto.getModifyTime().getTime());
Assert.assertEquals(INSTANCE_ID, bizInstanceDto.getInstanceId());
Assert.assertEquals("DAILY", bizInstanceDto.getDagType().toString());
Assert.assertEquals("kzh", bizInstanceDto.getNodeName());
Assert.assertEquals("", bizInstanceDto.getParamValues());
Assert.assertEquals(1590416703313L, bizInstanceDto.getCreateTime().getTime());
Assert.assertEquals(1590422400000L, bizInstanceDto.getCycTime().getTime());
Assert.assertEquals(338450167L, bizInstanceDto.getDagId().longValue());
Assert.assertEquals(1590336000000L, bizInstanceDto.getBizdate().getTime());
Assert.assertEquals(33115L, bizInstanceDto.getNodeId().longValue());
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
}
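If an instance runs abnormally, handle it with RestartInstance, SetSuccessInstance, SuspendInstance, or ResumeInstance. With CreateRemind, UpdateRemind, and related APIs you can create custom alert rules to help ensure that baselines are produced on time each day; when something goes wrong, an alert notifies someone to step in.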
CreateRemindRequest createRemindRequest = new CreateRemindRequest();
createRemindRequest.setRemindName("REMIND_CREATE_TEST");
createRemindRequest.setRemindUnit(PopRemindUnit.NODE.name());
createRemindRequest.setRemindType(RemindType.ERROR.name());
createRemindRequest.setAlertUnit(PopAlertUnit.OTHER.name());
createRemindRequest.setDndEnd("08:00");
createRemindRequest.setNodeIds("-1");
createRemindRequest.setMaxAlertTimes(1);
createRemindRequest.setAlertInterval(1800);
createRemindRequest.setAlertMethods(PopAlertMethod.MAIL.name());
createRemindRequest.setAlertTargets(MosadConstants.POP_UID);
try {
CreateRemindResponse createResponse = client.getAcsResponse(createRemindRequest);
MosadReturnModelParser.parse("createRemindTest", gson.toJson(createResponse));
Assert.assertTrue(createResponse.getData() > 0);
} catch (Exception ex) {
ex.printStackTrace();
return;
}
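The sample above sets MaxAlertTimes to 1 and AlertInterval to 1800. The sketch below illustrates one plausible reading of how these two parameters limit notifications: at most MaxAlertTimes alerts are sent, spaced at least AlertInterval seconds apart. This reading is an assumption made for illustration, and the class AlertThrottle and its method are hypothetical; check the CreateRemind reference for the authoritative semantics.

```java
// Hypothetical model (not part of the SDK) of alert throttling, assuming
// maxAlertTimes caps the number of alerts and alertIntervalSeconds is the
// minimum spacing between two consecutive alerts.
final class AlertThrottle {
    private AlertThrottle() {}

    static boolean shouldSend(int alertsSent, long secondsSinceLastAlert,
                              int maxAlertTimes, long alertIntervalSeconds) {
        if (alertsSent >= maxAlertTimes) {
            return false; // the alert budget is used up
        }
        if (alertsSent == 0) {
            return true; // the first alert is never delayed
        }
        return secondsSinceLastAlert >= alertIntervalSeconds;
    }

    public static void main(String[] args) {
        // Values from the sample: MaxAlertTimes = 1, AlertInterval = 1800.
        System.out.println(shouldSend(0, 0L, 1, 1800L));
        System.out.println(shouldSend(1, 3600L, 1, 1800L));
    }
}
```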
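The Operation Center mainly provides APIs for recurring tasks, manually triggered workflows, baseline queries, and alert configuration and queries.
Step 4: Configure data quality monitoring
In this scenario, the APIs described above already synchronize data from RDS to the MaxCompute table on a daily schedule. If you are concerned that dirty or missing data could affect your online business, you can integrate DataWorks data quality monitoring through the Data Quality APIs, so that rule subscribers are alerted as soon as the table data is abnormal.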
CreateQualityRuleRequest request = new CreateQualityRuleRequest();
request.setBlockType(0);
request.setComment("test-createTemplateRuleSuccess");
request.setCriticalThreshold("50");
request.setEntityId(entityId);
request.setOperator("abs");
request.setPredictType(0);
request.setProjectName(PROJECT_NAME);
request.setProperty("table_count");
request.setPropertyType("table");
request.setRuleName("createTemplateRuleSuccess");
request.setRuleType(0);
request.setTemplateId(7);
request.setWarningThreshold("10");
try {
CreateQualityRuleResponse response = client.getAcsResponse(request);
Object data = ReturnModelParser.parse("createTemplateRuleSuccess", gson.toJson(response));
Long templateRuleId = Long.parseLong(data.toString());
Assert.assertTrue(templateRuleId > 0);
return templateRuleId;
} catch (Exception e) {
e.printStackTrace();
Assert.fail();
return null;
}
Data Quality APIs such as CreateQualityRule, GetQualityFollower, and CreateQualityRelativeNode manage data quality rules. For details, see the reference for each API.
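The rule created above uses the operator "abs" with a warning threshold of 10 and a critical threshold of 50. The sketch below shows one way such a pair of thresholds could classify a measured fluctuation. It assumes that "abs" means the absolute value of the fluctuation is compared against each threshold and that a value must strictly exceed a threshold to trigger it. Both points are assumptions, and the class QualityCheck and the Level names are hypothetical.

```java
// Hypothetical model (not part of the SDK) of a two-threshold quality check.
final class QualityCheck {
    enum Level { PASSED, WARNING, CRITICAL }

    private QualityCheck() {}

    static Level classify(double fluctuationPercent,
                          double warningThreshold, double criticalThreshold) {
        if (warningThreshold > criticalThreshold) {
            throw new IllegalArgumentException("warningThreshold must not exceed criticalThreshold");
        }
        // Assumed meaning of the "abs" operator: ignore the sign of the change.
        double magnitude = Math.abs(fluctuationPercent);
        if (magnitude > criticalThreshold) {
            return Level.CRITICAL;
        }
        if (magnitude > warningThreshold) {
            return Level.WARNING;
        }
        return Level.PASSED;
    }

    public static void main(String[] args) {
        // Thresholds from the sample rule: warning 10, critical 50.
        System.out.println(classify(-60.0, 10.0, 50.0));
        System.out.println(classify(20.0, 10.0, 50.0));
        System.out.println(classify(5.0, 10.0, 50.0));
    }
}
```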
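Step 5: Generate a Data Service API
At this point, the table has been created with the metadata APIs, the file and recurring task with the data development APIs, and the monitoring rules with the Data Quality and Operation Center APIs, so the MaxCompute partitioned table is populated as expected. The final step is to use the Data Service OpenAPI to generate a Data Service API over the MaxCompute partitioned table, which serves the data to your system.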
CreateDataServiceApiRequest createRequest = new CreateDataServiceApiRequest();
createRequest.setTenantId(tenantId);
createRequest.setProjectId(projectId);
createRequest.setApiMode(apiMode);
createRequest.setApiName(apiName);
createRequest.setApiPath(apiPath);
createRequest.setApiDescription("test");
createRequest.setGroupId(groupId);
createRequest.setVisibleRange(visibleRange);
createRequest.setTimeout(10000);
createRequest.setProtocols(protocols);
createRequest.setRequestMethod(requestMethod);
createRequest.setResponseContentType(responseType);
CreateDataServiceApiResponse createResponse = client.getAcsResponse(createRequest);
Long apiId = createResponse.getData();
Assert.assertNotNull(apiId);
GetDataServiceApiRequest getRequest = new GetDataServiceApiRequest();
getRequest.setTenantId(tenantId);
getRequest.setProjectId(projectId);
getRequest.setApiId(apiId);
GetDataServiceApiResponse getResponse = client.getAcsResponse(getRequest);
GetDataServiceApiResponse.Data data = getResponse.getData();
Assert.assertEquals(apiId, data.getApiId());
Assert.assertEquals(0L, data.getFolderId().longValue());
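CreateDataServiceApi and PublishDataServiceApi turn table data into a Data Service API, which completes the data production pipeline. Integrating the DataWorks OpenAPI operations above connects your local system to the cloud system end to end.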