本文介紹如何通過Lindorm計算引擎訪問Kafka數據。您可以將Kafka實例中的數據加載到Lindorm計算引擎中,結合Lindorm計算引擎的其他數據源數據,完成復雜的數據生產作業。
前提條件
操作步驟
啟動Beeline,并訪問Lindorm計算引擎服務。具體操作,請參見使用Beeline訪問JDBC服務。
登錄云消息隊列 Kafka 版控制臺,獲取訪問Kafka數據的連接信息。
在實例詳情頁面的接入點信息區域,獲取Kafka實例的域名接入點。
在Topic管理頁面,獲取Kafka實例的Topic名稱。
在Lindorm計算引擎中創建Kafka Topic對應的Hive臨時表。
CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);
USING kafka
中Kafka為指定數據源。Kafka表無需指定Schema,Lindorm計算引擎提供默認的Kafka Schema,不可修改,Schema信息如下:列名稱
數據類型
說明
key
binary
該記錄在Kafka中的Key信息。
value
binary
該記錄在Kafka中的Value信息。
topic
string
該記錄所屬的Kafka Topic。
partition
int
該記錄所屬的Partition。
offset
bigint
該記錄在Partition的offset。
timestamp
timestamp
該記錄的時間戳。
timestampType
int
該記錄時間戳的類型:
0:CreateTime。
1:LogAppendTime。
詳細信息,請參見timestampType。
table_options
參數說明:參數
是否必選
說明
示例值
kafka.bootstrap.servers
是
Kafka實例的域名接入點。
alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092
subscribe
是
Kafka實例的Topic名稱。
topic_to_read
startingTimestamp
是
待訪問Topic數據片段開始時間的時間戳,單位為毫秒。
說明您可以使用Spark SQL unix_timestamp函數將時間轉換為UNIX時間戳。
1682406000000
endingTimestamp
是
待訪問Topic數據片段結束時間的時間戳,單位為毫秒。
說明您可以使用Spark SQL unix_timestamp函數將時間轉換為UNIX時間戳。
1682409600000
startingOffsetsByTimestampStrategy
否
如果Kafka實例中部分Partition中沒有數據,需要添加該參數。
latest
創建Kafka表支持多種類型參數,更多參數請參考:Structured Streaming + Kafka Integration Guide。
示例:
CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092", "subscribe"="topic_to_read", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
查詢Kafka表的數據。
查詢表Kafka_tbl中Schema的數據。
DESCRIBE kafka_tbl;
查詢表Kafka_tbl中的數據。
SELECT * FROM kafka_tbl LIMIT 10;
使用Spark函數提取Kafka中的數據。例如,查詢表Kafka_tbl中Value為
{"content": "kafka record"}
的數據。SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;
返回結果:
Output: kafka record
(可選)實踐:將Kafka數據導入Hive表
如果您有數據分析等相關需求,可以參考以下步驟將Kafka中的數據導入Hive表。
假設域名接入點為kafka_addr:9092,topic名稱為topic1的Kafka實例中有兩條寫入時間在2023-04-25 15:00:00至2023-04-25 16:00:00之間的數據,具體內容為:
{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}
現在需要將這兩條數據寫入Hive表中,便于后續進行數據分析。
創建Kafka源表。
CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS( "kafka.bootstrap.servers"="kafka_addr:9092", "subscribe"="topic1", "startingTimestamp"=1682406000000, "endingTimestamp"=1682409600000);
參數的詳細說明,請參見參數說明。
創建Hive目標表。
CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
將Kafka源表中所有數據解析后寫入Hive目標表中。
INSERT INTO kafka_target_tbl SELECT cast(get_json_object(cast(value as string), '$.id') as long), get_json_object(cast(value as string), '$.name') FROM kafka_src_tbl;
查詢Kafka導入Hive表中的數據。
SELECT * FROM kafka_target_tbl LIMIT 10;
返回結果:
Output: +-----+--------+ | id | name | +-----+--------+ | 1 | name1 | | 2 | name2 | +-----+--------+