日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過OpenAPI創建、修改、刪除離線同步任務

本文為您介紹如何使用OpenAPI創建、修改、刪除數據集成同步任務,同步來源端數據至去向端。

前提條件

使用限制

  • DataWorks當前僅支持使用OpenAPI創建數據集成離線同步任務。
  • 調用CreateDISyncTask創建數據集成同步任務,僅支持使用腳本模式配置同步任務內容,詳情請參見通過腳本模式配置離線同步任務
  • DataWorks暫不支持使用OpenAPI創建業務流程,您需要使用現有的業務流程創建數據同步任務。

配置環境依賴及賬號認證

  • 配置Maven依賴。
    1. 打開Maven項目下的pom.xml文件,添加aliyun-java-sdk-core
      <dependency>
       <groupId>com.aliyun</groupId>
       <artifactId>aliyun-java-sdk-core</artifactId>
       <version>4.5.20</version>
      </dependency>
    2. 打開Maven項目下的pom.xml文件,添加aliyun-java-sdk-dataworks-public
      <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
        <version>3.3.18</version>
      </dependency>
    3. 打開Maven項目,添加credentials-java(推薦使用Credentials最新版本)。
      <dependency>
          <groupId>com.aliyun</groupId>
          <artifactId>credentials-java</artifactId>
          <version>0.2.11</version>
      </dependency>
      說明 通過阿里云Credentials工具托管AK,關于Credentials工具使用說明,詳情請參見:身份驗證配置
  • 客戶端認證。
    使用OpenAPI創建數據同步任務前,需要調用如下語句對登錄阿里云的賬號相關信息進行認證。如果阿里云的賬號信息認證通過,則繼續執行后續任務,如果認證不通過,則該調用會報錯,您需要根據實際報錯處理相關問題。
    // Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.http://bestwisewords.com/document_detail/378657.html
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));

配置流程

完成上述配置環境依賴及賬號認證后,您可以通過OpenAPI調用相關接口,創建數據同步任務,同步來源端數據至去向端。配置流程如下:
  1. 創建數據集成任務。
  2. 配置任務的調度依賴。
  3. 提交數據集成任務。
  4. 發布同步任務至生產環境。

配置步驟

  1. 創建數據集成任務。
    調用CreateDISyncTask接口,創建數據集成任務。如下代碼僅示例部分參數的配置,更多參數詳情請參見CreateDISyncTask
    public void createFile() throws ClientException{
            CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
            request.setProjectId(181565L);
            request.setTaskType("DI_OFFLINE");
            request.setTaskContent("{\"type\":\"job\",\"version\":\"2.0\",\"steps\":[{\"stepType\":\"mysql\",\"parameter\":{\"envType\":1,\"datasource\":\"dh_mysql\",\"column\":[\"id\",\"name\"],\"tableComment\":\"same表comment\",\"connection\":[{\"datasource\":\"dh_mysql\",\"table\":[\"same\"]}],\"where\":\"\",\"splitPk\":\"id\",\"encoding\":\"UTF-8\"},\"name\":\"Reader\",\"category\":\"reader\"},{\"stepType\":\"odps\",\"parameter\":{\"partition\":\"pt=${bizdate}\",\"truncate\":true,\"datasource\":\"odps_first\",\"envType\":1,\"column\":[\"id\",\"name\"],\"emptyAsNull\":false,\"tableComment\":\"same表comment\",\"table\":\"same\"},\"name\":\"Writer\",\"category\":\"writer\"}],\"setting\":{\"errorLimit\":{\"record\":\"\"},\"speed\":{\"throttle\":false,\"concurrent\":2}},\"order\":{\"hops\":[{\"from\":\"Reader\",\"to\":\"Writer\"}]}}");
            request.setTaskParam("{\"FileFolderPath\":\"業務流程/new_biz/數據集成\",\"ResourceGroup\":\"S_res_group_280749521950784_1602767279794\"}");
            request.setTaskName("new_di_task_0607_1416");
    
    
            String regionId = "cn-hangzhou";
            // Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.http://bestwisewords.com/document_detail/378657.html
            IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
            IAcsClient client;
    
            client = new DefaultAcsClient(profile);
    
            CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
            Gson gson1 = new Gson();
    
            System.out.println(gson1.toJson(response1));
    }
  2. 調用UpdateFile接口,更新調度參數。
    請求參數配置詳情如下表所示:
    名稱類型是否必選示例值描述
    ActionStringUpdateFile

    要執行的操作。

    FileFolderPathString業務流程/第一個業務流程/數據集成/文件夾1/文件夾2

    文件所在的路徑。

    ProjectIdLong10000

    DataWorks工作空間的ID。您可以登錄DataWorks控制臺,進入工作空間管理頁面獲取ID。

    FileNameStringods_user_info_d

    文件的名稱。您可以通過重新設置FileName的值來修改文件名稱。

    例如,使用ListFiles接口查詢目標目錄下的文件ID,通過UpdateFile接口,輸入查詢的文件ID至FileId參數,并配置FileName的參數值,即可修改相應文件的名稱。

    FileDescriptionString這里是文件描述

    文件的描述。

    ContentStringSELECT "1";

    文件代碼內容,不同代碼類型(fileType)的文件,代碼格式不同。您可以在運維中心,右鍵單擊對應類型的任務,選擇查看代碼,查看具體的代碼格式。

    AutoRerunTimesInteger3

    文件出錯后,自動重跑的次數。

    AutoRerunIntervalMillisInteger120000

    出錯自動重跑時間間隔,單位為毫秒。最大為1800000毫秒(30分鐘)。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>時間屬性>出錯自動重跑”的”重跑間隔“配置對應。

    控制臺中“重跑間隔”的時間單位為分鐘,請在調用時注意轉換時間。

    RerunModeStringALL_ALLOWED

    重跑屬性。取值如下:

    • ALL_ALLOWED:運行成功或失敗后皆可重跑。
    • FAILURE_ALLOWED:運行成功后不可重跑,運行失敗后可以重跑。
    • ALL_DENIED:運行成功或失敗皆不可重跑。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>時間屬性>重跑屬性”配置內容對應。

    StopBooleanfalse

    是否暫停調度,取值如下:

    • true:暫停調度。
    • false:不暫停調度。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>時間屬性>調度類型”配置為”暫停調度“時對應。

    ParaValueStringx=a y=b z=c

    調度參數。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>參數”對應。您可以參考調度參數配置。

    StartEffectDateLong936923400000

    開始自動調度的毫秒時間戳。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>時間屬性>生效日期”配置的開始時間的毫秒時間戳對應。

    EndEffectDateLong4155787800000

    停止自動調度的時間戳,單位為毫秒。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>時間屬性>生效日期”配置的結束時間的毫秒時間戳對應。

    CronExpressString00 00-59/5 1-23 * * ?

    周期調度的cron表達式,該參數與DataWorks控制臺中,數據開發任務的“調度配置>時間屬性>cron表達式”對應。配置完“調度周期”及“定時調度時間”后,DataWorks會自動生成相應cron表達式。

    示例如下:

    • 每天凌晨5點30分定時調度:00 30 05 * * ?
    • 每個小時的第15分鐘定時調度:00 15 * * * ?
    • 每隔十分鐘調度一次:00 00/10 * * * ?
    • 每天8點到17點,每隔十分鐘調度一次:00 00-59/10 8-23 * * * ?
    • 每月的1日0點20分自動調度:00 20 00 1 * ?
    • 從1月1日0點10分開始,每過3個月調度一次:00 10 00 1 1-12/3 ?
    • 每周二、周五的0點5分自動調度:00 05 00 * * 2,5

    由于DataWorks調度系統的規則,cron表達式有以下限制:

    • 最短調度間隔時間為5分鐘。
    • 每天最早調度時間為0點5分。
    CycleTypeStringNOT_DAY

    調度周期的類型,包括NOT_DAY(分鐘、小時)和DAY(日、周、月)。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>時間屬性>調度周期”對應。

    DependentTypeStringUSER_DEFINE

    依賴上一周期的方式。取值如下:

    • SELF:依賴項選擇本節點。
    • CHILD:依賴項選擇一級子節點。
    • USER_DEFINE:依賴項選擇其他節點。
    • NONE:未選擇依賴項,即不會依賴上一周期。
    DependentNodeIdListString5,10,15,20

    當DependentType參數配置為USER_DEFINE時,用于設置當前文件具體依賴的節點ID。依賴多個節點時,使用英文逗號(,)分隔。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>調度依賴”配置為“上一周期”后,依賴項選擇”其他節點“時配置的內容對應。

    InputListStringproject_root,project.file1,project.001_out

    文件依賴的上游文件的輸出名稱。依賴多個輸出時,使用英文逗號(,)分隔。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>調度依賴“選擇”同周期“時的”父節點輸出名稱”對應。

    ProjectIdentifierStringdw_project

    DataWorks工作空間的名稱。您可以登錄DataWorks控制臺,進入工作空間配置頁面獲取工作空間名稱。

    該參數與ProjectId參數,二者必須設置其一,用來確定本次API調用操作的DataWorks工作空間。

    FileIdLong100000001

    文件的ID。您可以調用ListFiles接口獲取文件ID。

    OutputListStringdw_project.ods_user_info_d

    文件的輸出。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>調度依賴“選擇”同周期“時的”本節點的輸出名稱”對應。

    ResourceGroupIdentifierStringdefault_group

    文件發布成任務后,任務執行時對應的資源組。您可以調用ListResourceGroups獲取工作空間可用的資源組列表。

    ConnectionNameStringodps_first

    文件對應任務執行時,任務使用的數據源標識符。您可以調用ListDataSources獲取可用的數據源列表。

    OwnerString18023848927592

    文件所有者的用戶ID。

    AutoParsingBooleantrue

    文件是否開啟自動解析功能。取值如下:

    • true:文件會自動解析代碼。
    • false:文件不會自動解析代碼。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>調度依賴”選擇“同周期”時的“代碼解析”對應。

    SchedulerTypeStringNORMAL

    調度的類型,取值如下:

    • NORMAL:正常調度任務。
    • MANUAL:手動任務,不會被日常調度,對應手動業務流程下的節點。
    • PAUSE:暫停任務。
    • SKIP:空跑任務,被日常調度,但啟動調度時直接被置為成功。
    AdvancedSettingsString{"queue":"default","SPARK_CONF":"--conf spark.driver.memory=2g"}

    任務的高級配置。

    該參數與DataWorks控制臺中,EMR Spark Streaming和EMR Streaming SQL數據開發任務,編輯頁面右側導航欄的“高級設置“對應。

    當前僅EMR Spark Streaming和EMR Streaming SQL任務支持配置該參數,并且參數為JSON格式。

    StartImmediatelyBooleantrue

    發布后是否立即啟動。取值如下:

    • true:發布后立即啟動。
    • false:發布后暫不啟動。

    該參數與DataWorks控制臺中,EMR Spark Streaming和EMR Streaming SQL數據開發任務,編輯頁面右側導航欄的“配置>時間屬性>啟動方式“對應。

    InputParametersString[{"ValueSource": "project_001.first_node:bizdate_param","ParameterName": "bizdate_input"}]

    節點的上下文輸入參數。參數為JSON格式,包含的字段可參考GetFile接口返回值中的InputContextParameterList參數結構。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>節點上下文>本節點輸入參數“對應。

    OutputParametersString[{"Type": 1,"Value": "${bizdate}","ParameterName": "bizdate_param"}]

    節點的上下文輸出參數。參數為JSON格式,包含的字段可參考GetFile接口返回值中的OutputContextParameterList參數結構。

    該參數與DataWorks控制臺中,數據開發任務的“調度配置>節點上下文>本節點輸出參數“對應。

  3. 提交數據集成任務。
    調用SubmitFile接口,提交數據集成任務至調度系統的開發環境。任務提交后,Response會返回deploymentId,您可以調用GetDeployment接口,通過deploymentId獲取本次發布包的詳細信息。
     public void submitFile() throws ClientException{
            SubmitFileRequest request = new SubmitFileRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          // 此節點ID為創建節點時返回的ID,對應數據庫File表的file_id。
            request.setFileId(501576542L);
            request.setComment("備注");
            SubmitFileResponse acsResponse = client.getAcsResponse(request);
          //DeploymentId為提交或發布的返回值。
          Long deploymentId = acsResponse.getData();
            log.info(acsResponse.toString());
    }
    上述代碼僅示例部分參數的配置,更多參數詳情請參見SubmitFileGetDeployment
  4. 發布同步任務到生產環境。
    調用DeployFile接口,發布數據集成同步任務至生產環境。
    說明 僅標準模式的工作空間涉及執行該發布操作。
     public void deploy() throws ClientException{
            DeployFileRequest request = new DeployFileRequest();
            request.setProjectIdentifier("zxy_8221431");
            request.setFileId(501576542L);
            request.setComment("備注");
            //NodeId和file_id二選一。NodeId的值為調度配置中基礎屬性的節點ID。
            request.setNodeId(700004537241L);
            DeployFileResponse acsResponse = client.getAcsResponse(request);
             //DeploymentId為提交或發布的返回值。
            Long deploymentId = acsResponse.getData();
            log.info(acsResponse.getData().toString());
        }
    上述代碼僅示例部分參數的配置,更多參數詳情請參見DeployFile
  5. 獲取發布包詳情。
    任務發布后,Response會返回deploymentId,您可以調用GetDeployment接口,通過deploymentId獲取本次發布包的詳細信息。當GetDeployment接口的返回參數Status取值為1時,則表示此次發布成功。
    public void getDeployment() throws ClientException{
            GetDeploymentRequest request = new GetDeploymentRequest();
            request.setProjectId(78837L);
            request.setProjectIdentifier("zxy_8221431");
          //DeploymentId為提交或發布的返回值。調用GetDeployment接口,獲取本次發布的具體情況。
            request.setDeploymentId(2776067L);
            GetDeploymentResponse acsResponse = client.getAcsResponse(request);
            log.info(acsResponse.getData().toString());
        }
    上述代碼僅示例部分參數的配置,更多參數詳情請參見GetDeployment

修改同步任務的相關配置

成功創建數據集成同步任務后,您可以調用UpdateDISyncTask接口更新任務的Content,或通過TaskParam來更新使用的獨享資源組。更新后,您需要重新提交、發布同步任務,詳情請參見配置流程

刪除同步任務

創建數據集成同步任務后,您可以調用DeleteFile接口刪除同步任務。

Sample代碼

POM 依賴

<dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>aliyun-java-sdk-core</artifactId>
        <version>4.5.20</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>3.4.1</version>
 </dependency>

Java Sdk調用代碼

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.dataworks_public.model.v20200518.*;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import java.util.List;

public class createofflineTask {
    static Long createTask(String fileName) throws  Exception {
        Long projectId = 2043L;
        String taskType = "DI_OFFLINE";
        String taskContent = "{\n" +
                "    \"type\": \"job\",\n" +
                "    \"version\": \"2.0\",\n" +
                "    \"steps\": [\n" +
                "        {\n" +
                "            \"stepType\": \"mysql\",\n" +
                "            \"parameter\": {\n" +
                "                \"envType\": 0,\n" +
                "                \"datasource\": \"mysql_autotest_dev\",\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"connection\": [\n" +
                "                    {\n" +
                "                        \"datasource\": \"mysql_autotest_dev\",\n" +
                "                        \"table\": [\n" +
                "                            \"user\"\n" +
                "                        ]\n" +
                "                    }\n" +
                "                ],\n" +
                "                \"where\": \"\",\n" +
                "                \"splitPk\": \"\",\n" +
                "                \"encoding\": \"UTF-8\"\n" +
                "            },\n" +
                "            \"name\": \"Reader\",\n" +
                "            \"category\": \"reader\"\n" +
                "        },\n" +
                "        {\n" +
                "            \"stepType\": \"odps\",\n" +
                "            \"parameter\": {\n" +
                "                \"partition\": \"pt=${bizdate}\",\n" +
                "                \"truncate\": true,\n" +
                "                \"datasource\": \"odps_first\",\n" +
                "                \"envType\": 0,\n" +
                "                \"column\": [\n" +
                "                    \"id\",\n" +
                "                    \"name\"\n" +
                "                ],\n" +
                "                \"emptyAsNull\": false,\n" +
                "                \"tableComment\": \"null\",\n" +
                "                \"table\": \"user\"\n" +
                "            },\n" +
                "            \"name\": \"Writer\",\n" +
                "            \"category\": \"writer\"\n" +
                "        }\n" +
                "    ],\n" +
                "    \"setting\": {\n" +
                "        \"executeMode\": null,\n" +
                "        \"errorLimit\": {\n" +
                "            \"record\": \"\"\n" +
                "        },\n" +
                "        \"speed\": {\n" +
                "            \"concurrent\": 2,\n" +
                "            \"throttle\": false\n" +
                "        }\n" +
                "    },\n" +
                "    \"order\": {\n" +
                "        \"hops\": [\n" +
                "            {\n" +
                "                \"from\": \"Reader\",\n" +
                "                \"to\": \"Writer\"\n" +
                "            }\n" +
                "        ]\n" +
                "    }\n" +
                "}";
        CreateDISyncTaskRequest request = new CreateDISyncTaskRequest();
        request.setProjectId(projectId);
        request.setTaskType(taskType);
        request.setTaskContent(taskContent);
        request.setTaskName(fileName);
        request.setTaskParam("{\"FileFolderPath\":\"業務流程/自動化測試空間_勿動/數據集成\",\"ResourceGroup\":\"S_res_group_XXX\"}");
        // 使用數據集成獨享資源組,
        CreateDISyncTaskResponse response1 = client.getAcsResponse(request);
        return response1.getData().getFileId();
    }

