使用Databricks Delta優(yōu)化Spark作業(yè)性能
本文介紹如何使用Databricks Delta進(jìn)行Spark作業(yè)的優(yōu)化。
前提條件
已創(chuàng)建集群,詳情請(qǐng)參見(jiàn)創(chuàng)建集群。
集群應(yīng)滿(mǎn)足以下配置:
區(qū)域 | 詳情 |
---|---|
地域(Region) | 華北2(北京) |
集群規(guī)模 | 1個(gè)Master節(jié)點(diǎn),5個(gè)Worker節(jié)點(diǎn) |
ECS實(shí)例配置 | 配置如下:
說(shuō)明 ECS實(shí)例會(huì)因庫(kù)存等原因和實(shí)際售賣(mài)頁(yè)有出入。此處參數(shù)僅供參考,具體請(qǐng)您根據(jù)實(shí)際情況選擇相應(yīng)的實(shí)例規(guī)格進(jìn)行測(cè)試。 |
OSS寬帶 | 10Gbps |
背景信息
Databricks數(shù)據(jù)洞察內(nèi)置了Databricks商業(yè)版引擎,您可以利用Databricks數(shù)據(jù)洞察創(chuàng)建集群,實(shí)現(xiàn)在秒級(jí)響應(yīng)時(shí)間內(nèi)處理PB級(jí)別的數(shù)據(jù)。本文示例制造100億條數(shù)據(jù),利用Databricks Delta的Data Skipping和ZOEDER Clustering特性,對(duì)Spark作業(yè)進(jìn)行改造,達(dá)到優(yōu)化性能的目的。Databricks Delta詳情請(qǐng)參見(jiàn)Processing Petabytes of Data in Seconds with Databricks Delta。
配置Spark
使用阿里云賬號(hào)登錄Databricks數(shù)據(jù)洞察控制臺(tái)。
在Databricks數(shù)據(jù)洞察控制臺(tái)頁(yè)面,選擇所在的地域(Region)。
創(chuàng)建的集群將會(huì)在對(duì)應(yīng)的地域內(nèi),一旦創(chuàng)建后不能修改。
在左側(cè)導(dǎo)航欄中,單擊集群。
單擊待配置集群所在行的詳情。
在集群詳情頁(yè)面,單擊上方的Spark配置。
配置以下參數(shù)。
修改以下配置。
參數(shù)
描述
spark.driver.cores
4
spark.driver.memory
8G
spark.executor.memory
23G
新增以下配置。
在配置區(qū)域,單擊spark-defaults頁(yè)簽。
單擊右側(cè)的自定義配置。
參數(shù)
描述
spark.executor.cores
3
spark.executor.instances
22
spark.yarn.executor.memoryOverhead
default
示例
準(zhǔn)備數(shù)據(jù)。
準(zhǔn)備測(cè)試數(shù)據(jù)和query腳本。
在集群中生成數(shù)據(jù)預(yù)計(jì)需要5小時(shí),生成測(cè)試數(shù)據(jù)詳情請(qǐng)參見(jiàn)Processing Petabytes of Data in Seconds with Databricks Delta。
準(zhǔn)備五張表:
conn_random:delta格式表
conn_random_parquet:parquet格式表
conn_optimize:經(jīng)過(guò)OPTIMIZE的表,主要是Compaction
conn_zorder_only_ip:ZORDER BY (src_ip, dst_ip)
conn_zorder:ZORDER BY (src_ip, src_port, dst_ip, dst_port)
使用OPTIMIZE命令進(jìn)行優(yōu)化。
詳細(xì)代碼如下:
import spark.implicits._ val seed = 0 val numRecords = 10*1000*1000*1000L val numFiles = 1000*1000 val baseLocation = "oss://mytest/records-10m(1000)3-(1000)2/data/random/" val dbName = s"mdc_random_$numFiles" val connRandom = "conn_random" val connRandomParquet = "conn_random_parquet" // val connSorted = "conn_sorted" val connOptimize = "conn_optimize" val connZorderOnlyIp = "conn_zorder_only_ip" val connZorder = "conn_zorder" spark.conf.set("spark.sql.shuffle.partitions", numFiles) spark.conf.get("spark.sql.shuffle.partitions") sql(s"drop database if exists $dbName cascade") sql(s"create database if not exists $dbName") sql(s"use $dbName") sql(s"show tables").show(false) import scala.util.Random case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int) // 生成數(shù)據(jù) def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".") def randomPort(r: Random) = r.nextInt(65536) def randomConnRecord(r: Random) = ConnRecord( src_ip = randomIPv4(r), src_port = randomPort(r), dst_ip = randomIPv4(r), dst_port = randomPort(r)) val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it => val partitionID = it.toStream.head val r = new Random(seed = partitionID) Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r)) } // 生成數(shù)據(jù)表 df.write .mode("overwrite") .format("delta") .option("path", baseLocation + connRandom) .saveAsTable(connRandom) df.write .mode("overwrite") .format("parquet") .option("path", baseLocation + connRandomParquet) .saveAsTable(connRandomParquet) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connOptimize) .saveAsTable(connOptimize) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorderOnlyIp) .saveAsTable(connZorderOnlyIp) spark.read.table(connRandom) .write .mode("overwrite") .format("delta") .option("path", baseLocation + connZorder) .saveAsTable(connZorder) spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false") // OPTIMIZE優(yōu)化命令 sql(s"OPTIMIZE '${baseLocation + connOptimize}'") sql(s"OPTIMIZE '${baseLocation + connZorderOnlyIp}' ZORDER BY (src_ip, dst_ip)") sql(s"OPTIMIZE '${baseLocation + connZorder}' ZORDER BY (src_ip, src_port, dst_ip, dst_port)")
驗(yàn)證Spark SQL。
select count(*) from conn_random where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_optimize where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%'; select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';
測(cè)試結(jié)論
本示例各表情況如下。
表名稱(chēng) | 時(shí)間(s) |
---|---|
conn_random_parquet | 2504 |
conn_random | 2324 |
conn_optimize | 112 |
conn_zorder | 65 |
conn_zorder_only_ip | 46 |
通過(guò)以上示例,可以發(fā)現(xiàn):
經(jīng)過(guò)OPTIMIZE的表,文件大小會(huì)在1G左右,而且進(jìn)行了delta元數(shù)據(jù)的優(yōu)化,提高了data-skipping的效率,在性能上提升約20倍(2504/112=22X)。
Zorder使得data-skipping的優(yōu)化效果進(jìn)一步深化,性能提升約40倍(2504/65=38X)。
當(dāng)Zorder列是查詢(xún)列時(shí),優(yōu)化效果會(huì)更加明顯,實(shí)驗(yàn)顯示性能提升約50倍(2504/46=54X)。
問(wèn)題反饋
您在使用阿里云Databricks數(shù)據(jù)洞察過(guò)程中有任何疑問(wèn),歡迎用釘釘掃描下面的二維碼加入釘釘群進(jìn)行反饋。