日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

日志實時入倉快速入門

實時計算Flink版提供豐富強大的日志數據實時入倉能力。本文為您介紹如何在實時計算管理控制臺上快速構建一個從Kafka到Hologres的數據同步作業。

背景信息

假設消息隊列Kafka實例中有一個名稱為users的Topic,其中有100條JSON數據,代表通過日志文件采集工具或者應用寫入Kafka的日志數據,其數據分布大致如下圖所示。數據分布

本文使用實時計算Flink版提供的CREATE TABLE AS(CTAS)語句,一鍵完成日志數據的同步,以及實時的表結構變更同步。

前提條件

步驟一:配置IP白名單

為了讓Flink能訪問Kafka和Hologres實例,您需要將Flink工作空間的網段添加到在Kafka和Hologres的白名單中。

  1. 獲取Flink工作空間的VPC網段。

    1. 登錄實時計算控制臺

    2. 在目標工作空間右側操作列,選擇更多 > 工作空間詳情

    3. 工作空間詳情對話框,查看虛擬交換機的網段信息。

      網段信息

  2. 在消息隊列Kafka的IP白名單中,添加網段信息。

    操作步驟詳情請參見配置白名單Kafka白名單

  3. 在Hologres的IP白名單中,添加網段信息。

    操作步驟詳情請參見IP白名單Holo白名單

步驟二:準備Kafka測試數據

使用實時計算Flink版的模擬數據生成源表作為數據生成器,將數據寫入到Kafka中。請按以下步驟使用實時計算開發控制臺將數據寫入至消息隊列Kafka。

  1. 在Kafka控制臺創建一個名稱為users的Topic。

    操作詳情請參見步驟一:創建Topic

  2. 創建將數據寫入到Kafka的作業。

    1. 登錄實時計算管理控制臺

    2. 單擊目標工作空間操作列下的控制臺

    3. 在左側導航欄,單擊數據開發 > ETL,單擊新建

    4. 新建作業草稿對話框,選擇目標模板(例如:選擇空白的流作業草稿),完成后單擊下一步,填寫作業配置信息。

      作業參數

      示例

      說明

      文件名稱

      kafka-data-input

      作業的名稱。

      說明

      作業名稱在當前項目中必須保持唯一。

      存儲位置

      作業草稿

      指定該作業的代碼文件所屬的文件夾。默認存放在作業草稿目錄。

      您還可以在現有文件夾右側,單擊新建文件夾圖標,新建子文件夾。

      引擎版本

      vvr-8.0.5-flink-1.17

      在引擎版本下拉列表中選擇目標引擎版本。

    5. 單擊創建

    6. 將以下作業代碼拷貝到作業文本編輯區。

      CREATE TEMPORARY TABLE source (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        event_time TIMESTAMP
      ) WITH (
        'connector' = 'faker',
        'number-of-rows' = '100',
        'rows-per-second' = '10',
        'fields.id.expression' = '#{number.numberBetween ''0'',''1000''}',
        'fields.first_name.expression' = '#{name.firstName}',
        'fields.last_name.expression' = '#{name.lastName}',
        'fields.address.country.expression' = '#{Address.country}',
        'fields.address.state.expression' = '#{Address.state}',
        'fields.address.city.expression' = '#{Address.city}',
        'fields.event_time.expression' = '#{date.past ''15'',''SECONDS''}'
      );
      
      CREATE TEMPORARY TABLE sink (
        id INT,
        first_name STRING,
        last_name STRING,
        `address` ROW<`country` STRING, `state` STRING, `city` STRING>,
        `timestamp` TIMESTAMP METADATA
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json'
      );
      
      INSERT INTO sink SELECT * FROM source;
    7. 請按您的實際配置,修改以下參數配置信息。

      參數

      示例值

      說明

      properties.bootstrap.servers

      alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000

      Kafka Broker地址。

      格式為host:port,host:port,host:port,以英文逗號(,)分割。

      topic

      users

      Kafka Topic名稱。

  3. 啟動作業。

    1. 數據開發 > ETL頁面,單擊部署

    2. 部署新版本對話框中,單擊確定

    3. 配置作業資源,資源設置填寫詳情請參見配置作業資源

    4. 運維中心 > 作業運維頁面,單擊目標作業名稱操作列中的啟動。關于作業啟動的配置說明,請參見作業啟動

    5. 您可以在作業運維頁面觀察作業的運行信息和狀態。image

      由于faker數據源是一個有限流,因此在作業處于運行狀態后,大約1分鐘左右后,作業就會處于完成狀態。當作業結束運行代表作業已經將相關的數據寫入到Kafka的users中。其中,寫入到消息隊列Kafka的JSON數據格式大致如下。

      {
        "id": 765,
        "first_name": "Barry",
        "last_name": "Pollich",
        "address": {
          "country": "United Arab Emirates",
          "state": "Nevada",
          "city": "Powlowskifurt"
        }
      }

步驟三:創建Hologres Catalog

