日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

數(shù)倉場景:即席查詢

本文通過示例為您介紹如何基于EMR Serverless StarRocks的視圖能力構(gòu)建數(shù)倉場景-即席查詢解決方案。

前提條件

使用限制

  • DataFlow集群、EMR Serverless StarRocks實(shí)例和RDS MySQL實(shí)例需要在同一個VPC下,并且在同一個可用區(qū)下。

  • DataFlow集群和EMR Serverless StarRocks實(shí)例均須開啟公網(wǎng)訪問。

  • RDS MySQL為5.7及以上版本。

注意事項(xiàng)

本文檔僅供測試使用,生產(chǎn)級別的Flink作業(yè)請使用阿里云實(shí)時計算Flink版產(chǎn)品進(jìn)行配置,或者使用YARN或者Kubernetes提交作業(yè)。

詳情請參見Apache Hadoop YARNNative Kubernetes。

場景介紹

隨著向量化、CBO(Cost Based Optimizer,基于代價的優(yōu)化器)、單機(jī)多核調(diào)度等技術(shù)的應(yīng)用,StarRocks的計算能力逐步提升。很多時候您在使用StarRocks進(jìn)行數(shù)倉分層建模時,大部分將數(shù)據(jù)建模到DWD層(基礎(chǔ)整合層)或DWS層(維度寬度)。在實(shí)際業(yè)務(wù)中,運(yùn)用StarRocks的計算能力,可以直接查詢DWD或DWS層數(shù)據(jù),還可以靈活地交互式即席查詢。

方案架構(gòu)

使用StarRocks實(shí)現(xiàn)數(shù)倉場景即席查詢的基本架構(gòu)如下圖所示。架構(gòu)圖

整體數(shù)據(jù)流如下:

  1. Flink清洗導(dǎo)入Kafka的日志或者通過Flink-CDC-StarRocks工具讀取MySQL Binlog導(dǎo)入StarRocks。根據(jù)需要選用明細(xì)、聚合、更新或主鍵各種模型,只物理落地ODS層(格式整理層)。

  2. 向上采用StarRocks View視圖能力,利用StarRocks向量化極速查詢和CBO優(yōu)化器滿足多表關(guān)聯(lián)、嵌套子查詢等復(fù)雜SQL,查詢時現(xiàn)場計算指標(biāo)結(jié)果,保證指標(biāo)上卷和下鉆高度同源一致。

方案特點(diǎn)

該方案主要特點(diǎn)是,計算邏輯在StarRocks側(cè)(現(xiàn)場查詢),適用于業(yè)務(wù)庫高頻數(shù)據(jù)更新的場景,實(shí)體數(shù)據(jù)只在ODS或DWD層存儲。

  • 方案優(yōu)勢

    • 靈活性強(qiáng),可隨時根據(jù)業(yè)務(wù)邏輯調(diào)整View。

    • 指標(biāo)修改簡單,上層都是View邏輯封裝,只需要更新底表數(shù)據(jù)。

  • 方案缺點(diǎn)

    當(dāng)View的邏輯較為復(fù)雜,數(shù)據(jù)量較多時,查詢性能較低。

  • 適用場景

    • 數(shù)據(jù)來源于數(shù)據(jù)庫和埋點(diǎn)系統(tǒng),適合對QPS要求不高,對靈活性要求比較高,且計算資源較為充足的場景。

    • 實(shí)時要求非常高,要求寫入即可查,更新即反饋。適合有即席查詢需求,且資源較為充足,查詢復(fù)雜度較低的場景。

操作流程

示例操作如下:

  1. 步驟一:創(chuàng)建MySQL源數(shù)據(jù)表

  2. 步驟二:創(chuàng)建StarRocks表

  3. 步驟三:執(zhí)行Flink任務(wù),啟動數(shù)據(jù)流

  4. 步驟四:驗(yàn)證數(shù)據(jù)

步驟一:創(chuàng)建MySQL源數(shù)據(jù)表

  1. 創(chuàng)建測試的數(shù)據(jù)庫和賬號,具體操作請參見創(chuàng)建數(shù)據(jù)庫和賬號。

    創(chuàng)建完數(shù)據(jù)庫和賬號后,需要授權(quán)測試賬號的讀寫權(quán)限。

    說明

    本文示例中創(chuàng)建的數(shù)據(jù)庫名稱為flink_cdc,賬號為emr_test。

  2. 使用創(chuàng)建的測試賬號連接MySQL實(shí)例,具體操作請參見通過DMS登錄RDS MySQL

  3. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表orders。

    create table flink_cdc.orders (
       order_id INT NOT NULL AUTO_INCREMENT,
       order_revenue FLOAT NOT NULL,
       order_region VARCHAR(40) NOT NULL,
       customer_id INT NOT NULL,
       PRIMARY KEY (order_id)
    );
  4. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表customers。

    create table flink_cdc.customers (
       customer_id INT NOT NULL,
       customer_age INT NOT NULL,
       customer_name VARCHAR(40) NOT NULL,
       PRIMARY KEY (customer_id)
    );

