Lindorm流引擎提供了100%兼容Flink SQL的能力。您可以將原始數據存儲在Kafka Topic,并通過Flink SQL在流引擎中創建實時計算任務,對原始數據進行高效計算和處理。本文介紹如何使用Flink SQL提交流引擎計算任務將Kafka Topic中的數據導入至Lindorm寬表。
前提條件
注意事項
如果您的應用部署在ECS實例,且想要通過專有網絡訪問Lindorm實例,則需要確保Lindorm實例和ECS實例滿足以下條件,以保證網絡的連通性。
所在地域相同,并建議所在可用區相同(以減少網絡延時)。
ECS實例與Lindorm實例屬于同一專有網絡。
操作步驟
步驟一:數據準備
通過Kafka API將源數據寫入Kafka Topic。共支持以下兩種寫入方式:
以通過開源Kafka腳本工具寫入數據為例。
#創建Topic ./kafka-topics.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic --create #寫入數據 ./kafka-console-producer.sh --bootstrap-server <Lindorm Stream Kafka地址> --topic log_topic {"loglevel": "INFO", "thread":"thread-1", "class": "com.alibaba.stream.test", "detail":"thread-1 info detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-2", "class": "com.alibaba.stream.test", "detail":"thread-2 error detail", "timestamp": "1675840911549"} {"loglevel": "WARN", "thread":"thread-3", "class": "com.alibaba.stream.test", "detail":"thread-3 warn detail", "timestamp": "1675840911549"} {"loglevel": "ERROR", "thread":"thread-4", "class": "com.alibaba.stream.test", "detail":"thread-4 error detail", "timestamp": "1675840911549"}
Lindorm Stream Kafka地址的獲取方式請參見查看連接地址。
在寬表引擎中創建結果表,用于存儲計算任務的處理結果。
通過Lindorm-cli連接寬表引擎。如何連接,請參見通過Lindorm-cli連接并使用寬表引擎。
創建結果表
log
。CREATE TABLE IF NOT EXISTS log ( loglevel VARCHAR, thread VARCHAR, class VARCHAR, detail VARCHAR, timestamp BIGINT, primary key (loglevel, thread) );
步驟二:安裝流引擎客戶端
在ECS上執行以下命令,下載流引擎客戶端壓縮包。
wget https://hbaseuepublic.oss-cn-beijing.aliyuncs.com/lindorm-sqlline-2.0.2.tar.gz
執行以下命令,解壓壓縮包。
tar zxvf lindorm-sqlline-2.0.2.tar.gz
進入
lindorm-sqlline-2.0.2/bin
目錄,執行以下命令連接至Lindorm流引擎。./lindorm-sqlline -url <Lindorm Stream SQL地址>
Lindorm Stream SQL地址的獲取方式,請參見查看連接地址。
步驟三:在流引擎中提交計算任務
示例計算任務的具體實現如下:
創建名為log_to_lindorm的Flink Job,并在Flink Job中創建兩張表:originalData(Source表)和lindorm_log_table(Sink表),分別關聯已創建的Kafka Topic和結果表log。
創建流任務,過濾掉loglevel為ERROR的日志,將過濾結果寫入結果表log中。
具體代碼如下:
CREATE FJOB log_to_lindorm(
--Kafka Source表
CREATE TABLE originalData(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT
)WITH(
'connector'='kafka',
'topic'='log_topic',
'scan.startup.mode'='earliest-offset',
'properties.bootstrap.servers'='Lindorm Stream Kafka地址',
'format'='json'
);
--Lindorm寬表 Sink表
CREATE TABLE lindorm_log_table(
`loglevel` VARCHAR,
`thread` VARCHAR,
`class` VARCHAR,
`detail` VARCHAR,
`timestamp` BIGINT,
PRIMARY KEY (`loglevel`, `thread`) NOT ENFORCED
)WITH(
'connector'='lindorm',
'seedServer'='Lindorm寬表引擎的HBase兼容地址',
'userName'='root',
'password'='test',
'tableName'='log',
'namespace'='default'
);
--過濾Kafka中的ERROR日志,將結果寫入Lindorm寬表
INSERT INTO lindorm_log_table SELECT * FROM originalData WHERE loglevel = 'ERROR';
);
Lindorm寬表引擎的HBase兼容地址的獲取方式,請參見查看連接地址。
寬表連接器(Connector)的詳細說明,請參見配置流引擎的寬表連接器。
步驟四:查詢流引擎處理結果
支持以下兩種查詢方式:
通過Lindorm-cli連接寬表引擎并執行以下命令查詢處理結果。
SELECT * FROM log LIMIT 5;
返回結果:
+----------+----------+-------------------------+-----------------------+---------------+ | loglevel | thread | class | detail | timestamp | +----------+----------+-------------------------+-----------------------+---------------+ | ERROR | thread-2 | com.alibaba.stream.test | thread-2 error detail | 1675840911549 | | ERROR | thread-4 | com.alibaba.stream.test | thread-4 error detail | 1675840911549 | +----------+----------+-------------------------+-----------------------+---------------+
通過寬表引擎的集群管理系統查詢處理結果,具體操作請參見數據查詢。