本文通過示例的方式為您介紹自定義輸入與輸出組件的開發案例。
前提條件
單擊此處下載依賴壓縮包。
將壓縮包中的
plugin.center.base-0.0.1-SNAPSHOT.jar
依賴包添加進Maven的私有倉庫中。
背景信息
使用自定義組件前,需先創建離線自定義源類型并創建對應數據源。
創建RDBMS類型的離線自定義源類型,在集成管道組件庫的開放目錄下自動生成輸入和輸出組件。
創建其他數據庫類型的離線自定義源類型,系統會根據您上傳的讀取插件和寫入插件,在組件庫的開放目錄下生成對應的輸入組件和輸出組件。
操作步驟
步驟一:準備讀/寫插件的JAR文件
Maven的依賴如下。
<dependency> <groupId>com.alibaba.dt.pipeline</groupId> <artifactId>plugin.center.base</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>
讀取插件文件的代碼示例如下。
package demo; import com.alibaba.dt.pipeline.plugin.center.base.Reader; import com.alibaba.dt.pipeline.plugin.center.base.RecordSender; import com.alibaba.dt.pipeline.plugin.center.conf.Configuration; import com.alibaba.dt.pipeline.plugin.center.element.*; import com.alibaba.dt.pipeline.plugin.center.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.Random; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * 用戶入口類ReaderDemo * 該類必須繼承自com.alibaba.dt.pipeline.plugin.center.base.Reader * 該類中暫時不需要定義方法,但是必須要定義兩個public的靜態子類:Job和Task,名字必須是Job和Task,大小寫敏感,否則系統找不到類 * * 系統會首先初始化Job類,調用init做初始化,再調用prepare做準備工作,然后調用split,把job的configuration拆分成用戶指定的并發度個數的 * configuration。然后執行task。當所有的task全部執行完畢,再執行post,最后再執行destroy。destroy和post方法的不同是,destroy總是 * 會執行,哪怕出現了異常。 * * 當系統調用Job的split方法得到很多configuration后,會用每一個configuration實例化一個Task,Task的方法依次執行的順序是:init, * prepare,getInputRowMeta,startRead,post,destroy,同樣的,destroy和post方法的不同是,destroy總是會執行,哪怕出現了異常。 * */ public class ReaderDemo extends Reader { // 這個是用戶在自定義數據源中定義的key,這是一個demo數據源的key,用戶自定義的話,這個key值應該不同 public static final String DS_KEY = "demo_ds"; // 代碼內部定義的task編號,沒有使用到可以不用 public static final String TASK_INDEX = "taskIndex"; // 用戶在離線管道中配置該組件的一個參數 public static final String USER_KEY = "user_param"; /** * public的靜態子類Job必須繼承自Reader.Job */ public static class Job extends Reader.Job { private static final Logger logger = LoggerFactory.getLogger(Job.class); Configuration jobConfig; @Override public void init() { logger.info("job init"); //通過這個方法拿到用戶的輸入組件配置,這些參數就是用戶在"輸入組件"界面上配置的參數 this.jobConfig = super.getPluginJobConf(); String value = jobConfig.getString(USER_KEY, "default_value"); String ds = jobConfig.getString(DS_KEY, "default_ds"); logger.info("user_param:{} ds:{}", value, ds); } @Override public void prepare() { super.prepare(); logger.info("job prepare"); } @Override public List<Configuration> split(int i) { logger.info("job split:{}", i); return IntStream.range(0, i).boxed().map(x -> { Configuration tmpConfiguration = jobConfig.clone(); // 寫入configuration的編號 tmpConfiguration.set(TASK_INDEX, x); return tmpConfiguration; }).collect(Collectors.toList()); } @Override public void post() { super.post(); logger.info("job post"); } @Override public void destroy() { logger.info("job destroy"); } } /** * public的靜態子類Task必須繼承自Reader.Task */ public static class Task extends Reader.Task { private static final Logger logger = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private int index; private RowMeta rowMeta; @Override public void init() { // 獲取Job split出來的configuration this.taskConfig = super.getPluginJobConf(); // 獲取Task的編號 index = taskConfig.getInt(TASK_INDEX, -1); logger.info("task init:{}", index); } @Override public void prepare() { super.prepare(); logger.info("task prepare"); } @Override public void startRead(RecordSender recordSender) { logger.info("task start"); Random random = new Random(); // 讀取數據,封裝成Record,發送到系統內部 for(int i = 0; i < 10; i++) { Record record = recordSender.createRecord(); // 只是3個列,這個列的類型需要和getInputRowMeta函數的meta對其,如果是真實數據源db,需要把讀取到的數據轉換成特定的column record.addColumn(new LongColumn(i)); record.addColumn(new StringColumn("name_" + i)); record.addColumn(new DoubleColumn(random.nextDouble())); recordSender.sendToWriter(record); logger.info("read record:{}", i); } } @Override public RowMeta getInputRowMeta(){ logger.info("task column meta"); rowMeta = new RowMeta(); /** * 在這里定義輸入組件讀取到數據后,往下游寫出的數據的schema,一般而言,用戶可能需要連接到db,獲取到真實數據源的schema * 特別注意:這里的column名字必須和輸入組件配置頁面的column名字完全一樣,順序也要一樣。比如:這里定義了id、name、score * 那么,該組件的配置頁面也必須配置上一樣的列名:id、name、score */ ColumnMeta columnMeta1 = new ColumnMeta(); columnMeta1.setName("id"); columnMeta1.setType(Column.Type.LONG); rowMeta.addColumnMeta(columnMeta1); ColumnMeta columnMeta2 = new ColumnMeta(); columnMeta2.setName("name"); columnMeta2.setType(Column.Type.STRING); rowMeta.addColumnMeta(columnMeta2); ColumnMeta columnMeta3 = new ColumnMeta(); columnMeta3.setName("score"); columnMeta3.setType(Column.Type.DOUBLE); rowMeta.addColumnMeta(columnMeta3); return rowMeta; } @Override public void post() { super.post(); logger.info("task post"); } @Override public void destroy() { logger.info("task destroy"); } } }
寫入插件文件的代碼示例如下。
package demo; import com.alibaba.dt.pipeline.plugin.center.base.RecordReceiver; import com.alibaba.dt.pipeline.plugin.center.base.Writer; import com.alibaba.dt.pipeline.plugin.center.conf.Configuration; import com.alibaba.dt.pipeline.plugin.center.record.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; /** * 用戶入口類WriterDemo * 該類必須繼承自com.alibaba.dt.pipeline.plugin.center.base.Writer * 該類中暫時不需要定義方法,但是必須要定義兩個public的靜態子類:Job和Task,名字必須是Job和Task,大小寫敏感,否則系統找不到類 * * 系統會首先初始化Job類,調用init做初始化,再調用prepare做準備工作,然后調用split,把job的configuration拆分成用戶指定的并發度個數的 * configuration。然后執行task。當所有的task全部執行完畢,再執行post,最后再執行destroy。destroy和post方法的不同是,destroy總是 * 會執行,哪怕出現了異常。 * * 當系統調用Job的split方法得到很多configuration后,會用每一個configuration實例化一個Task,Task的方法依次執行的順序是:init, * prepare,startWrite,post,destroy,同樣的,destroy和post方法的不同是,destroy總是會執行,哪怕出現了異常。 * */ public class WriterDemo extends Writer { // 這個是用戶在自定義數據源中定義的key,這是一個demo數據源的key,用戶自定義的話,這個key值應該不同 public static final String DS_KEY = "demo_ds"; // 代碼內部定義的task編號,沒有使用到可以不用 public static final String TASK_INDEX = "taskIndex"; // 用戶在離線管道中配置該組件的一個參數 public static final String USER_KEY = "user_param"; /** * public的靜態子類Job必須繼承自Writer.Job */ public static class Job extends Writer.Job { private static final Logger logger = LoggerFactory.getLogger(Job.class); Configuration jobConfig; @Override public void init() { logger.info("job init"); //通過這個方法拿到用戶的輸出組件配置,這些參數就是用戶在輸出組件界面上配置的參數 this.jobConfig = super.getPluginJobConf(); String value = jobConfig.getString(USER_KEY, "default_value"); String ds = jobConfig.getString(DS_KEY, "default_ds"); logger.info("user_param:{} ds:{}", value, ds); } @Override public void prepare() { super.prepare(); logger.info("job prepare"); } @Override public List<Configuration> split(int i) { logger.info("job split:{}", i); return IntStream.range(0, i).boxed().map(x -> { Configuration tmpConfiguration = jobConfig.clone(); // 寫入configuration的編號 tmpConfiguration.set(TASK_INDEX, x); return tmpConfiguration; }).collect(Collectors.toList()); } @Override public void post() { super.post(); logger.info("job post"); } @Override public void destroy() { logger.info("job destroy"); } } /** * public的靜態子類Task必須繼承自Writer.Task */ public static class Task extends Writer.Task { private static final Logger logger = LoggerFactory.getLogger(Task.class); private Configuration taskConfig; private int index; @Override public void init() { // 獲取Job split出來的configuration this.taskConfig = super.getPluginJobConf(); // 獲取Task的編號 index = taskConfig.getInt(TASK_INDEX, -1); logger.info("task init:{}", index); } @Override public void prepare() { super.prepare(); logger.info("task prepare"); } @Override public void startWrite(RecordReceiver recordReceiver) { logger.info("task start"); Record record; while ((record = recordReceiver.getFromReader()) != null) { logger.info("======: " + record.toString()); } } @Override public void post() { super.post(); logger.info("task post"); } @Override public void destroy() { logger.info("task destroy"); } } }
步驟二:創建其他數據庫類型的離線自定義源類型
在Dataphin首頁,單擊頂部菜單欄管理中心 > 數據源管理。
在左側導航欄單擊自定義源類型。
在自定義源類型頁面中,單擊新建自定義源類型,下拉列表中選擇新建離線自定義源類型。
在新建離線自定義源類型頁面,配置參數。
參數
描述
基本配置
類型
選擇其他數據庫。
名稱
自定義組件的名稱。
支持中文、英文字母大小寫、下劃線(_)和數字。長度不超過64個字符。
類型編碼
組件的唯一標識。供后端使用,創建后不可編輯。
僅支持英文字母大小寫、數字和下劃線(_),且不能以數字開頭。
數據源JSON
填寫數據源JSON代碼及上傳讀寫插件:
填寫數據源JSON代碼,即定義數據源的配置項,代碼示例說明如下:
[ { "columnName": "url", "columnType": "NORMAL", "text": { "zh_CN": "鏈接地址", "en_US": "address", "zh_TW": "繁體" }, "placeholder": { "zh_CN": "請輸入鏈接地址", "en_US": "input address", "zh_TW": "繁體" } }, { "columnName": "username", "columnType": "NORMAL", "text": { "zh_CN": "用戶名", "en_US": "username", "zh_TW": "繁體" }, "placeholder": { "zh_CN": "請輸入用戶名", "en_US": "input username", "zh_TW": "繁體" } }, { "columnName": "password", "columnType": "ENCRYPT", "text": { "zh_CN": "密碼", "en_US": "password", "zh_TW": "繁體" }, "placeholder": { "zh_CN": "請輸入密碼", "en_US": "input password", "zh_TW": "繁體" } } ]
上述JSON示例中字段說明如下:
columnName:參數名稱。
columnType:參數類型分為NORMAL和ENCRYPT。密碼類的參數,參數類型需使用ENCRYPT。
placeholder:用戶輸入框中的默認值在三種語言情況下的輸出。若不填寫,則用戶輸入框為空。
text:參數名稱在三種語言情況下的輸出。
資源配置
讀寫插件
選擇需要配置的插件類型、填寫對應的ClassName(插件類名)并上傳步驟一中準備好的插件文件。插件文件僅支持.jar類型,文件大小不超過50MB。
說明讀寫插件需至少選擇一項進行配置。完成讀取插件和寫入插件的配置后,會生成對應的輸入插件和輸出插件。
描述信息
描述
對自定義的數據源的簡單描述。不超過128個字符。
單擊創建。