本文通過簡單的示例,在實時計算開發控制臺帶您快速體驗Paimon的基本功能,包括創建和刪除Paimon Catalog,創建和刪除Paimon Catalog表,向Paimon表寫入、更新以及消費數據。
前提條件
如果您使用RAM用戶或RAM角色等身份訪問,需要確認已具有實時計算開發控制臺相關權限,詳情請參見權限管理。
已創建Flink工作空間,詳情請參見開通實時計算Flink版。
已開通對象存儲OSS且Bucket的存儲類型為標準存儲,詳情請參見OSS控制臺快速入門。OSS用于存儲Paimon表的相關文件,包括數據文件與元數據文件等。
僅實時計算引擎VVR 8.0.5及以上版本支持Paimon表。
步驟一:創建Paimon Catalog
進入查詢腳本頁面。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊
,在查詢腳本頁簽,新建查詢腳本。
在文本編輯區域,輸入如下代碼創建Paimon Catalog。
-- my-Catalog為自定義的Catalag名稱 CREATE Catalog `my-catalog` WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = '<warehouse>', 'fs.oss.endpoint' = '<fs.oss.endpoint>', 'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>', 'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>' );
參數配置項如下:
配置項
說明
是否必填
備注
type
Catalog類型。
是
固定值為Paimon。
metastore
元數據存儲類型。
是
本文示例填寫filesystem,其他類型詳情請參考管理Paimon Catalog。
warehouse
OSS服務中所指定的數倉目錄。
是
格式為oss://<bucket>/<object>。其中:
bucket:表示您創建的OSS Bucket名稱。
object:表示您存放數據的路徑。
請在OSS管理控制臺上查看您的Bucket和Object名稱。
fs.oss.endpoint
OSS服務的連接地址。
否
當warehouse指定的OSS Bucket與Flink工作空間不在同一地域,或使用其它賬號下的OSS bucket時需要填寫。
請參見訪問域名和數據中心。
說明如果需要將Paimon表存儲在OSS-HDFS中,則fs.oss.endpoint、fs.oss.accessKeyId與fs.oss.accessKeySecret均需要填寫,其中fs.oss.endpoint的值為cn-<region>.oss-dls.aliyuncs.com,例如cn-hangzhou.oss-dls.aliyuncs.com。
fs.oss.accessKeyId
擁有讀寫OSS權限的阿里云賬號或RAM賬號的AccessKey。
否
當warehouse指定的OSS Bucket與Flink工作空間不在同一地域,或使用其它賬號下的OSS Bucket時需要填寫。獲取方法請參見創建AccessKey。
fs.oss.accessKeySecret
擁有讀寫OSS權限的阿里云賬號或RAM賬號的AccessKey Secret。
否
選中創建Paimon Catalog代碼,單擊左側的運行。
返回??
The following statement has been executed successfully!
信息表示Catalog創建成功。
步驟二:創建Paimon Catalog表
在查詢腳本頁面,輸入如下代碼創建名為my_db的Paimon數據庫以及名為my_tbl的Paimon表。
CREATE DATABASE `my-catalog`.`my_db`; CREATE TABLE `my-catalog`.`my_db`.`my_tbl` ( dt STRING, id BIGINT, content STRING, PRIMARY KEY (dt, id) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( 'changelog-producer' = 'lookup' );
說明為了后續能夠流式消費Paimon表,本示例在WITH中指定了
changelog-producer
參數為lookup
,表示采用查找策略生成變更日志,詳情請參見變更數據產生機制。選中創建Paimon數據庫以及Paimon表的代碼,單擊左側的運行。
返回??
The following statement has been executed successfully!
信息表示名為my_db的Paimon數據庫以及名為my_tbl的Paimon表創建成功。
步驟三:向Paimon表寫入數據
在SQL作業開發。將如下插入語句復制到SQL編輯器。
頁面的作業草稿頁簽,新建空白流作業草稿,詳情請參見-- Paimon結果表在每次檢查點完成之后才會正式提交數據。 -- 此處將檢查點間隔縮短為10s,是為了更快地提交數據。 -- 在生產環境下,系統檢查點的間隔與兩次系統檢查點之間的最短時間間隔根據業務對延時要求的不同,一般設置為1分鐘到10分鐘。 SET 'execution.checkpointing.interval'='10s'; INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES ('20240108',1,'apple'), ('20240108',2,'banana'), ('20240109',1,'cat'), ('20240109',2,'dog');
在SQL編輯區域右上方,單擊部署,在部署新版本對話框中,根據需要填寫或選中相關內容,單擊確定。
在
頁面,單擊目標作業名稱操作列下的啟動,選擇無狀態啟動,單擊啟動。當作業狀態變為已完成,表示已完成數據的寫入。
步驟四:從Paimon表中流式消費數據
新建空白流作業草稿,將如下SQL代碼復制到SQL編輯器,使用print連接器,輸出my_tbl表中所有數據到日志中。
CREATE TEMPORARY TABLE Print ( dt STRING, id BIGINT, content STRING ) WITH ( 'connector' = 'print' ); INSERT INTO Print SELECT * FROM `my-catalog`.`my_db`.`my_tbl`;
在SQL編輯區域右上方,單擊部署,在部署新版本對話框中,根據需要填寫或選中相關內容,單擊確定。
在
頁面,單擊目標作業操作列下的啟動,選擇無狀態啟動,單擊啟動。在作業運維詳情頁面,查看Flink計算結果。
在
頁面,單擊目標作業名稱。在作業日志頁簽下的運行日志頁面,單擊運行Task Managers頁簽下的Path, ID。
單擊Stdout后,查看消費的Paimon數據。
步驟五:向Paimon表更新數據
新建空白流作業草稿,將如下SQL代碼復制到SQL編輯器。
SET 'execution.checkpointing.interval' = '10s'; INSERT INTO `my-catalog`.`my_db`.`my_tbl` VALUES ('20240108', 1, 'hello'), ('20240109', 2, 'world');
在SQL編輯區域右上方,單擊部署,在部署新版本對話框中,根據需要填寫或選中相關內容,單擊確定。
在
頁面,單擊目標作業操作列下的啟動,選擇無狀態啟動,單擊啟動。當作業狀態變為已完成,表示已完成數據的寫入。
在步驟四作業的作業運維頁面的Stdout頁簽下,查看向Paimon表中更新的數據。
步驟六(可選):停止流式消費作業并清理資源
測試完畢后,如果您需要停止流式消費作業并清理資源,可參考以下步驟:
在
頁面,單擊目標作業操作列的停止,停止作業運行。在查詢腳本頁簽的文本編輯區域,輸入如下代碼,刪除Paimon數據文件以及Paimon Catalog。
DROP DATABASE `my-catalog`.`my_db` CASCADE; --將刪除存儲在OSS上的整個Paimon數據庫的數據文件。 DROP CATALOG `my-catalog`; --將Paimon Catalog從實時計算開發控制臺元數據中刪除,但對OSS上的數據文件沒有影響。
返回??
The following statement has been executed successfully!
信息表示Paimon數據文件以及Paimon Catalog刪除成功。
相關文檔
向Paimon表寫入數據或從Paimon表消費數據,詳情請參見Paimon表數據寫入和消費。
修改Paimon表結構(例如增加列、修改列類型),以及臨時修改表參數,詳情請參見修改表結構。
不同場景下Paimon主鍵表和Append Scalable表的常用優化,詳情請參見Paimon性能優化。
關于Paimon的常見問題,詳情請參見上下游存儲。