日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Databricks Delta優(yōu)化Spark作業(yè)性能

更新時(shí)間:

本文介紹如何使用Databricks Delta進(jìn)行Spark作業(yè)的優(yōu)化。

前提條件

已創(chuàng)建集群,詳情請(qǐng)參見(jiàn)創(chuàng)建集群

集群應(yīng)滿(mǎn)足以下配置:

區(qū)域

詳情

地域(Region)

華北2(北京)

集群規(guī)模

1個(gè)Master節(jié)點(diǎn),5個(gè)Worker節(jié)點(diǎn)

ECS實(shí)例配置

配置如下:

  • CPU:32核

  • 內(nèi)存:128GiB

  • ECS規(guī)格:ecs.g6.8xlarge

  • 數(shù)據(jù)盤(pán)配置:ESSD云盤(pán)300GB X 4塊

  • 系統(tǒng)盤(pán)配置:ESSD云盤(pán)120GB X 1塊

說(shuō)明

ECS實(shí)例會(huì)因庫(kù)存等原因和實(shí)際售賣(mài)頁(yè)有出入。此處參數(shù)僅供參考,具體請(qǐng)您根據(jù)實(shí)際情況選擇相應(yīng)的實(shí)例規(guī)格進(jìn)行測(cè)試。

OSS寬帶

10Gbps

背景信息

Databricks數(shù)據(jù)洞察內(nèi)置了Databricks商業(yè)版引擎,您可以利用Databricks數(shù)據(jù)洞察創(chuàng)建集群,實(shí)現(xiàn)在秒級(jí)響應(yīng)時(shí)間內(nèi)處理PB級(jí)別的數(shù)據(jù)。本文示例制造100億條數(shù)據(jù),利用Databricks Delta的Data Skipping和ZOEDER Clustering特性,對(duì)Spark作業(yè)進(jìn)行改造,達(dá)到優(yōu)化性能的目的。Databricks Delta詳情請(qǐng)參見(jiàn)Processing Petabytes of Data in Seconds with Databricks Delta

配置Spark

  1. 使用阿里云賬號(hào)登錄Databricks數(shù)據(jù)洞察控制臺(tái)

  2. 在Databricks數(shù)據(jù)洞察控制臺(tái)頁(yè)面,選擇所在的地域(Region)。

    創(chuàng)建的集群將會(huì)在對(duì)應(yīng)的地域內(nèi),一旦創(chuàng)建后不能修改。

  3. 在左側(cè)導(dǎo)航欄中,單擊集群

  4. 單擊待配置集群所在行的詳情

  5. 集群詳情頁(yè)面,單擊上方的Spark配置

  6. 配置以下參數(shù)。

    1. 修改以下配置。

      參數(shù)

      描述

      spark.driver.cores

      4

      spark.driver.memory

      8G

      spark.executor.memory

      23G

    2. 新增以下配置。

      1. 在配置區(qū)域,單擊spark-defaults頁(yè)簽。

      2. 單擊右側(cè)的自定義配置。

      參數(shù)

      描述

      spark.executor.cores

      3

      spark.executor.instances

      22

      spark.yarn.executor.memoryOverhead

      default