單表同步都需要依賴目標Catalog來創建目標表。因此,您需要通過控制臺創建目標Catalog。本文將以目標Catalog為Hologres Catalog為例,為您進行介紹。

創建名稱為holo的Hologres Catalog,操作步驟詳情請參見創建Hologres Catalogholo catalog

重要

您需要在您的目標實例中已創建flink_test_db數據庫,否則創建Catalog會報錯。

步驟四:創建并啟動數據同步作業

  1. 登錄實時計算開發控制臺,創建數據同步作業。

    1. 登錄實時計算管理控制臺

    2. 單擊目標工作空間操作列下的控制臺

    3. 在左側導航欄,單擊數據開發 > ETL,單擊新建

    4. 新建作業草稿對話框,選擇目標模板(例如:選擇空白的流作業草稿),完成后單擊下一步,填寫作業配置信息。

      作業參數

      示例

      說明

      文件名稱

      flink-quickstart-test

      作業的名稱。

      說明

      作業名稱在當前項目中必須保持唯一。

      存儲位置

      作業草稿

      指定該作業的代碼文件所屬的文件夾。默認存放在作業草稿目錄。

      您還可以在現有文件夾右側,單擊新建文件夾圖標,新建子文件夾。

      引擎版本

      vvr-8.0.5-flink-1.17

      在引擎版本下拉列表中選擇目標引擎版本。

    5. 單擊創建

  2. 將以下作業代碼拷貝到作業文本編輯區。

    將消息隊列Kafka中名稱為users的Topic數據同步至Hologres的flink_test_db數據庫的sync_kafka_users表中。您可以通過以下任意一種方式進行:

    • 通過CATS語句同步

      該方式無需您手動在Hologres中創建該表,也無需指明對應的列類型為JSON或JSONB。

      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country'),
        PRIMARY KEY (`partition`, `offset`) NOT ENFORCED
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自動展開嵌套列。
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TABLE IF NOT EXISTS holo.flink_test_db.sync_kafka_users
      WITH (
        'connector' = 'hologres'
      ) AS TABLE kafka_users;
      說明

      為了避免作業Failover后,作業重啟將重復數據寫入到Hologres中,您可以添加相關主鍵從而唯一地標識數據。當數據重發時,Hologres將會保證相同partition和offset的數據只會保留一份。

    • 通過INSERT INTO語句同步

      考慮到Hologres中對于JSON和JSONB類型的數據會進行特殊的優化,您也可以通過INSERT INTO語句將嵌套JSON寫入到Hologres中。

      該方式需要您手動在Hologres中創建該表并指明需要對應的列類型為JSON或JSONB,然后通過下文的SQL,會將address數據寫入到 Hologres中類型為JSON的列。

      CREATE TEMPORARY TABLE kafka_users (
        `id` INT NOT NULL,
        `address` STRING, -- 該列對應的數據為嵌套JSON。
        `offset` BIGINT NOT NULL METADATA,
        `partition` BIGINT NOT NULL METADATA,
        `timestamp` TIMESTAMP METADATA,
        `date` AS CAST(`timestamp` AS DATE),
        `country` AS JSON_VALUE(`address`, '$.country')
      ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000',
        'topic' = 'users',
        'format' = 'json',
        'json.infer-schema.flatten-nested-columns.enable' = 'true', -- 自動展開嵌套列。
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE holo (
        `id` INT NOT NULL,
        `address` STRING,
        `offset` BIGINT,
        `partition` BIGINT,
        `timestamp` TIMESTAMP,
        `date` DATE,
        `country` STRING
      ) WITH (
        'connector' = 'hologres',
        'endpoint' = 'hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80',
        'username' = 'LTAI5tE572UJ44Xwhx6i****',
        'password' = 'KtyIXK3HIDKA9VzKX4tpct9xTm****',
        'dbname' = 'flink_test_db',
        'tablename' = 'sync_kafka_users'
      );
      
      INSERT INTO holo
      SELECT * FROM kafka_users;
  3. 請按您的實際配置,修改以下參數配置信息。

    參數

    示例值

    說明

    properties.bootstrap.servers

    alikafka-host1.aliyuncs.com:9000,alikafka-host2.aliyuncs.com:9000,alikafka-host3.aliyuncs.com:9000

    Kafka Broker地址。

    格式為host:port,host:port,host:port,以英文逗號(,)分割。

    topic

    users

    Kafka Topic名稱。

    endpoint

    hgpostcn-****-cn-beijing-vpc.hologres.aliyuncs.com:80

    Hologres端點。

    格式為<ip>:<port>。

    username

    LTAI5tE572UJ44Xwhx6i****

    Hologres用戶名,請填寫阿里云賬號的AccessKey ID。

    password

    KtyIXK3HIDKA9VzKX4tpct9xTm****

    Hologres密碼,請填寫阿里云賬號的AccessKey Secret。

    dbname

    flink_test_db

    Hologres數據庫名稱。

    tablename

    sync_kafka_users

    Hologres表名稱。

    說明
    • 如果您通過INSERT INTO方式同步數據,則需要提前在目標實例的數據庫中創建sync_kafka_users表和字段。

    • 如果Schema不為Public時,則tablename需要填寫為schema.tableName。

  4. 單擊保存

  5. 數據開發 > ETL頁面,單擊部署

  6. 運維中心 > 作業運維頁面,單擊目標作業名稱操作列中的啟動關于作業啟動的配置說明,請參見作業啟動

  7. 單擊啟動

    作業啟動后,您可以在作業運維界面觀察作業的運行信息和狀態。image

步驟五:觀察全量同步結果

  1. 登錄Hologres管理控制臺

  2. 實例列表頁面,單擊目標實例名稱。

  3. 在頁面右上角,單擊登錄實例

  4. 元數據管理頁簽,查看users數據庫中同步的sync_kafka_users表結構和數據。

    sync_kafka_users表

    同步后的表結構和數據如下圖所示。

    • 表結構

      雙擊sync_kafka_users表名稱,查看表結構。

      表結構

      說明

      在同步過程中,建議聲明Kafka的Metadata partition和offset作為Hologres表中的主鍵。這樣可以避免由于作業Failover,數據重發導致下游存儲多份相同數據。

    • 表數據

      在sync_kafka_users表信息頁面右上角,單擊查詢表后,輸入如下命令,單擊運行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";

      表數據結果如下圖所示。表數據

步驟六:觀察自動同步表結構變更

  1. 在Kafka控制臺手動發送一條包含新增列的消息。

    1. 登錄云消息隊列 Kafka 版控制臺

    2. 實例列表頁面,單擊目標實例名稱。

    3. Topic管理頁面,單擊目標Topic名稱users。

    4. 單擊體驗發送消息

    5. 填寫消息內容。

      消息內容

      配置項

      示例

      發送方式

      選中控制臺

      消息Key

      填寫為flinktest。

      消息內容

      將以下JSON內容復制粘貼到消息內容中。

      {
        "id": 100001,
        "first_name": "Dennise",
        "last_name": "Schuppe",
        "address": {
          "country": "Isle of Man",
          "state": "Montana",
          "city": "East Coleburgh"
        },
        "house-points": {
          "house": "Pukwudgie",
          "points": 76
        }
      }
      說明

      該示例中house-points是一個新增的嵌套列。

      發送到指定分區

      選中

      分區ID

      填寫為0。

    6. 單擊確定

  2. 在Hologres控制臺,查看sync_kafka_users表結構和數據的變化。

    1. 登錄Hologres管理控制臺

    2. 實例列表頁面,單擊目標實例名稱。

    3. 在頁面右上角,單擊登錄實例

    4. 元數據管理頁簽,雙擊sync_kafka_users表名稱。

    5. 單擊查詢表后,輸入如下命令,單擊運行

      SELECT * FROM public.sync_kafka_users order by partition, "offset";
    6. 查看表數據結果。

      表數據結果如下圖所示。Hologres表結果

      可以觀察到id為100001的數據已經成功地寫入到了Hologres中。同時,Hologres中多了house-points.house和house-points.points 兩列。

      說明

      雖然插入到Kafka中的數據僅只有一個嵌套列house-points,但是由于在kafka_users表的WITH參數內聲明要求json.infer-schema.flatten-nested-columns.enable,那么Flink 就會自動展平新增的嵌套列,并用訪問該列的路徑作為展開后的列的名字。

(可選)步驟七:調整作業資源配置

根據數據量的不同,我們往往需要調節不同節點的并發和資源,以達到更優的作業性能。您可以使用資源配置的基礎模式簡單配置作業并發度和CU數,也可以使用資源配置的專家模式細粒度地調整節點的并發和資源。

  1. 登錄實時計算開發控制臺,進入作業詳情頁面。

    1. 登錄實時計算管理控制臺

    2. 單擊目標工作空間操作列下的控制臺

    3. 在左側導航欄,單擊運維中心 > 作業運維

  2. 修改資源配置。

    1. 部署詳情頁面,單擊資源配置右側的編輯資源模式選擇為專家模式

    2. 在配置計劃中單擊立即獲取

    3. 單擊展開全部

      觀察完整的拓撲圖,通過完整的拓撲圖能了解到整個數據的同步計劃,即具體同步哪些表。

    4. 手動設置每個節點的并發。

      由于Kafka users Topic有四個分區,因此可以設置作業為4并發。由于日志數據只是寫入到Hologres一張表中,為了降低Hologres的連接數,可以調節Hologres的并發為2。資源配置步驟詳情請參見配置作業部署信息。經過調節后的作業資源配置計劃如下圖所示。作業配置計劃

    5. 單擊確定

    6. 填寫基礎配置后,單擊啟動關于作業啟動的配置說明,請參見作業啟動

  3. 運維中心 > 作業運維頁面,單擊目標作業名稱。

  4. 狀態總覽頁面,查看調整效果。

    image.png

相關文檔