日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

訪問Kafka數據

本文介紹如何通過Lindorm計算引擎訪問Kafka數據。您可以將Kafka實例中的數據加載到Lindorm計算引擎中,結合Lindorm計算引擎的其他數據源數據,完成復雜的數據生產作業。

前提條件

操作步驟

  1. 啟動Beeline,并訪問Lindorm計算引擎服務。具體操作,請參見使用Beeline訪問JDBC服務

  2. 登錄云消息隊列 Kafka 版控制臺,獲取訪問Kafka數據的連接信息。

    1. 實例詳情頁面的接入點信息區域,獲取Kafka實例的域名接入點。

    2. Topic管理頁面,獲取Kafka實例的Topic名稱。

  3. 在Lindorm計算引擎中創建Kafka Topic對應的Hive臨時表。

    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(table_options);

    USING kafka中Kafka為指定數據源。Kafka表無需指定Schema,Lindorm計算引擎提供默認的Kafka Schema,不可修改,Schema信息如下:

    列名稱

    數據類型

    說明

    key

    binary

    該記錄在Kafka中的Key信息。

    value

    binary

    該記錄在Kafka中的Value信息。

    topic

    string

    該記錄所屬的Kafka Topic。

    partition

    int

    該記錄所屬的Partition。

    offset

    bigint

    該記錄在Partition的offset。

    timestamp

    timestamp

    該記錄的時間戳。

    timestampType

    int

    該記錄時間戳的類型:

    • 0:CreateTime。

    • 1:LogAppendTime。

    詳細信息,請參見timestampType

    table_options參數說明:

    參數

    是否必選

    說明

    示例值

    kafka.bootstrap.servers

    Kafka實例的域名接入點。

    alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092

    subscribe

    Kafka實例的Topic名稱。

    topic_to_read

    startingTimestamp

    待訪問Topic數據片段開始時間的時間戳,單位為毫秒。

    說明

    您可以使用Spark SQL unix_timestamp函數將時間轉換為UNIX時間戳。

    1682406000000

    endingTimestamp

    待訪問Topic數據片段結束時間的時間戳,單位為毫秒。

    說明

    您可以使用Spark SQL unix_timestamp函數將時間轉換為UNIX時間戳。

    1682409600000

    startingOffsetsByTimestampStrategy

    如果Kafka實例中部分Partition中沒有數據,需要添加該參數。

    latest

    創建Kafka表支持多種類型參數,更多參數請參考:Structured Streaming + Kafka Integration Guide

    示例:

    CREATE TEMPORARY TABLE kafka_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="alikafka-pre-cn-x0r34a20****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-x0r34a20****-3-vpc.alikafka.aliyuncs.com:9092",
      "subscribe"="topic_to_read",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);
  4. 查詢Kafka表的數據。

    • 查詢表Kafka_tbl中Schema的數據。

      DESCRIBE kafka_tbl;
    • 查詢表Kafka_tbl中的數據。

      SELECT * FROM kafka_tbl LIMIT 10;
    • 使用Spark函數提取Kafka中的數據。例如,查詢表Kafka_tbl中Value為{"content": "kafka record"}的數據。

      SELECT get_json_object(cast(value as string), '$.content') FROM kafka_tbl LIMIT 10;

      返回結果:

      Output: kafka record

(可選)實踐:將Kafka數據導入Hive表

如果您有數據分析等相關需求,可以參考以下步驟將Kafka中的數據導入Hive表。

假設域名接入點為kafka_addr:9092,topic名稱為topic1的Kafka實例中有兩條寫入時間在2023-04-25 15:00:00至2023-04-25 16:00:00之間的數據,具體內容為:

{"id": 1, "name": "name1"}
{"id": 2, "name": "name2"}

現在需要將這兩條數據寫入Hive表中,便于后續進行數據分析。

  1. 創建Kafka源表。

    CREATE TEMPORARY TABLE kafka_src_tbl USING kafka OPTIONS(
      "kafka.bootstrap.servers"="kafka_addr:9092",
      "subscribe"="topic1",
      "startingTimestamp"=1682406000000,
      "endingTimestamp"=1682409600000);

    參數的詳細說明,請參見參數說明

  2. 創建Hive目標表。

    CREATE TABLE kafka_target_tbl(id LONG, name STRING) USING parquet;
  3. 將Kafka源表中所有數據解析后寫入Hive目標表中。

    INSERT INTO kafka_target_tbl
        SELECT
          cast(get_json_object(cast(value as string), '$.id') as long),
          get_json_object(cast(value as string), '$.name')
        FROM
          kafka_src_tbl;
  4. 查詢Kafka導入Hive表中的數據。

    SELECT * FROM kafka_target_tbl LIMIT 10;

    返回結果:

    Output:
    +-----+--------+
    | id  |  name  |
    +-----+--------+
    | 1   | name1  |
    | 2   | name2  |
    +-----+--------+