本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。
配置MaxCompute Catalog后,您可以在Flink全托管作業開發中直接訪問MaxCompute中存儲的表,無需再定義Schema。本文為您介紹如何在Flink全托管模式下創建、查看、使用及刪除MaxCompute Catalog。
背景信息
MaxCompute Catalog通過查詢MaxCompute服務來獲取MaxCompute中已存儲物理表的Schema信息,您無需在Flink SQL中聲明MaxCompute連接表的Schema便可以獲取具體的字段信息。MaxCompute Catalog具有以下功能特點:
MaxCompute Catalog中的數據庫名對應MaxCompute的項目名,您可以通過切換數據庫來使用不同MaxCompute項目中的表。
MaxCompute Catalog中的表名對應MaxCompute中存儲的物理表名,自動映射數據類型,無需再通過DDL語句手動注冊MaxCompute表,提升開發效率和正確性。
MaxCompute Catalog提供的表可以直接作為Flink SQL作業中的源表、維表和結果表使用。
在MaxCompute Catalog中創建表能夠自動在MaxCompute服務中創建對應的物理表,并自動映射數據類型,提升開發效率。
本文將從以下方面為您介紹如何管理MaxCompute Catalog:
使用限制
僅Flink計算引擎VVR 6.0.7及以上版本支持配置MaxCompute Catalog。
MaxCompute Catalog不支持創建數據庫,即MaxCompute中的項目。
MaxCompute Catalog不支持修改表結構。
MaxCompute Catalog不支持CREATE TABLE AS(CTAS)語句。
創建MaxCompute Catalog
支持UI與SQL命令兩種方式配置MaxCompute Catalog,推薦使用UI方式配置MaxCompute Catalog。
UI方式(推薦)
進入元數據管理頁面。
登錄實時計算控制臺,單擊目標工作空間操作列下的控制臺。
單擊元數據管理。
單擊創建Catalog,選擇ODPS后,單擊下一步。
填寫參數配置信息。
重要Catalog創建完成后,以下配置信息都不支持修改。如果需要修改,您需要刪除掉已創建的Catalog,重新進行創建。
參數
說明
類型
是否必填
備注
catalog name
MaxCompute Catalog的名稱。
String
是
請填寫為自定義的英文名。
endpoint
MaxCompute服務連接站點。
String
是
具體站點請參見Endpoint。
accessId
訪問MaxCompute服務所使用阿里云賬號的AccessKey ID。
String
是
該賬號需要對Catalog訪問的項目有admin權限。
accessKey
訪問MaxCompute服務所使用阿里云賬號的AccessKey Secret。
String
是
無。
project
Catalog中作為默認數據庫的MaxCompute項目名。
String
否
若不設置該值,默認項目為default。
說明Catalog創建成功后,元數據中將展示您填寫的項目和您上述填寫的阿里云賬號所創建的項目。
單擊確定。
創建完成后,元數據下即可查看新建的Catalog。
SQL方式
在數據查詢文本編輯區域,輸入配置MaxCompute Catalog的命令。
CREATE CATALOG `<catalogName>` WITH ( 'type' = 'odps', 'endpoint' = '<odpsEndpoint>', 'accessId' = '<aliyunAccountAccessId>', 'accessKey' = '<aliyunAccountAccessKey>', 'project' = '<defaultProject>', 'userAccount' = '<RAMUserAccount>' );
參數詳情如下表所示。
參數
說明
類型
是否必填
備注
catalogName
MaxCompute Catalog的名稱。
String
是
請填寫為自定義的英文名。
type
Catalog類型。
String
是
固定值為odps。
endpoint
MaxCompute服務連接站點。
String
是
具體站點請參見Endpoint。
accessId
訪問MaxCompute服務所使用阿里云賬號的AccessKey ID。
String
是
該賬號需要對Catalog訪問的項目有admin權限。
accessKey
訪問MaxCompute服務所使用阿里云賬號的AccessKey Secret。
String
是
無。
project
Catalog中作為默認數據庫的MaxCompute項目名。
String
否
若不設置該值,默認項目為default。
userAccount
阿里云賬號或RAM用戶名稱。
String
否
若使用的AccessKey非主賬號,僅對主賬號下的部分項目有admin權限,則需要設置該參數為賬號名稱,例如
RAM$[<account_name>:]<RAM_name>
,MaxCompute Catalog將僅展示該賬號有權限的項目列表。MaxCompute用戶權限管理參見用戶規劃與管理。
選中創建Catalog的代碼后,單擊左側代碼行數上的運行。
查看MaxCompute Catalog
UI方式(推薦)
進入元數據管理頁面。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
單擊元數據管理。
在Catalog列表頁面,查看Catalog名稱和類型。
如果您需要查看目標Catalog下的數據庫和表,請單擊查看。
SQL方式
在數據查詢文本編輯區域,輸入以下命令:
DESCRIBE `<catalogName>`.`<projectName>`.`<tableName>`;
參數
說明
catalogName
MaxCompute Catalog名稱。
projectName
MaxCompute中的項目名。
tableName
MaxCompute中存儲的物理表名。
選中查看Catalog的代碼后,單擊左側代碼行數上的運行。
運行成功后,可以在編輯區域下方的結果一欄中看到MaxCompute物理表在Flink中對應的Schema信息。
使用MaxCompute Catalog
通過Catalog創建MaxCompute物理表
通過Flink SQL DDL,在MaxCompute Catalog中創建表時,會自動在對應的MaxCompute項目中創建對應的物理表,并自動將Flink中的類型轉換為MaxCompute中的類型,支持創建非分區表和分區表。
創建非分區表示例:
CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING
);
執行完成后,您可以在MaxCompute中查看對應項目中的表,可以看到已創建對應名字的非分區表,其列名稱、類型與Flink DDL中對應。
創建分區表示例:
CREATE TABLE `<catalogName>`.`<projectName>`.`<tableName>` (
f0 INT,
f1 BIGINT,
f2 DOUBLE,
f3 STRING,
ds STRING
) PARTITIONED BY (ds);
在Flink DDL的Schema末尾添加分區列,并在PARTITIONED BY
語句中聲明分區列名,執行完成后查看對應MaxCompute項目中的表,可以看到已創建對應名字的分區表,其普通列為f0、f1、f2、f3,分區列為ds。
MaxCompute中列名均為小寫,而Flink中列名區分大小寫,若DDL中列名包含大寫字母將被自動轉換成小寫,若DDL中包含多個轉換成小寫后同名的列,則會報錯。
從MaxCompute Catalog表中讀取數據
MaxCompute Catalog能夠從MaxCompute服務讀取物理表的Schema,因此無需在Flink中聲明對應Schema即可直接讀取數據。例如:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`;
不聲明任何參數的默認行為為全量讀取所有分區,若您需要讀取特定分區,或使用增量源表模式,可以參考大數據計算服務MaxCompute中的參數設置,在SQL注釋中聲明,例如:
讀取特定分區:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=230613') */;
使用增量源表模式:
SELECT * FROM `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('startPartition' = 'ds=230613') */;
使用維表模式:
SELECT * FROM `<anotherTable>` AS l LEFT JOIN
`<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'max_pt()', 'cache' = 'ALL') */
FOR SYSTEM_TIME AS OF l.proc_time AS r
ON l.id = r.id;
其他大數據計算服務MaxCompute中支持的源表和維表參數均可以通過該方式進行設置。但需要注意的是,MaxCompute Catalog中不保存Watermark信息,若需要在以源表讀取數據時指定Watermark,可以使用CREATE TABLE ... LIKE ...
語句,例如:
CREATE TABLE `<newTable>` ( WATERMARK FOR ts AS ts )
LIKE `<catalogName>`.`<projectName>`.`<tableName>`;
其中ts為MaxCompute物理表中類型為DATETIME的列,該類型可以在Flink中被設置為事件時間并添加Watermark信息,創建完成后,從newTable讀取的數據均帶有Watermark。
向MaxCompute Catalog表中寫入數據
MaxCompute Catalog支持以固定分區或動態分區模式寫入數據,參見結果表示例。例如有MaxCompute物理表有二級分區ds和hh,可以使用如下語句寫入數據:
-- 寫入固定分區
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds=20231024,hh=09') */
SELECT <otherColumns>, '20231024', '09' FROM `<anotherTable>`;
-- 寫入動態分區
INSERT INTO `<catalogName>`.`<projectName>`.`<tableName>`
/*+ OPTIONS('partition' = 'ds,hh') */
SELECT <otherColumns>, ds, hh FROM `<anotherTable>`;
SELECT中,分區列需要按分區層級順序放置在其他普通列之后。
刪除MaxCompute Catalog
刪除MaxCompute Catalog不會影響已運行的作業,但會導致使用該Catalog下表的作業,在上線或重啟時報無法找到該表的錯誤,請您謹慎操作。
UI方式
進入元數據管理頁面。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
單擊元數據管理。
在Catalog列表頁面,單擊目標Catalog名稱對應操作列下的刪除。
在彈出的對話框中,單擊刪除。
說明刪除完成后,在左側元數據區域下即可查看目標Catalog已刪除。
SQL命令方式
在數據查詢文本編輯區域,輸入以下命令。
DROP CATALOG `<catalogName>`;
其中,<catalogName>為您要刪除的目標MaxCompute Catalog名稱。
警告刪除MaxCompute Catalog不會影響已運行的作業,但對未上線或者需要暫停恢復的作業均產生影響,請您謹慎操作。
選中刪除Catalog的命令,鼠標右鍵選擇運行。
在左側元數據區域,查看目標Catalog是否已刪除。
MaxCompute與Flink的類型映射
MaxCompute支持的類型參見2.0數據類型版本。
MaxCompute至Flink
讀取已有MaxCompute物理表時,字段的MaxCompute類型將按下表映射為Flink類型。
MaxCompute類型 | Flink類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR(n) | VARCHAR(n) |
STRING | STRING |
BINARY | BYTES |
DATE | DATE |
DATETIME | TIMESTAMP(3) |
TIMESTAMP | TIMESTAMP(9) |
ARRAY | ARRAY |
MAP | MAP |
STRUCT | ROW |
JSON | STRING |
Flink至MaxCompute
通過Flink DDL在Catalog創建MaxCompute表時,Flink DDL中的字段類型將按下表映射為MaxCompute類型。
Flink類型 | MaxCompute類型 |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DECIMAL(precision, scale) | DECIMAL(precision, scale) |
CHAR(n) | CHAR(n) |
VARCHAR / STRING | STRING |
BINARY | BINARY |
VARBINARY / BYTES | BINARY |
DATE | DATE |
TIMESTAMP(n<=3) | DATETIME |
TIMESTAMP(3<n<=9) | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |