本文為您介紹Hudi與Spark SQL集成后,支持的DML語句。
前提條件
已創建包含Spark和Hudi服務的集群,詳情請參見創建集群。
使用限制
EMR-3.36.0及后續版本和EMR-5.2.0及后續版本,支持Spark SQL對Hudi進行讀寫操作。
啟動方式
- Spark2和Spark3 hudi0.11以下版本
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
- Spark3 hudi0.11及以上版本
spark-sql \ --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \ --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \ --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
MERGE INTO
表示根據匹配條件執行插入、更新或刪除操作。
語法
MERGE INTO tableIdentifier AS target_alias USING (sub_query | tableIdentifier) AS source_alias ON <merge_condition> WHEN MATCHED [ AND <condition> ] THEN <matched_action> [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ] [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ] <merge_condition> =A equal bool condition <matched_action> = DELETE | UPDATE SET * | UPDATE SET column1 = value1 [, column2 = value2 ...] <not_matched_action> = INSERT * | INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
示例
-- without delete merge into h0 as target using ( select 1 as id, 'a1' as name, 10.0 as price ) source on target.id = source.id when matched then update set * when not matched then insert *; -- with delete merge into h0 as target using ( select 1 as id, 'a1' as name, 10.0 as price ) source on target.id = source.id when matched then update set id = source.id, name = source.name, price = source.price when matched and name = 'delete' then delete when not matched then insert (id,name,price) values(id, name, price);
INSERT INTO
表示向分區表或非分區表插入數據。
代碼示例如下所示:
向非分區表h0中插入數據。
insert into h0 select 1, 'a1', 20;
向靜態分區表h_p0中插入數據。
insert into h_p0 partition(dt='2021-01-02') select 1, 'a1';
向動態分區表h_p0中插入數據。
insert into h_p0 partition(dt) select 1, 'a1', dt from s;
向動態分區(分區字段放在SELECT表達式最后面)h_p1表中插入數據。
insert into h_p1 select 1 as id, 'a1', '2021-01-03' as dt, '19' as hh;
重寫表h0中的數據。
insert overwrite table h0 select 1, 'a1', 20;
UPDATE語句
表示更新分區表或非分區表中行對應的單列或多列數據。
語法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION);
示例:將表h0中id為1的price字段值更新為20。
update h0 set price=20 where id=1;
DELETE語句
表示刪除分區表或非分區表中滿足指定條件的單行或多行數據。
語法
DELETE FROM tableIdentifier [WHERE BOOL_EXPRESSION];
示例:刪除表h0中id大于100的數據。
delete from h0 where id>100;
文檔內容是否對您有幫助?