自定義實(shí)時(shí)數(shù)據(jù)源進(jìn)行實(shí)時(shí)開發(fā)示例
本文通過示例的方式為您介紹在Dataphin中如何使用自定義實(shí)時(shí)數(shù)據(jù)源進(jìn)行實(shí)時(shí)開發(fā)。
操作步驟
步驟一:創(chuàng)建實(shí)時(shí)數(shù)據(jù)源
在Dataphin首頁的頂部菜單欄中,選擇管理中心 > 數(shù)據(jù)源管理。
在左側(cè)導(dǎo)航欄中,選擇數(shù)據(jù)源 > 自定義源類型。
在自定義源類型頁面中,單擊新建自定義源類型,選擇新建實(shí)時(shí)自定義源類型,打開新建實(shí)時(shí)自定義源類型頁面。
在新建實(shí)時(shí)自定義源類型頁面,配置以下參數(shù)。
參數(shù)
模式
類型名稱
填寫Oracle_cdc_test。
類型編碼
填寫Oracle_cdc_test。
JAR包
上傳數(shù)據(jù)源類型連接器的JAR包。本案例以社區(qū)版本的Oracle CDC Connector為例。JAR包下載,請(qǐng)參見Oracle | Apache Flink CDC。
配置文件
填寫以下配置文件:
說明若使用其他JAR包,請(qǐng)進(jìn)行對(duì)應(yīng)修改。
#delclare the use type of this connector kind: source # the name of this connector # if you use vvp flink,you also need to follow this match pattern [a-z0-9]([-a-z0-9\.]*[a-z0-9])? connector: oracle-cdc # for example, hostname is one property of this connector, you can see it in with clause # when isSensitive is true, usually used for password, it is mean that the value of your key will be encrypted in page # defaultValue is the default value of this key # when isRequired is true, you must set value to the key, and the key in page is marked begin with start * # when module is datasource it is mean that you can set the config in datasource page # when module is table it is mean that you can set the config in table page hostname: isSensitive: false isRequired: true module: datasource port: isSensitive: false isRequired: true module: datasource username: isSensitive: false isRequired: true module: datasource password: isSensitive: true isRequired: true module: datasource database-name: isSensitive: false isRequired: true module: table schema-name: isSensitive: false isRequired: true module: table table-name: isSensitive: false isRequired: true module: table table-name: isSensitive: false isRequired: true defaultValue: latest-offset module: table #format define the input or sink format of the data, it always with module table
配置項(xiàng)說明:
配置文件的配置項(xiàng)參數(shù),在系統(tǒng)執(zhí)行時(shí),最終會(huì)變成Flink DDL中的WITH參數(shù)。如下所示:
CREATE TABLE MyUserTable ( ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10, 3), PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'oracle-cdc', 'hostname' = 'localhost', 'port' = '1521', 'username' = 'Dataphin', 'password' = 'fli*****ssword', 'database-name' = 'ORCLCDB', 'schema-name' = 'dataphin', 'table-name' = 'myusertable');
描述
填寫自定義實(shí)時(shí)數(shù)據(jù)源測(cè)試。
單擊確定,完成自定義實(shí)時(shí)數(shù)據(jù)源類型。
步驟二:自定義Ververica Flink Connector
若您的Dataphin租戶使用開源Flink實(shí)時(shí)計(jì)算引擎,則無需進(jìn)行此步驟操作。更多信息,請(qǐng)參見自定義Ververica Flink Connector說明。
單擊目標(biāo)工作空間操作列下的控制臺(tái)。
在左側(cè)導(dǎo)航欄,單擊連接器。
在連接器頁面,單擊創(chuàng)建自定義連接器。
上傳自定義連接器JAR文件。
您可以通過以下任何一種方式上傳自定義連接器JAR文件:
上傳文件:?jiǎn)螕?b data-tag="uicontrol" id="uicontrol-fuu-rqg-vyu" class="uicontrol">選擇文件后,選擇您的目標(biāo)連接器JAR文件。
外部URL:當(dāng)需要使用其他服務(wù)上存在的JAR文件時(shí),可以使用外部URL功能獲取JAR文件。例如,
https://ossbucket/artifacts/namespaces/flink-default/flink-jobs-1.0-SNAPSHOT.jar
說明僅支持以下兩類外部URL:
開通Flink工作空間時(shí)選擇的OSS Bucket地址。您可以在實(shí)時(shí)計(jì)算管理控制臺(tái)目標(biāo)工作空間詳情中查看綁定的OSS Bucket。
實(shí)時(shí)計(jì)算Flink版可以訪問且被允許訪問(公共讀或被授予權(quán)限)的其他外部存儲(chǔ)系統(tǒng)地址。
上傳完成后,單擊下一步。
系統(tǒng)會(huì)對(duì)您上傳的自定義連接器內(nèi)容進(jìn)行解析。如果解析成功,您可以繼續(xù)下一步。如果解析失敗,請(qǐng)確認(rèn)您上傳的自定義連接器代碼是否符合Flink社區(qū)標(biāo)準(zhǔn)。
單擊完成。
創(chuàng)建完成的自定義連接器會(huì)出現(xiàn)在連接器列表中。
說明WITH參數(shù)中的連接器取值為您自定義連接器JAR包中DynamicTableFactory的identifier參數(shù)取值,其他WITH參數(shù)及含義詳情由您開發(fā)的自定義連接器決定。
步驟三:創(chuàng)建自定義的實(shí)時(shí)數(shù)據(jù)源
完成實(shí)時(shí)數(shù)據(jù)源的創(chuàng)建后,Dataphin數(shù)據(jù)源管理將會(huì)生成自定義的實(shí)時(shí)數(shù)據(jù)源。您可以在數(shù)據(jù)源管理配置實(shí)時(shí)數(shù)據(jù)源的連接信息,將數(shù)據(jù)源接入Dataphin,以支持后續(xù)的元表創(chuàng)建。
在Dataphin首頁的頂部菜單欄中,選擇管理中心 > 數(shù)據(jù)源管理。
在數(shù)據(jù)源頁面,單擊+新建數(shù)據(jù)源,打開新建數(shù)據(jù)源對(duì)話框。
在新建數(shù)據(jù)源對(duì)話框的自定義數(shù)據(jù)源區(qū)域,選擇Oracle_cdc_test。
在新建Oracle-cdc-test數(shù)據(jù)源對(duì)話框中,配置連接數(shù)據(jù)源參數(shù)。
自定義實(shí)時(shí)數(shù)據(jù)源的配置項(xiàng)由步驟一中的YAML配置文件決定,YAML配置文件的不同配置參數(shù),會(huì)導(dǎo)致數(shù)據(jù)源界面出現(xiàn)不同的選項(xiàng),并且選項(xiàng)的展示形式也有所不同。
參數(shù)
描述
數(shù)據(jù)源名稱
填寫Oracle-cdc-test。
數(shù)據(jù)源編碼
填寫Oracle_cdc_test。
數(shù)據(jù)源描述
填寫自定義實(shí)時(shí)數(shù)據(jù)源接入測(cè)試。
數(shù)據(jù)源配置
選擇生產(chǎn)數(shù)據(jù)源。
標(biāo)簽
默認(rèn)為空。
hostname
填寫Oracle服務(wù)的主機(jī)地址。
port
填寫Oracle服務(wù)的端口。
username
填寫Oracle服務(wù)的用戶名。
password
填寫Oracle服務(wù)的密碼。
單擊確定,完成自定義數(shù)據(jù)源Oracle-cdc-test的創(chuàng)建。
說明自定義Connector(實(shí)時(shí)數(shù)據(jù)源)不支持連通性檢查。
步驟四:新建自定義數(shù)據(jù)源的元表
在數(shù)據(jù)源管理配置自定義數(shù)據(jù)源的連接信息后,您可以使用配置的數(shù)據(jù)源創(chuàng)建元表以支持后續(xù)的實(shí)時(shí)開發(fā)。
在Dataphin首頁的頂部菜單欄中,選擇研發(fā) > 數(shù)據(jù)開發(fā)。
在頂部菜單欄選擇項(xiàng)目(Dev-Prod 模式需要選擇環(huán)境)。
在左側(cè)導(dǎo)航欄中選擇數(shù)據(jù)處理 > 表管理。
在右側(cè)表管理列表中,單擊新建圖標(biāo),選擇實(shí)時(shí)計(jì)算表,打開新建表對(duì)話框。
在新建表對(duì)話框,配置參數(shù)。
元表中的配置項(xiàng)由步驟一中的YAML配置文件決定,YAML配置文件的不同配置參數(shù),會(huì)導(dǎo)致元表出現(xiàn)不同的選項(xiàng),并且選項(xiàng)的展示形式也有不同。
參數(shù)
描述
表類型
選擇元表。
元表名稱
填寫Oracle_cdc_test。
數(shù)據(jù)源
選擇創(chuàng)建的Oracle-cdc-test。
database-name
填寫數(shù)據(jù)庫名稱。
schema-name
填寫schema。
table-name
填寫數(shù)據(jù)表名稱。
選擇目錄
默認(rèn)實(shí)時(shí)計(jì)算表。
描述
填寫自定義實(shí)時(shí)數(shù)據(jù)源的元表測(cè)試。
單擊確定,即可完成元表的創(chuàng)建。
完成元表創(chuàng)建后,Dataphin會(huì)自動(dòng)組裝元表的Flink SQL。代碼中的WITH參數(shù)的參數(shù)名是在YAML配置中定義的;具體的參數(shù)值,是在數(shù)據(jù)源頁面和元表頁面配置的;代碼中的字段是在元表結(jié)構(gòu)配置的。最終生成元表的Oracle_cdc_test代碼如下:
create table oracle_cdc_test ( `id` INT comment '', `name` VARCHAR comment '', PRIMARY KEY(id) NOT ENFORCED ) with ( 'hostname'='47.***.***.217' ,'connector'='oracle-cdc' ,'port'='1511' ,'database-name'='dataphin' ,'schema-name'='dataphin' ,'table-name'='dataphin-tables' ,'username'='flink_demo' );