MaxCompute新增支持將MapReduce作業指定為SQL運行時(Runtime)執行模式,基于SQL運行時,MapReduce作業可以應用SQL引擎的各種新特性,實現之前不支持的功能。本文為您介紹如何將MapReduce作業指定為SQL運行時執行模式。
背景信息
MaxCompute提供了MapReduce編程接口,您可以使用MapReduce提供的接口(Java API)編寫MapReduce程序處理MaxCompute的中的數據。
新版本MaxCompute支持將MapReduce作業指定為SQL運行時執行模式,基于SQL運行時,MapReduce可以使用MaxCompute SQL引擎編譯器、基于代價的優化器和向量化執行的執行引擎,同時可以復用SQL引擎開發的各種新特性,包括功能、性能、穩定性等方面不斷的優化改進。
MapReduce指定為SQL運行時執行后可以使用MaxCompute SQL的新特性,可以滿足原來MapReduce不支持的功能。相比原來MapReduce,新增如下功能:
支持輸入源為視圖。
支持輸入源為外部表。
支持分布式文件系統讀寫。
支持Hash或Range聚簇表讀寫。
此外還新增支持如下能力:
可以持續獲得SQL在CBO優化器和向量化執行引擎等方面的持續性能優化收益。
可以使用新的存儲格式壓縮機制。
支持動態調整并發度,提高輸入表為HashCluster等超大表Join場景的性能。
可以獲得經過大量作業和壓力驗證的SQL引擎穩定性優勢,Failover和PVC等機制更有保障。
MaxCompute Studio和Logview針對SQL作業提供了更詳細的執行信息、執行計劃、編譯信息、作業配置信息等,可以對任務運行各階段輸入輸出及整體流程一目了然,能方便的定位問題和優化,提高開發和運維效率。
注意事項
該功能不需要您對原接口和作業邏輯做任何改動,只需要指定執行模式。
該功能僅支持MapReduce接口編寫的MapReduce作業,MapReduce接口請參見原生SDK概述。
基于SQL運行時執行模式運行MapReduce作業,MapReduce作業仍然按照MapReduce計費規則進行計費,詳情請參見MapReduce按量付費。
使用說明
設置執行模式。
執行模式可以通過
odps.mr.run.mode
開關控制,取值如下:lot
(默認):使用MapReduce引擎執行MapReduce任務。sql
:使用SQL引擎執行MapReduce任務,即SQL運行時執行模式。如果執行不通過,則會報錯。hybrid
:優先嘗試使用SQL引擎執行,不通過時會回退到MapReduce引擎執行。
您可通過兩種方式設置執行模式:
Project級別控制
全局性統一打開,針對所有作業有效,需要Project管理員執行如下命令:
setproject odps.mr.run.mode=<lot/sql/hybrid>;
Session級控制
只針對當前作業有效,有如下兩種方式:
運行JAR語句前添加
set odps.mr.run.mode=<lot/sql/hybrid>
語句。在作業代碼中通過
job
設置,如下所示:JobConf job = new JobConf(); job.set("odps.mr.run.mode","hybrid")
說明對于特殊功能場景StreamJob和SecondarySort的功能,需要設置以下Flag:
StreamJob:
set odps.mr.sql.stream.enable=true;
。SecondarySort:
set odps.mr.sql.group.enable=true;
。
查看運行詳情。
您可通過Logview、MaxCompute Studio等查看MapReduce基于SQL運行時執行在客戶端生成的SQL表達式,以及作業的運行詳情,Logview操作說明請參見使用Logview 2.0查看作業運行信息。
Logview XML
打開Logview,在Source XML頁簽中查看客戶端提交的XML信息,顯示了MapReduce作業同義的SQL表達。示例如下所示:
create temporary function mr2sql_mapper_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotMapperUDTF' using ; create temporary function mr2sql_reducer_152955927079392291755 as 'com.aliyun.odps.mapred.bridge.LotReducerUDTF' using ; @sub_query_mapper := SELECT k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code FROM( SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) FROM ae_antispam.product_sku_tt_inc WHERE ds = "20180615" AND hh = "21" UNION ALL SELECT mr2sql_mapper_152955927079392291755(id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ) as (k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) FROM ae_antispam.product_sku ) open_mr_alias1 DISTRIBUTE BY k_id SORT BY k_id ASC; @sub_query_reducer := SELECT mr2sql_reducer_152955927079392291755(k_id,v_gmt_create,v_gmt_modified,v_product_id,v_admin_seq,v_sku_attr,v_sku_price,v_sku_stock,v_sku_code,v_sku_image,v_delivery_time,v_sku_bulk_order,v_sku_bulk_discount,v_sku_image_version,v_currency_code) as (id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code) FROM @sub_query_mapper; FROM @sub_query_reducer INSERT OVERWRITE TABLE ae_antispam.product_sku SELECT id,gmt_create,gmt_modified,product_id,admin_seq,sku_attr,sku_price,sku_stock,sku_code,sku_image,delivery_time,sku_bulk_order,sku_bulk_discount,sku_image_version,currency_code ;
Logview Summary
在Logview的Summary頁簽您可以看到作業的執行采用了SQL運行時的執行引擎
execution engine
,示例如下:說明未開啟SQL運行時執行模式的MapReduce作業無執行引擎信息,未開啟SQL運行時執行模式的MaxCompute擴展MapReduce(MR2)作業的執行引擎為
cganjiang
。Job run mode: fuxi job Job run engine: execution engine
Logview JSONSummary
MapReduce的JSONSummary信息僅包含了簡單的Map和Reduce輸入輸出信息。 SQL的JSONSummary信息可以查看SQL執行各階段的詳細信息,包含所有執行參數、邏輯計劃、物理計劃和執行詳情等。示例如下所示:
"midlots" : [ "LogicalTableSink(table=[[odps_flighting.flt_20180621104445_step1_ad_quality_tech_qp_algo_antifake_wordbag_filter_bag_change_result_lv2_20, auctionid,word,match_word(3) {0, 1, 2}]]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$0], word=[$1], match_word=[$2]) OdpsLogicalProject(auctionid=[$2], word=[$3], match_word=[$4]) OdpsLogicalTableFunctionScan(invocation=[[MR2SQL_MAPPER_152955294118813063732($0, $1)]()], rowType=[RecordType(VARCHAR(2147483647) item_id, VARCHAR(2147483647) text, VARCHAR(2147483647) __tf_0_0, VARCHAR(2147483647) __tf_0_1, VARCHAR(2147483647) __tf_0_2)]) OdpsLogicalTableScan(table=[[ad_quality_tech.qp_algo_antifake_wordbag_filter_bag_change_lv2_20, item_id,text(2) {0, 1}]]) ]