    public static void updateFile(Long fileId) throws Exception {
        UpdateFileRequest request = new UpdateFileRequest();
        request.setProjectId(2043L);
        request.setFileId(fileId);
        request.setAutoRerunTimes(3);
        request.setRerunMode("FAILURE_ALLOWED");
        request.setCronExpress("00 30 05 * * ?");
        request.setCycleType("DAY");
        request.setResourceGroupIdentifier("S_res_group_XXX");
        // 使用調度獨享資源組
        request.setInputList("dataworks_di_autotest_root");

        request.setAutoParsing(true);
        request.setDependentNodeIdList("5,10,15,20");
        request.setDependentType("SELF");
        request.setStartEffectDate(0L);
        request.setEndEffectDate(4155787800000L);
        request.setFileDescription("description");
        request.setStop(false);
        request.setParaValue("x=a y=b z=c");
        request.setSchedulerType("NORMAL");
        request.setAutoRerunIntervalMillis(120000);
        UpdateFileResponse response1 = client.getAcsResponse(request);
    }
    static void deleteTask(Long fileId) throws Exception {
        DeleteFileRequest request = new DeleteFileRequest();
        Long projectId = 63845L;
        request.setProjectId(projectId);
        request.setFileId(fileId);


        String akId = "XXX";
        String akSecret = "XXXX";
        String regionId = "cn-hangzhou";
        IClientProfile profile = DefaultProfile.getProfile(regionId, akId, akSecret);
        DefaultProfile.addEndpoint("cn-hangzhou","dataworks-public","dataworks.cn-hangzhou.aliyuncs.com");
        IAcsClient client;

        client = new DefaultAcsClient(profile);

        DeleteFileResponse response1 = client.getAcsResponse(request);
        System.out.println(JSONObject.toJSONString(response1));
    }

    static IAcsClient client;

    public static void main(String[] args) throws Exception {
        String akId = "XX";
        // Please ensure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set.http://bestwisewords.com/document_detail/378657.html
        IClientProfile profile = DefaultProfile.getProfile("cn-shanghai", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        DefaultProfile.addEndpoint(regionId, "dataworks-public", "dataworks." + regionId + ".aliyuncs.com");

        client = new DefaultAcsClient(profile);
        String taskName = "offline_job_0930_1648";
        Long fileId = createTask(taskName); // 創建數據集成任務
        updateFile(fileId);   // 修改數據集成任務的調度屬性
    }
}