日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Flink+DLF數據入湖與分析實踐

阿里云實時計算Flink版結合DLF Paimon Catalog,實現Flink作業結果到數據湖的高效寫入和元數據同步,支持無縫對接多種計算引擎并優化數據湖管理,本文為您介紹具體的操作流程。

背景信息

阿里云實時計算Flink版是一套基于Apache Flink構建的實時大數據分析平臺,支持多種數據源和結果表類型。Flink任務可以利用數據湖統一存儲的優勢,使用Paimon表,將作業的結果輸出到數據湖中,實現數據湖分析。在寫入數據湖的過程中,Flink可以通過設置DLF Catalog,將表的元數據同步到數據湖構建(DLF)中。依托數據湖構建產品(DLF)提供的企業級統一元數據能力,Flink+DLF方案可以實現寫入的數據湖表無縫對接阿里云上的計算引擎,如EMR、MaxCompute、Hologres等。也可以通過DLF提供的豐富的數據湖管理能力,實現數據湖生命周期管理和湖格式的優化。

前提條件

  • 已創建Flink全托管工作空間,引擎版本須為VVR 8.0.9及以上。如未創建,詳情請參見開通實時計算Flink版

  • 已創建DLF 2.0數據目錄。如未創建,詳情請參見創建數據目錄

    說明

    如果是RAM用戶,在進行數據操作之前,需要先授予相應的資源權限。詳情請參見授權管理

  • 本文以MySQL數據源為例,需要創建RDS MySQL,詳情請參見快速創建RDS MySQL實例

說明

創建的RDS MySQL需要和實時計算Flink版在同一個地域、同一個VPC內,RDS MySQL須為5.7及以上版本。

操作流程

步驟1:Flink創建DLF Paimon Catalog

  1. 登錄Flink控制臺

  2. 進入創建Catalog頁面。

    1. Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

    2. 在左側導航欄,單擊元數據管理

    3. 單擊創建Catalog

  3. 創建DLF Paimon Catalog。

    1. 創建Catalog頁面,類型選擇Apache Paimon Catalog。

    2. metastore選擇dlf,下拉列表中選擇標有v2.0的Catalog,Catalog需要提前在DLF控制臺創建。

    image

步驟2:準備MySQL數據

  1. 登錄準備好的MySQL實例,詳情請參見通過DMS登錄RDS MySQL

  2. 執行如下命令,在已有數據庫下創建一張表,并插入若干測試數據。

    CREATE TABLE `student` (
      `id` bigint(20) NOT NULL,
      `name` varchar(256) DEFAULT NULL,
      `age` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    );
    
    INSERT INTO student VALUES (1,'name1',10);
    INSERT INTO student VALUES (2,'name2',20);

步驟3:創建Flink入湖作業

  1. 登錄Flink控制臺

  2. 創建數據源表和目標表。

    1. Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

    2. 在左側導航欄,選擇數據開發 > 數據查詢

    3. 單擊新建,創建一個新的查詢腳本,執行如下SQL。

      說明

      關于MySQL源表的參數設置和使用條件,請參考MySQL的CDC源表

      -- Catalog名為步驟1創建的DLF Catalog Name,本例中填dlf_clg_test
      CREATE DATABASE IF NOT EXISTS dlf_clg_test.dlf_testdb;
      
      --創建目標表
      CREATE TABLE IF NOT EXISTS dlf_clg_test.dlf_testdb.student_paimon (
        id    BIGINT PRIMARY KEY NOT ENFORCED,
        name  STRING,
        age    BIGINT
      ) WITH(
        'merge-engine' = 'partial-update', -- 使用部分更新數據合并機制產生寬表
        'changelog-producer' = 'lookup', -- 使用lookup增量數據產生機制以低延時產出變更數據
        'bucket' = '3'
      );
  3. 創建Flink入湖任務。

    1. 在SQL開發頁面,創建一個新的SQL流作業,執行如下SQL。

      -- 創建CDC源表
      CREATE TEMPORARY TABLE student_source (
        id INT,
        name VARCHAR (256),
        age INT,
        PRIMARY KEY (id) NOT ENFORCED
      )
      WITH (
        'connector' = 'mysql-cdc',
        -- hostname替換為RDS的連接地址
        'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = '<RDS username>',
        'password' = '<RDS password>',
        'database-name' = '<RDS database>',
        -- table-name為數據源表,本例中填步驟2創建的student表
        'table-name' = 'student'
      );
      
      INSERT INTO dlf_clg_test.dlf_testdb.student_paimon SELECT * FROM student_source;

    b. 單擊保存

    c. 單擊部署

  4. 在左側導航欄,單擊運維中心 > 作業運維。找到上面創建的SQL流作業,單擊啟動

  5. 啟動成功后一段時間,可以看到作業的狀態變成運行中

步驟4:使用Flink進行數據分析

  1. 執行數據查詢和分析。

    1. Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

    2. 在左側導航欄,單擊數據開發 > 數據查詢

    3. 在SQL開發頁面,創建一個新的查詢腳本,執行如下SQL。

    SELECT count(1) FROM dlf_clg_test.dlf_testdb.student_paimon;
    SELECT * FROM dlf_clg_test.dlf_testdb.student_paimon;
  2. 單擊運行,可以直接對Flink寫入數據湖的數據進行查詢和分析。