日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Flink VVP+DLF數據入湖與分析實踐

數據湖構建(DLF)可以結合阿里云實時計算Flink版(Flink VVP),以及Flink CDC相關技術,實現靈活定制化的數據入湖。并利用DLF統一元數據管理、權限管理等能力,實現數據湖多引擎分析、數據湖管理等功能。本文為您介紹Flink+DLF數據湖方案具體步驟。

背景信息

阿里云實時計算Flink版是一套基于Apache Flink構建的實時大數據分析平臺,支持多種數據源和結果表類型。Flink任務可以利用數據湖統一存儲的優勢,使用Hudi結果表或Iceberg結果表,將作業的結果輸出到數據湖中,實現數據湖分析。在寫入數據湖的過程中,Flink可以通過設置DLF Catalog,將表的元數據同步到數據湖構建(DLF)中。依托數據湖構建產品(DLF)提供的企業級統一元數據能力,Flink+DLF方案可以實現寫入的數據湖表無縫對接阿里云上的計算引擎,如EMR、MaxCompute、Hologres等。也可以通過DLF提供的豐富的數據湖管理能力,實現數據湖生命周期管理和湖格式的優化。

image

前提條件

  • 已開通實時計算Flink版,創建Flink全托管工作空間。

  • 已開通阿里云數據湖構建(DLF)服務。如果您沒有開通,則可以在DLF產品首頁,單擊開通

  • 本文以MySQL數據源為例,需要創建RDS MySQL,詳情請參見創建RDS MySQL實例。如果使用其他數據源入湖可忽略。

重要

創建的RDS MySQL需要和實時計算Flink版在同一個地域同一個VPC內,RDS MySQL須為5.7及以上版本。

操作流程

步驟一:準備MySQL數據

  1. 登錄準備好的MySQL實例,詳情請參見通過DMS登錄RDS MySQL

  2. 執行如下命令,創建一張表,并插入若干測試數據。

    CREATE DATABASE testdb;
    CREATE TABLE testdb.student (
      `id` bigint(20) NOT NULL,
      `name` varchar(256) DEFAULT NULL,
      `age` bigint(20) DEFAULT NULL,
      PRIMARY KEY (`id`)
    );
    
    INSERT INTO testdb.student VALUES (1,'name1',10);
    INSERT INTO testdb.student VALUES (2,'name2',20);

步驟二:Flink創建DLF Catalog

  1. 登錄實時計算管理控制臺

  2. 進入創建Catalog頁面。

    1. Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

    2. 在左側導航欄,單擊元數據管理

    3. 單擊創建Catalog

  3. 創建DLF Catalog。

    1. 創建Catalog頁面,選擇DLF,單擊下一步

    2. 填寫以下參數配置信息,單擊確定。詳情請參見管理DLF Catalog

    image

    當您成功創建DLF之后,可在元數據管理中看到新增的dlf數據目錄,默認鏈接的是DLF的default數據目錄。

    image

步驟三:創建Flink入湖作業

  1. 登錄實時計算管理控制臺

  2. Flink全托管頁簽,單擊目標工作空間操作列下的控制臺

  3. 創建數據源表和目標表。

    1. 在左側導航欄,單擊數據開發 > 數據查詢

    2. 在SQL編輯區域,輸入以下代碼,單擊運行

      -- 創建數據源表
      CREATE TABLE IF NOT EXISTS student_source (
        id INT,
        name VARCHAR (256),
        age INT,
        PRIMARY KEY (id) NOT ENFORCED
      )
      WITH (
        'connector' = 'mysql-cdc',
        -- hostname替換為RDS的連接地址
        'hostname' = 'rm-xxxxxxxx.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = '<RDS user name>',
        'password' = '<RDS password>',
        'database-name' = '<RDS database>',
        -- table-name為數據源表,本例中填步驟二創建的student表
        'table-name' = 'student'
      );
      
      -- catalog名為步驟二創建的dlf catalog name,本例中填dlf
      CREATE DATABASE IF NOT EXISTS dlf.dlf_testdb;
      
      -- 創建目標表
      CREATE TABLE IF NOT EXISTS dlf.dlf_testdb.student_hudi (
        id    BIGINT PRIMARY KEY NOT ENFORCED,
        name  STRING,
        age    BIGINT
      ) WITH(
        'connector' = 'hudi'
      );

      創建成功后,可在元數據管理中看到新增的數據源表和目標表。

      image

  4. 創建Flink SQL入湖作業。

    1. 在左側導航欄,單擊數據開發 > ETL

    2. 單擊新建后,在新建作業草稿對話框,選擇空白的流作業草稿,單擊下一步

    3. 填寫作業信息,單擊創建

    4. SQL編輯區域,輸入以下代碼,創建一個Flink SQL作業。

      -- 創建流SQL作業
      INSERT INTO dlf.dlf_testdb.student_hudi SELECT * FROM student_source  /*+ OPTIONS('server-id'='123456') */;
      說明
    5. 在SQL編輯區域右上方,單擊部署部署新版本對話框,可根據需要填寫或選中相關內容,單擊確定

  5. 啟動作業。

    1. 在左側導航欄,單擊運維中心 > 作業運維

    2. 單擊目標作業名稱操作列中的啟動

      選擇無狀態啟動后,單擊啟動。當您看到作業狀態變為運行中,則代表作業運行正常。作業啟動參數配置,詳情請參見作業啟動

步驟四:使用DLF數據湖分析

  1. 在左側導航欄,單擊數據開發 > 數據查詢

  2. 在SQL編輯區域,輸入以下代碼,單擊運行

    SELECT * FROM dlf.dlf_testdb.student_hudi;

    查詢結果如下圖所示,可以直接對Flink寫入數據湖的數據進行查詢和分析

    image

說明

如果您購買了EMR集群,并且開啟了數據湖DLF元數據,也可以直接通過EMR集群對Flink入湖結果進行數據湖分析,參考Hudi與Spark SQL集成

相關資料

如果您想要通過EMR DataFlow集群的Flink讀寫DLF,請參考文章通過數據湖元數據DLF讀寫Hudi