通過外表導入至湖倉版
云原生數據倉庫 AnalyticDB MySQL 版在通過外表訪問并導入MaxCompute數據時,默認使用Tunnel Record API方式。您也可以進一步選擇Tunnel Arrow API方式,相較于Tunnel Record API方式,Tunnel Arrow API方式可以列式讀取MaxCompute的數據,從而提高數據訪問和處理的效率。
前提條件
AnalyticDB for MySQL集群的產品系列為企業版、基礎版或湖倉版。
MaxCompute項目與AnalyticDB for MySQL集群位于同一地域。
AnalyticDB for MySQL集群已開啟ENI訪問。
說明登錄云原生數據倉庫AnalyticDB MySQL控制臺,在 的網絡信息區域,打開ENI網絡開關。
已添加AnalyticDB for MySQL的VPC網段到MaxCompute項目的白名單中。
說明登錄云原生數據倉庫AnalyticDB MySQL控制臺,在集群信息頁面查詢VPC ID。然后登錄專有網絡控制臺,在專有網絡頁面根據VPC ID查詢網段。設置MaxCompute白名單的操作,請參見管理IP白名單。
使用Tunnel Arrow API方式訪問并導入MaxCompute數據時,AnalyticDB for MySQL集群需為3.2.2.1及以上版本。
說明查看企業版或湖倉版集群的內核版本,請執行
SELECT adb_version();
。如需升級內核版本,請聯系技術支持。
示例數據
本文示例中的MaxCompute項目為test_adb
,示例表person
。示例如下:
CREATE TABLE IF NOT EXISTS person (
id INT,
name VARCHAR(1023),
age INT)
partitioned BY (dt string);
在person
表中添加分區,示例如下:
ALTER TABLE person
ADD
PARTITION (dt='202207');
向分區中添加數據,示例如下:
INSERT INTO test_adb.person
PARTITION (dt='202207')
VALUES (1,'james',10),(2,'bond',20),(3,'jack',30),(4,'lucy',40);
開啟Arrow API(可選)
默認情況下,AnalyticDB for MySQL集群會使用Tunnel Record API方式訪問并導入MaxCompute數據。 若您需要通過Tunnel Arrow API方式訪問并導入MaxCompute數據,請先開啟Arrow API功能。開啟后,AnalyticDB for MySQL集群會使用Tunnel Arrow API方式進行導入。
開啟方法
您可以通過SET命令或Hint在集群級別和查詢級別開啟Arrow API:
集群級別開啟Arrow API:
SET ADB_CONFIG <config_name>= <value>;
查詢級別開啟Arrow API:
/*<config_name>= <value>*/ SELECT * FROM table;
Arrow API相關配置參數
參數(config_name) | 說明 |
ODPS_TUNNEL_ARROW_ENABLED | 是否開啟Arrow API。取值:
|
ODPS_TUNNEL_SPLIT_BY_SIZE_ENABLED | 是否開啟動態Split切分。取值:
|
操作步驟
數據導入方式分為常規導入(默認)和彈性導入。常規導入在計算節點中讀取源數據,然后在存儲節點中構建索引,消耗計算資源和存儲資源。彈性導入在Serverless Spark Job中讀取源數據和構建索引,消耗Job型資源組的資源。僅內核版本3.1.10.0及以上且已創建Job型資源組的集群支持彈性導入數據。相較于常規導入,彈性導入可以大幅減少資源的消耗,降低導入過程中對在線讀寫業務的影響,提升資源隔離性和數據導入效率。更多內容,請參見數據導入方式介紹。
常規導入
進入SQL編輯器。
登錄云原生數據倉庫AnalyticDB MySQL控制臺,在左上角選擇集群所在地域。在左側導航欄,單擊集群列表,在企業版、基礎版或湖倉版頁簽下,單擊目標集群ID。
在左側導航欄,單擊 。
創建外部數據庫。示例如下:
CREATE EXTERNAL DATABASE adb_external_db;
創建外表。本文示例為
test_adb
。CREATE EXTERNAL TABLE IF NOT EXISTS adb_external_db.test_adb ( id int, name varchar(1023), age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "accessid":"LTAILd4****", "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun.com/api", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
說明AnalyticDB for MySQL外表和MaxCompute中表的字段名稱、字段數量、字段順序需要一致,字段類型需要兼容。
外表的參數說明,請參見CREATE EXTERNAL TABLE。
查詢數據。
SELECT * FROM adb_external_db.test_adb;
返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
執行以下步驟將MaxCompute數據導入至AnalyticDB for MySQL。
在AnalyticDB for MySQL中創建數據庫,示例如下:
CREATE DATABASE adb_demo;
在AnalyticDB for MySQL中創建表用于存儲從MaxCompute中導入的數據,示例如下:
說明新表和步驟3中創建的外表的字段順序和字段數量需要一致,字段類型兼容。
CREATE TABLE IF NOT EXISTS adb_demo.adb_import_test( id int, name string, age int, dt string PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt');
向表中寫入數據,示例如下:
方式一:執行INSERT INTO導入數據,當主鍵重復時會自動忽略當前寫入數據,不做更新,作用等同于
INSERT IGNORE INTO
,詳情請參見INSERT INTO。示例如下:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
如果需要將特定分區的數據導入
adb_demo.adb_import_test
,可以執行:INSERT INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb WHERE dt = '202207';
方式二:執行INSERT OVERWRITE INTO導入數據,會覆蓋表中原有的數據。示例如下:
INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
方式三:異步執行INSERT OVERWRITE INTO導入數據。通常使用
SUBMIT JOB
提交異步任務,由后臺調度,可以在寫入任務前增加Hint(/*+ direct_batch_load=true*/
)加速寫入任務。詳情請參見異步寫入。示例如下:SUBMIT job INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_external_db.test_adb;
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2020112122202917203100908203303****** | +---------------------------------------+
關于異步提交任務詳情,請參見異步提交導入任務。
彈性導入
進入SQL編輯器。
登錄云原生數據倉庫AnalyticDB MySQL控制臺,在左上角選擇集群所在地域。在左側導航欄,單擊集群列表,在企業版、基礎版或湖倉版頁簽下,單擊目標集群ID。
在左側導航欄,單擊 。
創建數據庫。如果有已創建的數據庫,可以忽略本步驟。示例如下:
CREATE DATABASE adb_demo;
創建外表。
說明AnalyticDB for MySQL外表的名稱需要和MaxCompute項目的名稱相同,否則創建外表會失敗。
AnalyticDB for MySQL外表和MaxCompute中表的字段名稱、字段數量、字段順序需要一致,字段類型需要兼容。
彈性導入僅支持
CREATE TABLE
語句創建外表。
CREATE TABLE IF NOT EXISTS test_adb ( id int, name string, age int, dt string ) ENGINE='ODPS' TABLE_PROPERTIES='{ "endpoint":"http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api", "accessid":"LTAILd4****", "accesskey":"4A5Q7ZVzcYnWMQPysX****", "partition_column":"dt", "project_name":"test_adb", "table_name":"person" }';
外表支持設置的參數及參數說明,請參見參數說明。
查詢數據。
SELECT * FROM adb_demo.test_adb;
返回結果如下:
+------+-------+------+---------+ | id | name | age | dt | +------+-------+------+---------+ | 1 | james | 10 | 202207 | | 2 | bond | 20 | 202207 | | 3 | jack | 30 | 202207 | | 4 | lucy | 40 | 202207 | +------+-------+------+---------+ 4 rows in set (0.35 sec)
在AnalyticDB for MySQL中創建表用于存儲從MaxCompute中導入的數據。示例如下:
說明創建的內表和步驟3中創建的外表的字段名稱、字段數量、字段順序、字段類型必須相同。
CREATE TABLE IF NOT EXISTS adb_import_test ( id int, name string, age int, dt string, PRIMARY KEY(id,dt) ) DISTRIBUTED BY HASH(id) PARTITION BY VALUE('dt') LIFECYCLE 30;
導入數據。
重要彈性導入僅支持通過
INSERT OVERWRITE INTO
語句導入數據。方法一:執行INSERT OVERWRITE INTO彈性導入數據,會覆蓋表中原有的數據。示例如下:
/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
方法二:異步執行INSERT OVERWRITE INTO彈性導入數據。通常使用
SUBMIT JOB
提交異步任務,由后臺調度。/*+ elastic_load=true, elastic_load_configs=[adb.load.resource.group.name=resource_group|spark.adb.eni.vswitchId=vsw-bp12ldm83z4zu9k4d****]*/ SUBMIT JOB INSERT OVERWRITE INTO adb_demo.adb_import_test SELECT * FROM adb_demo.test_adb;
重要異步提交彈性導入任務時,不支持設置優先級隊列。
返回結果如下:
+---------------------------------------+ | job_id | +---------------------------------------+ | 2023081517192220291720310090151****** | +---------------------------------------+
使用
SUBMIT JOB
提交異步任務后,返回結果僅表示異步任務提交成功。您可以通過job_id終止異步任務或查詢異步任務狀態,判斷任務是否執行成功。具體操作,請參見異步提交導入任務。Hint參數說明:
elastic_load:是否使用彈性導入方式。取值:true或false(默認值)。
elastic_load_configs:彈性導入方式支持配置的參數。參數需使用方括號([ ])括起來,且多個參數之間以豎線(|)分隔,支持配置的參數如下表所示:
參數
是否必填
說明
adb.load.resource.group.name
是
執行彈性導入任務的Job資源組名稱。
adb.load.job.max.acu
否
單個彈性導入任務最多使用的資源。單位為ACU,最小值為5 ACU。默認值為集群Shard個數+1。
執行如下語句可查詢集群Shard個數:
SELECT count(1) FROM information_schema.kepler_meta_shards;
spark.driver.resourceSpec
否
Spark driver的資源規格。默認值為small。取值范圍,請參見Spark資源規格列表的型號列。
spark.executor.resourceSpec
否
Spark executor的資源規格。默認值為large。取值范圍,請參見Spark資源規格列表的型號列。
spark.adb.executorDiskSize
否
Spark executor的磁盤容量,取值范圍為(0,100],單位為GiB,默認值為10 Gi。更多信息,請參見指定Driver和Executor資源。
(可選)查看已提交的導入任務是否為彈性導入任務。
SELECT job_name, (job_type = 3) AS is_elastic_load FROM INFORMATION_SCHEMA.kepler_meta_async_jobs WHERE job_name = "2023081818010602101701907303151******";
返回結果如下:
+---------------------------------------+------------------+ | job_name | is_elastic_load | +---------------------------------------+------------------+ | 2023081517195203101701907203151****** | 1 | +---------------------------------------+------------------+
is_elastic_load
的返回值為1,表示已提交的導入任務是彈性導入任務;若為0,則表示已提交的導入任務是常規導入任務。