日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

開發自定義組件示例

本文通過示例的方式為您介紹自定義輸入與輸出組件的開發案例。

前提條件

  1. 單擊此處下載依賴壓縮包

  2. 將壓縮包中的plugin.center.base-0.0.1-SNAPSHOT.jar依賴包添加進Maven的私有倉庫中。

背景信息

使用自定義組件前,需先創建離線自定義源類型并創建對應數據源。

  • 創建RDBMS類型的離線自定義源類型,在集成管道組件庫的開放目錄下自動生成輸入和輸出組件。

  • 創建其他數據庫類型的離線自定義源類型,系統會根據您上傳的讀取插件寫入插件,在組件庫的開放目錄下生成對應的輸入組件輸出組件

操作步驟

步驟一:準備讀/寫插件的JAR文件

  • Maven的依賴如下。

    <dependency>
      <groupId>com.alibaba.dt.pipeline</groupId>
      <artifactId>plugin.center.base</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </dependency>
  • 讀取插件文件的代碼示例如下

    package demo;
    
    import com.alibaba.dt.pipeline.plugin.center.base.Reader;
    import com.alibaba.dt.pipeline.plugin.center.base.RecordSender;
    import com.alibaba.dt.pipeline.plugin.center.conf.Configuration;
    import com.alibaba.dt.pipeline.plugin.center.element.*;
    import com.alibaba.dt.pipeline.plugin.center.record.Record;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    import java.util.Random;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    /**
     * 用戶入口類ReaderDemo
     * 該類必須繼承自com.alibaba.dt.pipeline.plugin.center.base.Reader
     * 該類中暫時不需要定義方法,但是必須要定義兩個public的靜態子類:Job和Task,名字必須是Job和Task,大小寫敏感,否則系統找不到類
     *
     * 系統會首先初始化Job類,調用init做初始化,再調用prepare做準備工作,然后調用split,把job的configuration拆分成用戶指定的并發度個數的
     * configuration。然后執行task。當所有的task全部執行完畢,再執行post,最后再執行destroy。destroy和post方法的不同是,destroy總是
     * 會執行,哪怕出現了異常。
     *
     * 當系統調用Job的split方法得到很多configuration后,會用每一個configuration實例化一個Task,Task的方法依次執行的順序是:init,
     * prepare,getInputRowMeta,startRead,post,destroy,同樣的,destroy和post方法的不同是,destroy總是會執行,哪怕出現了異常。
     *
     */
    public class ReaderDemo extends Reader {
        // 這個是用戶在自定義數據源中定義的key,這是一個demo數據源的key,用戶自定義的話,這個key值應該不同
        public static final String DS_KEY = "demo_ds";
        // 代碼內部定義的task編號,沒有使用到可以不用
        public static final String TASK_INDEX = "taskIndex";
        // 用戶在離線管道中配置該組件的一個參數
        public static final String USER_KEY = "user_param";
    
        /**
         * public的靜態子類Job必須繼承自Reader.Job
         */
        public static class Job extends Reader.Job {
            private static final Logger logger = LoggerFactory.getLogger(Job.class);
    
            Configuration jobConfig;
    
            @Override
            public void init() {
                logger.info("job init");
                //通過這個方法拿到用戶的輸入組件配置,這些參數就是用戶在"輸入組件"界面上配置的參數
                this.jobConfig = super.getPluginJobConf();
                String value = jobConfig.getString(USER_KEY, "default_value");
                String ds = jobConfig.getString(DS_KEY, "default_ds");
                logger.info("user_param:{} ds:{}", value, ds);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("job prepare");
            }
    
            @Override
            public List<Configuration> split(int i) {
                logger.info("job split:{}", i);
                return IntStream.range(0, i).boxed().map(x -> {
                    Configuration tmpConfiguration = jobConfig.clone();
                    // 寫入configuration的編號
                    tmpConfiguration.set(TASK_INDEX, x);
                    return tmpConfiguration;
                }).collect(Collectors.toList());
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("job post");
            }
    
            @Override
            public void destroy() {
                logger.info("job destroy");
            }
        }
    
        /**
         * public的靜態子類Task必須繼承自Reader.Task
         */
        public static class Task extends Reader.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
    
            private Configuration taskConfig;
            private int index;
            private RowMeta rowMeta;
    
            @Override
            public void init() {
                // 獲取Job split出來的configuration
                this.taskConfig = super.getPluginJobConf();
                // 獲取Task的編號
                index = taskConfig.getInt(TASK_INDEX, -1);
                logger.info("task init:{}", index);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("task prepare");
            }
    
            @Override
            public void startRead(RecordSender recordSender) {
                logger.info("task start");
    
                Random random = new Random();
    
                // 讀取數據,封裝成Record,發送到系統內部
                for(int i = 0; i < 10; i++) {
                    Record record = recordSender.createRecord();
                    // 只是3個列,這個列的類型需要和getInputRowMeta函數的meta對其,如果是真實數據源db,需要把讀取到的數據轉換成特定的column
                    record.addColumn(new LongColumn(i));
                    record.addColumn(new StringColumn("name_" + i));
                    record.addColumn(new DoubleColumn(random.nextDouble()));
                    recordSender.sendToWriter(record);
                    logger.info("read record:{}", i);
                }
            }
    
            @Override
            public RowMeta getInputRowMeta(){
                logger.info("task column meta");
                rowMeta = new RowMeta();
    
                /**
                 * 在這里定義輸入組件讀取到數據后,往下游寫出的數據的schema,一般而言,用戶可能需要連接到db,獲取到真實數據源的schema
                 * 特別注意:這里的column名字必須和輸入組件配置頁面的column名字完全一樣,順序也要一樣。比如:這里定義了id、name、score
                 * 那么,該組件的配置頁面也必須配置上一樣的列名:id、name、score
                 */
                ColumnMeta columnMeta1 = new ColumnMeta();
                columnMeta1.setName("id");
                columnMeta1.setType(Column.Type.LONG);
                rowMeta.addColumnMeta(columnMeta1);
    
                ColumnMeta columnMeta2 = new ColumnMeta();
                columnMeta2.setName("name");
                columnMeta2.setType(Column.Type.STRING);
                rowMeta.addColumnMeta(columnMeta2);
    
                ColumnMeta columnMeta3 = new ColumnMeta();
                columnMeta3.setName("score");
                columnMeta3.setType(Column.Type.DOUBLE);
                rowMeta.addColumnMeta(columnMeta3);
    
                return rowMeta;
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("task post");
            }
    
            @Override
            public void destroy() {
                logger.info("task destroy");
    
            }
        }
    }
  • 寫入插件文件的代碼示例如下

    package demo;
    
    import com.alibaba.dt.pipeline.plugin.center.base.RecordReceiver;
    import com.alibaba.dt.pipeline.plugin.center.base.Writer;
    import com.alibaba.dt.pipeline.plugin.center.conf.Configuration;
    import com.alibaba.dt.pipeline.plugin.center.record.Record;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    /**
     * 用戶入口類WriterDemo
     * 該類必須繼承自com.alibaba.dt.pipeline.plugin.center.base.Writer
     * 該類中暫時不需要定義方法,但是必須要定義兩個public的靜態子類:Job和Task,名字必須是Job和Task,大小寫敏感,否則系統找不到類
     *
     * 系統會首先初始化Job類,調用init做初始化,再調用prepare做準備工作,然后調用split,把job的configuration拆分成用戶指定的并發度個數的
     * configuration。然后執行task。當所有的task全部執行完畢,再執行post,最后再執行destroy。destroy和post方法的不同是,destroy總是
     * 會執行,哪怕出現了異常。
     *
     * 當系統調用Job的split方法得到很多configuration后,會用每一個configuration實例化一個Task,Task的方法依次執行的順序是:init,
     * prepare,startWrite,post,destroy,同樣的,destroy和post方法的不同是,destroy總是會執行,哪怕出現了異常。
     *
     */
    public class WriterDemo extends Writer {
        // 這個是用戶在自定義數據源中定義的key,這是一個demo數據源的key,用戶自定義的話,這個key值應該不同
        public static final String DS_KEY = "demo_ds";
        // 代碼內部定義的task編號,沒有使用到可以不用
        public static final String TASK_INDEX = "taskIndex";
        // 用戶在離線管道中配置該組件的一個參數
        public static final String USER_KEY = "user_param";
    
        /**
         * public的靜態子類Job必須繼承自Writer.Job
         */
        public static class Job extends Writer.Job {
            private static final Logger logger = LoggerFactory.getLogger(Job.class);
    
            Configuration jobConfig;
    
            @Override
            public void init() {
                logger.info("job init");
                //通過這個方法拿到用戶的輸出組件配置,這些參數就是用戶在輸出組件界面上配置的參數
                this.jobConfig = super.getPluginJobConf();
                String value = jobConfig.getString(USER_KEY, "default_value");
                String ds = jobConfig.getString(DS_KEY, "default_ds");
                logger.info("user_param:{} ds:{}", value, ds);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("job prepare");
            }
    
            @Override
            public List<Configuration> split(int i) {
                logger.info("job split:{}", i);
                return IntStream.range(0, i).boxed().map(x -> {
                    Configuration tmpConfiguration = jobConfig.clone();
                    // 寫入configuration的編號
                    tmpConfiguration.set(TASK_INDEX, x);
                    return tmpConfiguration;
                }).collect(Collectors.toList());
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("job post");
            }
    
            @Override
            public void destroy() {
                logger.info("job destroy");
            }
        }
    
        /**
         * public的靜態子類Task必須繼承自Writer.Task
         */
        public static class Task extends Writer.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
    
            private Configuration taskConfig;
            private int index;
    
            @Override
            public void init() {
                // 獲取Job split出來的configuration
                this.taskConfig = super.getPluginJobConf();
                // 獲取Task的編號
                index = taskConfig.getInt(TASK_INDEX, -1);
                logger.info("task init:{}", index);
            }
    
            @Override
            public void prepare() {
                super.prepare();
                logger.info("task prepare");
            }
    
            @Override
            public void startWrite(RecordReceiver recordReceiver) {
                logger.info("task start");
    
                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    logger.info("======: " + record.toString());
                }
            }
    
            @Override
            public void post() {
                super.post();
                logger.info("task post");
            }
    
            @Override
            public void destroy() {
                logger.info("task destroy");
    
            }
        }
    }
                                

