電子圍欄
電子圍欄也被稱為電子警報(bào)系統(tǒng),多用于城市規(guī)劃、交通調(diào)度等數(shù)據(jù)量大、實(shí)時(shí)性更強(qiáng)的場(chǎng)景。如果您需要實(shí)時(shí)調(diào)度和管理車(chē)輛,可以依據(jù)本方案的操作步驟,使用Lindorm Ganos時(shí)空函數(shù)計(jì)算和判斷車(chē)輛實(shí)時(shí)位置和地理圍欄的關(guān)系,實(shí)現(xiàn)對(duì)車(chē)輛的精確監(jiān)控。
背景信息
在車(chē)輛管理場(chǎng)景中,車(chē)輛電子圍欄可以用于監(jiān)控車(chē)輛的行駛范圍、判斷車(chē)輛是否偏離預(yù)定路線等,幫助管理人員更好地進(jìn)行車(chē)輛調(diào)度和管理。
通常管理人員會(huì)預(yù)先設(shè)置好若干個(gè)區(qū)域,簡(jiǎn)稱地理圍欄,由于該區(qū)域不是實(shí)時(shí)變化的,可以使用外表的形式存儲(chǔ)到Lindorm寬表中。而車(chē)輛的實(shí)時(shí)數(shù)據(jù)(坐標(biāo)信息)是實(shí)時(shí)上傳的,可以將車(chē)輛的實(shí)時(shí)數(shù)據(jù)存儲(chǔ)到Kafka中,再由Lindorm流引擎通過(guò)訂閱Kafka中的實(shí)時(shí)數(shù)據(jù),實(shí)時(shí)計(jì)算多個(gè)車(chē)輛位置和地理圍欄之間的關(guān)系。
前提條件
已開(kāi)通Lindorm Ganos時(shí)空服務(wù)。如何開(kāi)通,請(qǐng)參見(jiàn)開(kāi)通時(shí)空服務(wù)(免費(fèi))。
已開(kāi)通Lindorm流引擎。
說(shuō)明請(qǐng)加入釘釘群(群號(hào):78080001631),申請(qǐng)流引擎開(kāi)通權(quán)限。
已將客戶端IP地址添加至Lindorm白名單。如何添加,請(qǐng)參見(jiàn)設(shè)置白名單。
已安裝Java環(huán)境,要求安裝JDK 1.8及以上版本。
注意事項(xiàng)
本實(shí)踐將Lindorm客戶端部署在ECS實(shí)例上,并通過(guò)專(zhuān)有網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)訪問(wèn)。
通過(guò)專(zhuān)有網(wǎng)絡(luò)訪問(wèn)Lindorm實(shí)例前,需要確保您的Lindorm實(shí)例和ECS實(shí)例滿足以下條件,以保證網(wǎng)絡(luò)的連通性。
所在地域相同,并建議所在可用區(qū)相同(以減少網(wǎng)絡(luò)延時(shí))。
ECS實(shí)例與Lindorm實(shí)例屬于同一專(zhuān)有網(wǎng)絡(luò)。
步驟一:創(chuàng)建地理圍欄表和結(jié)果表
在寬表引擎中創(chuàng)建地理圍欄表和結(jié)果表,分別用于保存地理圍欄數(shù)據(jù)和計(jì)算結(jié)果。
創(chuàng)建地理圍欄表并插入示例數(shù)據(jù)。
創(chuàng)建地理圍欄表
regions
,包含三個(gè)字段:rID(區(qū)域ID)、rName(區(qū)域名稱)、fence(地理圍欄范圍)。CREATE TABLE regions(rID INT, rName VARCHAR, fence GEOMETRY, PRIMARY KEY(rID));
插入地理圍欄數(shù)據(jù),包括3個(gè)區(qū)域,分別命名為SoHo、Chinatown和Tribeca。
INSERT INTO regions(rID, rName, fence) VALUES (1, 'SoHo', ST_GeomFromText('POLYGON((-74.00279525078275 40.72833625216264,-74.00547745979765 40.721929158663244,-74.00125029839018 40.71893680218994,-73.9957785919998 40.72521409075776,-73.9972377137039 40.72557184584898,-74.00279525078275 40.72833625216264))')), (2, 'Chinatown', ST_GeomFromText('POLYGON((-73.99712367114876 40.71281582267133,-73.9901070123658 40.71336881907936,-73.99023575839851 40.71452359088633,-73.98976368961189 40.71554823078944,-73.99551434573982 40.717337246783735,-73.99480624255989 40.718491949759304,-73.99652285632942 40.719109951574,-73.99776740131233 40.7168005470334,-73.99903340396736 40.71727219249899,-74.00193018970344 40.71938642421256,-74.00409741458748 40.71688186545551,-74.00051398334358 40.71517415773184,-74.0004281526551 40.714377212470005,-73.99849696216438 40.713450141693166,-73.99748845157478 40.71405192594819,-73.99712367114876 40.71281582267133))')), (3, 'Tribeca', ST_GeomFromText('POLYGON((-74.01091641815208 40.72583120006787,-74.01338405044578 40.71436586362705,-74.01370591552757 40.713617702123415,-74.00862044723533 40.711308107057235,-74.00194711120628 40.7194238654018,-74.01091641815208 40.72583120006787))'));
創(chuàng)建結(jié)果表
fresult
。CREATE TABLE fresult(uID VARCHAR, rName VARCHAR, rID INT, PRIMARY KEY (uID));
步驟二:寫(xiě)入流數(shù)據(jù)
您可以通過(guò)開(kāi)源客戶端或腳本工具連接Lindorm流引擎并寫(xiě)入測(cè)試數(shù)據(jù)。
以通過(guò)開(kāi)源腳本工具寫(xiě)入為例。
下載并安裝腳本工具。具體操作,請(qǐng)參見(jiàn)通過(guò)開(kāi)源腳本工具連接Lindorm流引擎。
創(chuàng)建名為
log_topic
的Topic。./bin/kafka-topics.sh --bootstrap-server <Lindorm Stream連接地址> --topic log_topic --create
Lindorm Stream連接地址獲取方式如下:
將測(cè)試數(shù)據(jù)寫(xiě)入Topic中,使用組合鍵
Ctrl+C
可終止寫(xiě)入。./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream連接地址> --topic log_topic {"uID": "A", "x":"-74.00035", "y": "40.72432"} {"uID": "B", "x":"-74.00239", "y": "40.71692"} {"uID": "C", "x":"-74.00201", "y": "40.72563"} {"uID": "D", "x":"-74.00158", "y": "40.72412"} {"uID": "E", "x":"-73.99836", "y": "40.71588"} {"uID": "F", "x":"-74.01015", "y": "40.71422"} {"uID": "G", "x":"-73.99183", "y": "40.71451"} {"uID": "H", "x":"-73.99595", "y": "40.71773"}
您可以使用
./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream連接地址> --topic log_topic --from-beginning
命令,查看數(shù)據(jù)是否成功寫(xiě)入。
步驟三:提交計(jì)算任務(wù)
使用Flink SQL提交計(jì)算任務(wù),讀取Topic中的數(shù)據(jù),并結(jié)合地理圍欄數(shù)據(jù)進(jìn)行計(jì)算。
安裝流引擎客戶端。
在ECS上執(zhí)行以下命令,下載流引擎客戶端壓縮包。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
執(zhí)行以下命令,解壓壓縮包。
tar zxvf lindorm-sqlline-2.0.2.tar.gz
進(jìn)入
lindorm-sqlline-2.0.2/bin
目錄,執(zhí)行以下命令連接至Lindorm流引擎。./lindorm-sqlline -url <Lindorm Stream SQL地址>
提交計(jì)算任務(wù)。
計(jì)算任務(wù)具體步驟如下:
加載
ganos
函數(shù)模塊。在Flink Job中創(chuàng)建三張表:數(shù)據(jù)源表
carData
、數(shù)據(jù)維表regions
、數(shù)據(jù)結(jié)果表fresult
,通過(guò)設(shè)置連接器參數(shù),分別關(guān)聯(lián)已創(chuàng)建的Topic、地理圍欄表regions
和結(jié)果表fresult
。創(chuàng)建流任務(wù),使用關(guān)系函數(shù)
ST_Contains
進(jìn)行過(guò)濾,計(jì)算實(shí)時(shí)數(shù)據(jù)所屬的地理圍欄區(qū)域,并將計(jì)算結(jié)果寫(xiě)入結(jié)果表fresult
。
示例代碼如下:
CREATE FJOB fenceFilter ( LOAD MODULE ganos; -- Enable parallelism SET 'parallelism.default'='12'; -- Create stream table CREATE TABLE carData(`uID` STRING, `x` DOUBLE, `y` DOUBLE, `proctime` AS PROCTIME() ) WITH ( 'connector'='kafka', 'topic'='log_topic', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='<Lindorm Stream連接地址>', 'format'='json' ); -- Create area table CREATE TABLE regions ( `rID` INT, `rName` STRING, `fence` GEOMETRY, PRIMARY KEY (`rID`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='<Lindorm寬表HBase Java API訪問(wèn)地址>', 'userName'='<寬表引擎用戶名>', 'password'='<寬表引擎密碼>', 'tableName'='regions', 'namespace'='default' ); -- Create result table CREATE TABLE fresult ( `uID` STRING, `rName` STRING, `rID` INT, PRIMARY KEY (`uID`) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='<Lindorm寬表HBase Java API訪問(wèn)地址>', 'userName'='<寬表引擎用戶名>', 'password'='<寬表引擎密碼>', 'tableName'='fresult', 'namespace'='default' ); -- Find the area that car located INSERT INTO fresult SELECT A.uID, B.rName, B.rID FROM carData AS A JOIN regions /*+ OPTIONS('geomHint'='fence:st_contains','geomIndex'='true','cacheTTLMs'='1800000') */ FOR SYSTEM_TIME AS OF A.proctime AS B ON B.fence=ST_MakePoint(A.x,A.y); );
其中,Lindorm寬表HBase Java API訪問(wèn)地址、寬表引擎用戶名及密碼的獲取方式如下:
步驟四:查看結(jié)果
執(zhí)行以下語(yǔ)句,查看計(jì)算結(jié)果。
SELECT * FROM fresult;
返回結(jié)果:
+-----+-----------+-----+ | uID | rName | rID | +-----+-----------+-----+ | A | SoHo | 1 | | B | Chinatown | 2 | | C | SoHo | 1 | | D | SoHo | 1 | | E | Chinatown | 2 | | F | Tribeca | 3 | | G | Chinatown | 2 | | H | Chinatown | 2 | +-----+-----------+-----+
返回結(jié)果顯示各個(gè)車(chē)輛當(dāng)前屬于哪個(gè)區(qū)域,例如車(chē)輛A目前位于區(qū)域1,即SoHo。