盲區(qū)會(huì)車(chē)
在導(dǎo)航等場(chǎng)景中,由于某些路口彎道過(guò)大或存在盲區(qū),無(wú)法看到對(duì)面車(chē)道的車(chē)輛情況,存在事故風(fēng)險(xiǎn)。為了更好地規(guī)避交通事故,您可以使用Lindorm寬表引擎、Lindorm流引擎(實(shí)時(shí)計(jì)算),結(jié)合Apache Kafka,預(yù)先設(shè)置好盲區(qū)(地理圍欄),在檢測(cè)到其他車(chē)輛時(shí)及時(shí)向司機(jī)發(fā)出提醒。
場(chǎng)景描述
在盲區(qū)會(huì)車(chē)場(chǎng)景中:
可以將盲區(qū)、彎道存儲(chǔ)為地理圍欄(使用Polygon類(lèi)型的數(shù)據(jù)),地理圍欄數(shù)據(jù)一般不會(huì)頻繁更改。
車(chē)輛位置信息是實(shí)時(shí)上傳的,通常會(huì)按照指定的時(shí)間間隔更新當(dāng)前位置信息,這類(lèi)數(shù)據(jù)變化較頻繁。
示例:
當(dāng)編號(hào)為car10的車(chē)輛進(jìn)入指定的盲區(qū)時(shí),如果此時(shí)對(duì)面有其他車(chē)輛駛?cè)耄ū热鏲ar11),則會(huì)向car10的司機(jī)發(fā)出提醒。
技術(shù)實(shí)現(xiàn)
盲區(qū)會(huì)車(chē)涉及到地理圍欄數(shù)據(jù)(盲區(qū))、車(chē)輛實(shí)時(shí)位置數(shù)據(jù)以及最終的計(jì)算結(jié)果數(shù)據(jù)。
地理圍欄數(shù)據(jù)變化頻率不大,可將其存儲(chǔ)于Lindorm寬表引擎中。在計(jì)算過(guò)程中,流引擎會(huì)從寬表引擎中讀取這些數(shù)據(jù)。
車(chē)輛位置信息會(huì)實(shí)時(shí)上傳,采用Apache Kafka來(lái)收集車(chē)輛上傳的信息,并將其接入Lindorm流引擎參與計(jì)算。
最終的計(jì)算結(jié)果存儲(chǔ)到Lindorm寬表引擎中。
前提條件
已創(chuàng)建Lindorm實(shí)例并開(kāi)通寬表引擎和流引擎,詳情請(qǐng)參見(jiàn)創(chuàng)建實(shí)例。
已開(kāi)通Lindorm Ganos服務(wù),詳情請(qǐng)參見(jiàn)開(kāi)通時(shí)空服務(wù)(免費(fèi))。
已創(chuàng)建操作系統(tǒng)為L(zhǎng)inux的ECS實(shí)例,詳情請(qǐng)參見(jiàn)創(chuàng)建ECS實(shí)例。
說(shuō)明ECS實(shí)例需要與Lindorm實(shí)例所在地域相同,且屬于同一專(zhuān)有網(wǎng)絡(luò)。
ECS的操作系統(tǒng)鏡像建議使用Alibaba Cloud Linux。
步驟一:創(chuàng)建地理圍欄表和結(jié)果表
在寬表引擎中,創(chuàng)建地理圍欄表并插入示例數(shù)據(jù)。
創(chuàng)建地理圍欄表
area
。CREATE TABLE area(id INT, name VARCHAR, poly GEOMETRY, PRIMARY KEY(id));
說(shuō)明地理圍欄表中的
poly
列用于存儲(chǔ)地理圍欄數(shù)據(jù)。插入示例數(shù)據(jù)。
INSERT INTO area(id,name,poly) VALUES (1,'cross1',ST_GeomFromText('POLYGON ((116.4234 39.9147, 116.4234 39.9337, 116.4329 39.9337, 116.4329 39.9147, 116.4234 39.9147))')), (2,'cross2',ST_GeomFromText('POLYGON ((116.3979 39.9035, 116.3978 39.8938, 116.4184 39.8938, 116.4184 39.9035, 116.3979 39.9035))'));
創(chuàng)建結(jié)果表。
針對(duì)盲區(qū)會(huì)車(chē)場(chǎng)景,創(chuàng)建結(jié)果表
warns
:CREATE TABLE warns(cid INT, ts TIMESTAMP, alert BOOLEAN, PRIMARY KEY (cid,ts));
說(shuō)明結(jié)果表中,
cid
表示車(chē)輛編號(hào);ts
表示收到通知的時(shí)間;alter
表示通知結(jié)果,取值為true時(shí)表示前方對(duì)面車(chē)道有車(chē)輛。
步驟二:接入流數(shù)據(jù)
本示例將通過(guò)開(kāi)源Kafka腳本工具連接Lindorm流引擎。
連接Lindorm流引擎,并創(chuàng)建名為
logVehicle
的Topic。詳情請(qǐng)參見(jiàn)通過(guò)開(kāi)源Kafka腳本工具連接Lindorm流引擎。執(zhí)行如下命令,將示例數(shù)據(jù)寫(xiě)入到已創(chuàng)建的Topic中。
./bin/kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle < testcar.txt
說(shuō)明testcar.txt
為本文提供的示例數(shù)據(jù),您需要將其下載并上傳到開(kāi)源Kafka腳本工具的根目錄下。您可以使用
./bin/kafka-console-consumer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic logVehicle --from-beginning
命令,查看數(shù)據(jù)是否成功寫(xiě)入。
步驟三:提交流引擎計(jì)算任務(wù)
使用Flink SQL提交Lindorm流引擎計(jì)算任務(wù),讀取開(kāi)源Kafka topic中的數(shù)據(jù)并結(jié)合寬表數(shù)據(jù)做計(jì)算。
連接到Lindorm流引擎,詳情請(qǐng)參見(jiàn)步驟二:安裝流引擎客戶端。
創(chuàng)建盲區(qū)會(huì)車(chē)計(jì)算任務(wù)。
在如下的示例任務(wù)中,判斷編號(hào)為10的車(chē)輛進(jìn)入指定的盲區(qū)時(shí),盲區(qū)中是否有其他車(chē)輛。任務(wù)流程如下:
加載
ganos
函數(shù)模塊。創(chuàng)建數(shù)據(jù)源表
drive
、數(shù)據(jù)維表area
、結(jié)果表warns
。創(chuàng)建流任務(wù),使用
count
函數(shù)及geomHint
函數(shù)過(guò)濾數(shù)據(jù),判斷編號(hào)為10的車(chē)輛進(jìn)入指定的盲區(qū)時(shí),盲區(qū)中是否有其他車(chē)輛。
重要請(qǐng)將示例程序中的參數(shù)值替換為實(shí)際的值,您可以在Lindorm實(shí)例的數(shù)據(jù)庫(kù)連接中查看參數(shù)值,詳情請(qǐng)參見(jiàn)查看連接地址。
seedServer
:Lindorm寬表引擎SQL地址的專(zhuān)有網(wǎng)絡(luò)地址,并將端口改為30020。格式為ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020
。password
:Lindorm寬表引擎的默認(rèn)密碼。properties.bootstrap.servers
:Kafka的連接地址,請(qǐng)使用流引擎的Lindorm Stream Kafka專(zhuān)有網(wǎng)絡(luò)地址。格式為ld-****-proxy-stream.lindorm.rds.aliyuncs.com:30080
。
示例程序:
CREATE FJOB warnCar1( LOAD MODULE ganos; -- create danger area table CREATE TABLE area( id INT, name VARCHAR, poly GEOMETRY, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020', 'userName'='root', 'password'='your_password', 'tableName'='area', 'namespace'='default' ); -- create stream table CREATE TABLE drive( cid INT, ts TIMESTAMP(0), lng DOUBLE, lat DOUBLE, pt AS proctime(), WATERMARK FOR ts AS ts ) WITH ( 'connector'='kafka', 'topic'='logVehicle', 'scan.startup.mode'='earliest-offset', 'properties.bootstrap.servers'='ld-****-proxy-stream.lindorm.rds.aliyuncs.com:30080', 'format'='json' ); -- create result table CREATE TABLE warns( cid INT, ts TIMESTAMP(0), alert BOOLEAN, PRIMARY KEY(cid,ts) NOT ENFORCED ) WITH ( 'connector'='lindorm', 'seedServer'='ld-****-proxy-lindorm.lindorm.rds.aliyuncs.com:30020', 'userName'='root', 'password'='your_password', 'tableName'='warns', 'namespace'='default' ); -- warns car 10 if there's other car enter the danger zone INSERT INTO warns SELECT 10 AS cid, A.ts AS ts, IF(count(DISTINCT A.cid)>=1,TRUE,FALSE) AS alert FROM drive AS A JOIN area /*+ OPTIONS('geomHint'='poly:st_contains','geomIndex'='true','cacheTTLMs'='180000') */ FOR SYSTEM_TIME AS OF A.pt AS B ON B.poly=ST_MakePoint(A.lng, A.lat) WHERE A.cid<>10 AND A.lng IS NOT NULL AND A.lat IS NOT NULL GROUP BY A.ts; );
步驟四:查看結(jié)果
在Lindorm寬表中,通過(guò)SQL語(yǔ)句查看對(duì)應(yīng)的結(jié)果。
查詢結(jié)果表
warns
表中的數(shù)據(jù)。SELECT * FROM warns;
返回結(jié)果如下,編號(hào)為10的車(chē)輛分別在下列時(shí)間點(diǎn)收到了會(huì)車(chē)警告。
+-----+-------------------------------+-------+ | cid | ts | alert | +-----+-------------------------------+-------+ | 10 | 2008-02-02 21:33:23 +0000 UTC | true | | 10 | 2008-02-02 21:33:33 +0000 UTC | true | | 10 | 2008-02-02 21:39:53 +0000 UTC | true | | 10 | 2008-02-02 21:41:03 +0000 UTC | true | | 10 | 2008-02-02 21:41:13 +0000 UTC | true | | 10 | 2008-02-02 21:41:23 +0000 UTC | true | | 10 | 2008-02-02 21:43:43 +0000 UTC | true | +-----+-------------------------------+-------+