步驟二:創建其他數據庫類型的離線自定義源類型

  1. 在Dataphin首頁,單擊頂部菜單欄管理中心 > 數據源管理

  2. 在左側導航欄單擊自定義源類型

  3. 自定義源類型頁面中,單擊新建自定義源類型,下拉列表中選擇新建離線自定義源類型image

  4. 新建離線自定義源類型頁面,配置參數。

    參數

    描述

    基本配置

    類型

    選擇其他數據庫

    名稱

    自定義組件的名稱。

    支持中文、英文字母大小寫、下劃線(_)和數字。長度不超過64個字符。

    類型編碼

    組件的唯一標識。供后端使用,創建后不可編輯。

    僅支持英文字母大小寫、數字和下劃線(_),且不能以數字開頭。

    數據源JSON

    填寫數據源JSON代碼及上傳讀寫插件:

    填寫數據源JSON代碼,即定義數據源的配置項,代碼示例說明如下:

    [
      {
        "columnName": "url",
        "columnType": "NORMAL",
        "text": {
          "zh_CN": "鏈接地址",
          "en_US": "address",
          "zh_TW": "繁體"
        },
        "placeholder": {
          "zh_CN": "請輸入鏈接地址",
          "en_US": "input address",
          "zh_TW": "繁體"
        }
      },
      {
        "columnName": "username",
        "columnType": "NORMAL",
        "text": {
          "zh_CN": "用戶名",
          "en_US": "username",
          "zh_TW": "繁體"
        },
        "placeholder": {
          "zh_CN": "請輸入用戶名",
          "en_US": "input username",
          "zh_TW": "繁體"
        }
      },
      {
        "columnName": "password",
        "columnType": "ENCRYPT",
        "text": {
          "zh_CN": "密碼",
          "en_US": "password",
          "zh_TW": "繁體"
        },
        "placeholder": {
          "zh_CN": "請輸入密碼",
          "en_US": "input password",
          "zh_TW": "繁體"
        }
      }
    ]

    上述JSON示例中字段說明如下:

    • columnName:參數名稱。

    • columnType:參數類型分為NORMALENCRYPT。密碼類的參數,參數類型需使用ENCRYPT。

    • placeholder:用戶輸入框中的默認值在三種語言情況下的輸出。若不填寫,則用戶輸入框為空。

    • text:參數名稱在三種語言情況下的輸出。

    資源配置

    讀寫插件

    選擇需要配置的插件類型、填寫對應的ClassName(插件類名)并上傳步驟一中準備好的插件文件。插件文件僅支持.jar類型,文件大小不超過50MB

    說明

    讀寫插件需至少選擇一項進行配置。完成讀取插件寫入插件的配置后,會生成對應的輸入插件輸出插件

    描述信息

    描述

    對自定義的數據源的簡單描述。不超過128個字符。

  5. 單擊創建