日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Paimon與Flink集成

E-MapReduce支持通過Flink SQL對Paimon進行讀寫操作。本文通過示例為您介紹如何通過Flink SQL對Paimon進行讀寫操作。

前提條件

已創建選擇了Flink和Paimon的DataFlow或Custom類型的集群,創建集群詳情請參見創建集群

說明

如果您需要使用Hive Catalog的方式,則只能創建選擇了Flink、Paimon和Hive的Custom類型的集群,且元數據類型僅可選擇自建RDS內置MySQL

使用限制

  • EMR-3.46.0版本,暫不支持DLF Catalog和Hive Catalog的方式。

  • EMR-3.46.0至EMR-3.50.X版本、EMR-5.12.0至EMR-5.16.X版本的集群,支持使用Flink SQL對Paimon進行讀寫操作。

    說明

    EMR-3.51.X及其后續版本、EMR-5.17.X及其后續版本,建議您參考Paimon社區文檔,在EMR集群中自行配置。

操作步驟

步驟一:配置依賴

本文介紹了通過Flink SQL對Paimon進行讀寫操作的三種方法:Filesystem Catalog、Hive Catalog和DLF Catalog。每種方法針對不同的應用場景和環境需求,請根據所選方法配置相應的依賴。

Filesystem Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/

Hive Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

創建DLF Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

步驟二:啟動集群

本文以Session模式為例,其余模式請參見基礎使用

執行以下命令,啟動YARN Session。

yarn-session.sh --detached

步驟三:創建Catalog

Paimon將數據和元數據都保存在文件系統(例如,HDFS)或對象存儲(例如,OSS-HDFS)中,存儲的根路徑由warehouse參數指定。如果指定的warehouse路徑不存在,將會自動創建該路徑;如果指定的warehouse路徑存在,則可以通過該Catalog訪問路徑中已有的表。

您還可以將元數據額外同步到Hive或DLF中,方便其他服務訪問Paimon。

說明

EMR-3.46.0和EMR-5.17.0版本,暫不支持DLF Catalog和Hive Catalog的方式。

創建Filesystem Catalog

Filesystem Catalog僅將元數據保存在文件系統或對象存儲中。

  1. 執行以下命令,啟動Flink SQL。

    sql-client.sh
  2. 執行以下Flink SQL語句,創建Filesystem Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

創建Hive Catalog

Hive Catalog會同步元數據到Hive MetaStore中。在Hive Catalog中創建的表可以直接在Hive中查詢。

Hive查詢Paimon,詳情請參見Paimon與Hive集成

  1. 執行以下命令,啟動Flink SQL。

    sql-client.sh
    說明

    即使您使用的是Hive3,也無需修改啟動命令。

  2. 執行以下Flink SQL語句,創建Hive Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'uri' = 'thrift://master-1-1:9083', -- uri參數指向Hive metastore service的地址。
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

創建DLF Catalog

DLF Catalog會將元數據同步到DLF中。

重要

創建集群時,元數據必須為DLF統一元數據

  1. 執行以下命令,啟動Flink SQL。

    sql-client.sh
    說明

    即使您使用的是Hive3,也無需修改啟動命令。

  2. 執行以下Flink SQL語句,創建DLF Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

步驟四 :流作業讀寫Paimon

執行以下Flink SQL語句,在Catalog中創建一張表,并讀寫表中的數據。

-- 設置為流作業。
SET 'execution.runtime-mode' = 'streaming';

-- Paimon在流作業中需要設置checkpoint。
SET 'execution.checkpointing.interval' = '10s';

-- 使用之前創建的catalog。
USE CATALOG test_catalog;

-- 創建并使用一個測試database。
CREATE DATABASE test_db;
USE test_db;

-- 用datagen產生隨機數據。
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
);

-- 創建Paimon表。
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
);

-- 向Paimon中寫入數據。
INSERT INTO test_tbl SELECT * FROM datagen_source;

-- 讀取表中的數據。
-- 流式查詢作業運行的過程中,上面觸發的流式寫入作業仍在運行。
-- 您需要保證Flink集群有足夠的資源(task slot)同時運行兩個作業,否則無法查到數據。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步驟五:OLAP查詢Paimon

執行以下Flink SQL語句,對剛才創建的表進行OLAP查詢。

-- 設置為批作業。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- 使用tableau展示模式,在命令行中直接打出結果。
SET 'sql-client.execution.result-mode' = 'tableau';

-- 對表中數據進行查詢。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步驟六:清理資源

重要

完成測試后,請手動停止流式寫入Paimon的作業,防止資源泄露。

停止作業后,執行以下Flink SQL語句,刪除剛才創建的表。

DROP TABLE test_tbl;