實(shí)時(shí)數(shù)據(jù)集成(Flink版)
物聯(lián)網(wǎng)平臺(tái)數(shù)據(jù)服務(wù)中的產(chǎn)品屬性時(shí)序表、產(chǎn)品事件表和自定義存儲(chǔ)表(時(shí)序表)數(shù)據(jù),可以集成到阿里云實(shí)時(shí)計(jì)算Flink版中計(jì)算和分析,以便您實(shí)時(shí)分析和診斷設(shè)備的運(yùn)行狀況,實(shí)時(shí)檢測(cè)運(yùn)行故障等。本文介紹使用實(shí)時(shí)計(jì)算Flink版的連接器功能集成物聯(lián)網(wǎng)平臺(tái)實(shí)例下數(shù)據(jù)服務(wù)中數(shù)據(jù)的完成流程。
前提條件
已完成實(shí)時(shí)計(jì)算Flink版服務(wù)開通和準(zhǔn)備工作。
如果您使用RAM用戶或RAM角色等身份訪問,需要確認(rèn)已具有Flink控制臺(tái)相關(guān)權(quán)限,詳情請(qǐng)參見權(quán)限管理。
已創(chuàng)建Flink工作空間,詳情請(qǐng)參見開通實(shí)時(shí)計(jì)算Flink版。
已開啟目標(biāo)產(chǎn)品存儲(chǔ)表(產(chǎn)品屬性時(shí)序表、產(chǎn)品事件表或自定義時(shí)序存儲(chǔ)表)數(shù)據(jù)的Flink任務(wù)。具體操作,請(qǐng)參見開啟產(chǎn)品屬性時(shí)序表或事件表的Flink任務(wù)或開啟自定義存儲(chǔ)表時(shí)序數(shù)據(jù)的Flink任務(wù)。
重要如果集成產(chǎn)品屬性時(shí)序表和產(chǎn)品事件表數(shù)據(jù),需已在物聯(lián)網(wǎng)平臺(tái)的數(shù)據(jù)服務(wù)中開啟目標(biāo)產(chǎn)品的備份。具體操作,請(qǐng)參見備份設(shè)備數(shù)據(jù)源。
背景信息
阿里云實(shí)時(shí)計(jì)算Flink版是一套基于Apache Flink構(gòu)建的?站式實(shí)時(shí)大數(shù)據(jù)分析平臺(tái),提供端到端亞秒級(jí)實(shí)時(shí)數(shù)據(jù)分析能力,并通過標(biāo)準(zhǔn)SQL降低業(yè)務(wù)開發(fā)門檻,助力企業(yè)向?qū)崟r(shí)化、智能化大數(shù)據(jù)計(jì)算升級(jí)轉(zhuǎn)型。詳細(xì)內(nèi)容,請(qǐng)參見阿里云實(shí)時(shí)計(jì)算Flink版。
實(shí)時(shí)計(jì)算Flink版服務(wù)集成物聯(lián)網(wǎng)平臺(tái)的產(chǎn)品屬性時(shí)序表數(shù)據(jù)、產(chǎn)品事件表數(shù)據(jù)和自定義存儲(chǔ)表時(shí)序數(shù)據(jù),需要在Flink SQL開發(fā)作業(yè)中使用SQL連接器連接物聯(lián)網(wǎng)平臺(tái)數(shù)據(jù)服務(wù)中對(duì)應(yīng)的時(shí)序數(shù)據(jù)表。Flink全托管產(chǎn)品支持您自定義連接器后上傳使用,物聯(lián)網(wǎng)平臺(tái)提供連接器iot-source
的使用說明,請(qǐng)參見實(shí)時(shí)數(shù)據(jù)集成的連接器。
使用限制
限制項(xiàng) | 說明 |
地域 | 華東2(上海)、華南1(深圳)、華北2(北京)、美國(guó)(弗吉尼亞)。 |
物聯(lián)網(wǎng)平臺(tái)企業(yè)版實(shí)例 | 標(biāo)準(zhǔn)型、尊享型。實(shí)例類型說明,請(qǐng)參見企業(yè)版實(shí)例不同類型和功能區(qū)別。 |
集成的物聯(lián)網(wǎng)平臺(tái)數(shù)據(jù)源 |
計(jì)費(fèi)說明
數(shù)據(jù)服務(wù)中實(shí)時(shí)數(shù)據(jù)集成流出會(huì)消耗數(shù)據(jù)處理單元(CU),詳細(xì)說明,請(qǐng)參見計(jì)費(fèi)項(xiàng)說明。
實(shí)時(shí)計(jì)算Flink版服務(wù)的計(jì)費(fèi)邏輯和詳細(xì)說明,請(qǐng)參見計(jì)費(fèi)概述。
使用流程
下載連接器(IoT Connector)文件:
iot-flink-connector-source.jar
。注冊(cè)連接器(IoT Connector):在阿里云實(shí)時(shí)計(jì)算控制臺(tái)的數(shù)據(jù)連接頁面,單擊創(chuàng)建自定義連接器,上傳已下載的連接器文件
iot-flink-connector-source.jar
,單擊完成。SQL作業(yè)開發(fā):在阿里云實(shí)時(shí)計(jì)算控制臺(tái)的SQL開發(fā)中新建流作業(yè)模板,編寫作業(yè)信息,進(jìn)行更多配置和深度檢查后,完成調(diào)試和部署。
本文以物聯(lián)網(wǎng)設(shè)備上傳物模型屬性通過Flink任務(wù)流轉(zhuǎn)到log文件為例,結(jié)合Print連接器接收并打印數(shù)據(jù)輸入記錄。編寫代碼如下:
重要創(chuàng)建SQL作業(yè)時(shí),F(xiàn)link的引擎版本建議選擇vvr-6.0.6-flink-1.15。更高版本的Flink集群可能出現(xiàn)JAR包沖突。
實(shí)際場(chǎng)景中,需修改連接器
iot-source
中WITH下參數(shù)為真實(shí)值。參數(shù)配置說明,請(qǐng)參見WITH參數(shù)說明。
CREATE TEMPORARY TABLE iot_source_test { productKey STRING, deviceName STRING, body STRING } with { 'connector'='iot-source', 'regionId'='cn-shanghai', 'accessId'='xxxxxxxx', 'accessKey'='xxxxxxxx', 'tableName'='product.xxxx', 'iotInstanceId'='iot-xxxx', 'clientId'='test', 'uid' = 'xxxxxxxx' }; CREATE TEMPORARY TABLE TEST_SINK( productKey STRING, deviceName STRING, body STRING ) WITH ( 'connector' = 'print', 'logger'='true' ); INSERT INTO TEST_SINK SELECT * FROM iot_source_test;
啟動(dòng)作業(yè):部署完成后,在作業(yè)運(yùn)維中啟動(dòng)作業(yè)。
查看作業(yè)性能:在作業(yè)運(yùn)維中,單擊運(yùn)行中作業(yè)名稱,進(jìn)入作業(yè)運(yùn)維詳情頁面,查看Flink計(jì)算結(jié)果。
運(yùn)行日志可在作業(yè)探查中的運(yùn)行Task Managers頁簽下的日志列表中查看。連接器輸出日志均寫入到
flink.log
中(目前日志會(huì)打印寫入數(shù)據(jù)和錯(cuò)誤信息)。
相關(guān)文檔
Flink SQL開發(fā)參考:Flink SQL是為了簡(jiǎn)化計(jì)算模型、降低您使用Flink門檻而設(shè)計(jì)的一套符合標(biāo)準(zhǔn)SQL語義的開發(fā)語言。了解Flink SQL使用方法,可幫助您在實(shí)時(shí)計(jì)算Flink版中分析處理數(shù)據(jù)。