步驟二:創(chuàng)建StarRocks表

  1. 連接EMR Serverless StarRocks實(shí)例,詳情請參見通過客戶端方式連接StarRocks實(shí)例。

  2. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)庫。

    CREATE DATABASE IF NOT EXISTS `flink_cdc`;
  3. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表customers。

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`customers` (
      `customer_id` INT NOT NULL  COMMENT "",
      `customer_age` FLOAT NOT NULL  COMMENT "",
      `customer_name` STRING NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`customer_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`customer_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  4. 執(zhí)行以下命令,創(chuàng)建數(shù)據(jù)表orders。

    CREATE TABLE IF NOT EXISTS `flink_cdc`.`orders` (
      `order_id` INT NOT NULL  COMMENT "",
      `order_revenue` FLOAT NOT NULL  COMMENT "",
      `order_region` STRING NOT NULL  COMMENT "",
      `customer_id` INT NOT NULL  COMMENT ""
    ) ENGINE=olap
    PRIMARY KEY(`order_id`)
    COMMENT ""
    DISTRIBUTED BY HASH(`order_id`) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  5. 執(zhí)行以下命令,基于ODS表創(chuàng)建DWD視圖。

    CREATE VIEW flink_cdc.dwd_order_customer_valid (
      order_id,
      order_revenue,
      order_region,
      customer_id,
      customer_age,
      customer_name
    )
    AS
    SELECT o.order_id, o.order_revenue, o.order_region, c.customer_id, c.customer_age, c.customer_name
    FROM flink_cdc.customers c JOIN flink_cdc.orders o
    ON c.customer_id=o.customer_id
    WHERE c.customer_id != -1;
  6. 執(zhí)行以下命令,基于DWD表創(chuàng)建DWS視圖。

    CREATE VIEW flink_cdc.dws_agg_by_region (
      order_region,
      order_cnt,
      order_total_revenue)
    AS
    SELECT order_region, count(order_region), sum(order_revenue)
    FROM flink_cdc.dwd_order_customer_valid
    GROUP BY order_region;

步驟三:執(zhí)行Flink任務(wù),啟動數(shù)據(jù)流

  1. 下載Flink CDC connectorFlink StarRocks Connector,并上傳到DataFlow集群的/opt/apps/FLINK/flink-current/lib目錄下。

  2. 使用SSH方式登錄DataFlow集群,詳情請參見登錄集群。

  3. 添加端口配置,并修改并行執(zhí)行的任務(wù)槽數(shù)量。

    1. 執(zhí)行以下命令,編輯文件flink-conf.yaml

      vim /etc/taihao-apps/flink-conf/flink-conf.yaml
    2. 添加以下內(nèi)容至文件最后一行。

      rest.port: 8083
    3. 修改參數(shù)taskmanager.numberOfTaskSlots的值為3,默認(rèn)值為1。

  4. 執(zhí)行以下命令,啟動集群。

    重要

    本文示例僅供測試,如果是生產(chǎn)級別的Flink作業(yè)請使用YARN或Kubernetes方式提交,詳情請參見Apache Hadoop YARNNative Kubernetes

    /opt/apps/FLINK/flink-current/bin/start-cluster.sh
  5. 編寫Flink SQL作業(yè),并保存為demo.sql。

    執(zhí)行以下命令,編輯demo.sql文件。

    vim demo.sql

    文件內(nèi)容如下所示。

    CREATE DATABASE IF NOT EXISTS `default_catalog`.`flink_cdc`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_src` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****',
      'database-name' = 'flink_cdc',
      'table-name' = 'customers'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`customers_sink` (
      `customer_id` INT NOT NULL,
      `customer_age` FLOAT NOT NULL,
      `customer_name` STRING NOT NULL,
      PRIMARY KEY(`customer_id`)
     NOT ENFORCED
    ) with (
      'load-url' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'database-name' = 'flink_cdc',
      'jdbc-url' = 'jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'admin',
      'table-name' = 'customers',
      'sink.properties.strip_outer_array' = 'true',
      'password' = '1qaz!QAZ',
      'sink.max-retries' = '10',
      'connector' = 'starrocks'
    );
    INSERT INTO `default_catalog`.`flink_cdc`.`customers_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`customers_src`;
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_src` (
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'database-name' = 'flink_cdc',
      'table-name' = 'orders',
      'connector' = 'mysql-cdc',
      'hostname' = 'rm-2ze8398257383****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = 'Yz12****'
    );
    
    CREATE TABLE IF NOT EXISTS `default_catalog`.`flink_cdc`.`orders_sink` (
      `order_id` INT NOT NULL,
      `order_revenue` FLOAT NOT NULL,
      `order_region` STRING NOT NULL,
      `customer_id` INT NOT NULL,
      PRIMARY KEY(`order_id`)
     NOT ENFORCED
    ) with (
      'sink.properties.strip_outer_array' = 'true',
      'password' = '1qaz!QAZ',
      'sink.max-retries' = '10',
      'connector' = 'starrocks',
      'table-name' = 'orders',
      'jdbc-url' = 'jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'sink.buffer-flush.interval-ms' = '15000',
      'sink.properties.format' = 'json',
      'username' = 'admin',
      'load-url' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'database-name' = 'flink_cdc'
    );
    
    INSERT INTO `default_catalog`.`flink_cdc`.`orders_sink` SELECT * FROM `default_catalog`.`flink_cdc`.`orders_src`;

    涉及參數(shù)如下所示:

    • 創(chuàng)建數(shù)據(jù)表customers_src。

      參數(shù)

      描述

      connector

      固定值為mysql-cdc。

      hostname

      RDS的內(nèi)網(wǎng)地址。

      您可以在RDS的數(shù)據(jù)庫連接頁面,單擊內(nèi)網(wǎng)地址進(jìn)行復(fù)制。例如,rm-2ze8398257383****.mysql.rds.aliyuncs.com。

      port

      固定值為3306。

      username

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號名。本示例為emr_test。

      password

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的賬號的密碼。本示例為Yz12****。

      database-name

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫名。本示例為flink_cdc。

      table-name

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)表。本示例為customers。

    • 創(chuàng)建數(shù)據(jù)表customers_sink和orders_sink。

      參數(shù)

      描述

      load-url

      指定FE節(jié)點(diǎn)的內(nèi)網(wǎng)地址和HTTP端口,格式為EMR Serverless StarRocks實(shí)例FE節(jié)點(diǎn)的內(nèi)網(wǎng)地址:8030。例如,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

      說明

      關(guān)于如何獲取EMR Serverless StarRocks實(shí)例FE節(jié)點(diǎn)的內(nèi)網(wǎng)地址,請參見查看實(shí)例列表與詳情。

      database-name

      步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的數(shù)據(jù)庫名。本示例為flink_cdc。

      jdbc-url

      用于在StarRocks中執(zhí)行查詢操作。

      例如,jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。其中,fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com為EMR Serverless StarRocks實(shí)例FE節(jié)點(diǎn)的內(nèi)網(wǎng)地址。

      說明

      關(guān)于如何獲取EMR Serverless StarRocks實(shí)例FE節(jié)點(diǎn)的內(nèi)網(wǎng)地址,請參見查看實(shí)例列表與詳情。

      username

      StarRocks連接用戶名。固定為admin。

      table-name

      本示例固定值為customers。

      connector

      固定值為starrocks。

  6. 執(zhí)行以下命令,啟動Flink任務(wù)。

     /opt/apps/FLINK/flink-current/bin/sql-client.sh -f demo.sql

步驟四:驗(yàn)證數(shù)據(jù)

  1. 使用步驟一:創(chuàng)建MySQL源數(shù)據(jù)表中創(chuàng)建的測試賬號連接MySQL實(shí)例,具體操作請參見通過DMS登錄RDS MySQL。

  2. 在RDS數(shù)據(jù)庫窗口執(zhí)行以下命令,向表orders和customers中插入數(shù)據(jù)。

    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(1,10,"beijing",1);
    INSERT INTO flink_cdc.orders(order_id,order_revenue,order_region,customer_id) VALUES(2,10,"beijing",1);
    INSERT INTO flink_cdc.customers(customer_id,customer_age,customer_name) VALUES(1, 22, "emr_test");
  3. 連接EMR Serverless StarRocks實(shí)例,詳情請參見通過客戶端方式連接StarRocks實(shí)例。

  4. 執(zhí)行以下命令,查詢ODS層數(shù)據(jù)。

    1. 執(zhí)行以下命令,查看orders表信息。

      select * from flink_cdc.orders;

      返回信息如下所示。

      +----------+---------------+--------------+-------------+
      | order_id | order_revenue | order_region | customer_id |
      +----------+---------------+--------------+-------------+
      |        1 |            10 | beijing      |           1 |
      |        2 |            10 | beijing      |           1 |
      +----------+---------------+--------------+-------------+
    2. 執(zhí)行以下命令,查看customers表信息。

      select * from flink_cdc.customers;

      返回信息如下所示。

      +-------------+--------------+---------------+
      | customer_id | customer_age | customer_name |
      +-------------+--------------+---------------+
      |           1 |           22 | emr_test      |
      +-------------+--------------+---------------+
  5. 執(zhí)行以下命令,查詢DWD層數(shù)據(jù)。

    select * from flink_cdc.dwd_order_customer_valid;

    返回信息如下所示。

    +----------+---------------+--------------+-------------+--------------+---------------+
    | order_id | order_revenue | order_region | customer_id | customer_age | customer_name |
    +----------+---------------+--------------+-------------+--------------+---------------+
    |        1 |            10 | beijing      |           1 |           22 | emr_test      |
    |        2 |            10 | beijing      |           1 |           22 | emr_test      |
    +----------+---------------+--------------+-------------+--------------+---------------+
    2 rows in set (0.00 sec)
  6. 執(zhí)行以下命令,查詢DWS層數(shù)據(jù)。

    select * from flink_cdc.dws_agg_by_region;

    返回信息如下所示。

    +--------------+-----------+---------------------+
    | order_region | order_cnt | order_total_revenue |
    +--------------+-----------+---------------------+
    | beijing      |         2 |                  20 |
    +--------------+-----------+---------------------+
    1 row in set (0.01 sec)