示例

  1. 準(zhǔn)備數(shù)據(jù)。

    • 準(zhǔn)備測(cè)試數(shù)據(jù)和query腳本。

      在集群中生成數(shù)據(jù)預(yù)計(jì)需要5小時(shí),生成測(cè)試數(shù)據(jù)詳情請(qǐng)參見(jiàn)Processing Petabytes of Data in Seconds with Databricks Delta

    • 準(zhǔn)備五張表:

      • conn_random:delta格式表

      • conn_random_parquet:parquet格式表

      • conn_optimize:經(jīng)過(guò)OPTIMIZE的表,主要是Compaction

      • conn_zorder_only_ip:ZORDER BY (src_ip, dst_ip)

      • conn_zorder:ZORDER BY (src_ip, src_port, dst_ip, dst_port)

  2. 使用OPTIMIZE命令進(jìn)行優(yōu)化。

    詳細(xì)代碼如下:

    import spark.implicits._
    
    val seed = 0
    val numRecords = 10*1000*1000*1000L
    val numFiles = 1000*1000
    
    val baseLocation = "oss://mytest/records-10m(1000)3-(1000)2/data/random/"
    
    val dbName = s"mdc_random_$numFiles"
    val connRandom = "conn_random"
    val connRandomParquet = "conn_random_parquet"
    // val connSorted = "conn_sorted"
    val connOptimize = "conn_optimize"
    val connZorderOnlyIp = "conn_zorder_only_ip"
    val connZorder = "conn_zorder"
    
    spark.conf.set("spark.sql.shuffle.partitions", numFiles)
    spark.conf.get("spark.sql.shuffle.partitions")
    
    sql(s"drop database if exists $dbName cascade")
    sql(s"create database if not exists $dbName")
    sql(s"use $dbName")
    sql(s"show tables").show(false)
    
    import scala.util.Random
    
    case class ConnRecord(src_ip: String, src_port: Int, dst_ip: String, dst_port: Int)
    
    // 生成數(shù)據(jù)
    def randomIPv4(r: Random) = Seq.fill(4)(r.nextInt(256)).mkString(".")
    def randomPort(r: Random) = r.nextInt(65536)
    
    def randomConnRecord(r: Random) = ConnRecord(
      src_ip = randomIPv4(r), src_port = randomPort(r),
      dst_ip = randomIPv4(r), dst_port = randomPort(r))
    
    val df = spark.range(0, numFiles, 1, numFiles).mapPartitions { it =>
      val partitionID = it.toStream.head
      val r = new Random(seed = partitionID)
      Iterator.fill((numRecords / numFiles).toInt)(randomConnRecord(r))
    }
    
    // 生成數(shù)據(jù)表
    df.write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connRandom)
    .saveAsTable(connRandom)
    
    df.write
    .mode("overwrite")
    .format("parquet")
    .option("path", baseLocation + connRandomParquet)
    .saveAsTable(connRandomParquet)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connOptimize)
    .saveAsTable(connOptimize)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connZorderOnlyIp)
    .saveAsTable(connZorderOnlyIp)
    
    spark.read.table(connRandom)
    .write
    .mode("overwrite")
    .format("delta")
    .option("path", baseLocation + connZorder)
    .saveAsTable(connZorder)
    
    spark.conf.set("spark.databricks.io.skipping.mdc.addNoise", "false")
    
    // OPTIMIZE優(yōu)化命令
    sql(s"OPTIMIZE '${baseLocation + connOptimize}'")
    sql(s"OPTIMIZE '${baseLocation + connZorderOnlyIp}' ZORDER BY (src_ip, dst_ip)")
    sql(s"OPTIMIZE '${baseLocation + connZorder}' ZORDER BY (src_ip, src_port, dst_ip, dst_port)")

  3. 驗(yàn)證Spark SQL。

    select count(*) from conn_random where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_random_parquet where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_optimize where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_zorder_only_ip where src_ip like '157%' and dst_ip like '216.%';
    
    select count(*) from conn_zorder where src_ip like '157%' and dst_ip like '216.%';

測(cè)試結(jié)論

本示例各表情況如下。

表名稱(chēng)

時(shí)間(s)

conn_random_parquet

2504

conn_random

2324

conn_optimize

112

conn_zorder

65

conn_zorder_only_ip

46

說(shuō)明

通過(guò)以上示例,可以發(fā)現(xiàn):

  • 經(jīng)過(guò)OPTIMIZE的表,文件大小會(huì)在1G左右,而且進(jìn)行了delta元數(shù)據(jù)的優(yōu)化,提高了data-skipping的效率,在性能上提升約20倍(2504/112=22X)。

  • Zorder使得data-skipping的優(yōu)化效果進(jìn)一步深化,性能提升約40倍(2504/65=38X)。

  • 當(dāng)Zorder列是查詢(xún)列時(shí),優(yōu)化效果會(huì)更加明顯,實(shí)驗(yàn)顯示性能提升約50倍(2504/46=54X)。

問(wèn)題反饋

您在使用阿里云Databricks數(shù)據(jù)洞察過(guò)程中有任何疑問(wèn),歡迎用釘釘掃描下面的二維碼加入釘釘群進(jìn)行反饋。

產(chǎn)品釘釘群