日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Delta Lake 快速開始一

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業務造成影響,請務必仔細閱讀。

該Notebook展示了如何將JSON數據轉換為Delta Lake格式,創建Delta表,在Delta表中Append數據,最后使用Delta Lake元數據命令來顯示表的歷史記錄、格式和詳細信息。

前提條件

  1. 通過主賬號登錄阿里云 Databricks控制臺

  2. 已創建集群,具體請參見創建集群

  3. 已使用OSS管理控制臺創建非系統目錄存儲空間,詳情請參見創建存儲空間

    警告

    首次使用DDI產品創建的Bucket為系統目錄Bucket,不建議存放數據,您需要再創建一個Bucket來讀寫數據。

    說明

    DDI支持免密訪問OSS路徑,結構為:oss://BucketName/Object

    • BucketName為您的存儲空間名稱;

    • Object為上傳到OSS上的文件的訪問路徑。

    例:讀取在存儲空間名稱為databricks-demo-hangzhou文件路徑為demo/The_Sorrows_of_Young_Werther.txt的文件

    // 從oss地址讀取文本文檔
    val text = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")

詳情可參考Databricks官網Blog文章

步驟一:創建集群并通過knox賬號訪問Notebook

創建集群參考:創建集群,需注意要設置RAM子賬號及保存好knox密碼,登錄WebUI時候需要用到。

步驟二:創建Notebook、導入數據、進行數據分析

  1. 定義Notebook中使用的路徑path

    %pyspark
    
    # 注意需要將數據文件events.json上傳至您的OSS對應bucket下,events.json數據來源Databricks站點的open/close數據
    inputPath = "oss://databricks-huhehaote/delta-demo/events.json"
    deltaPath = "/delta/events"
    database = "Delta_QuickStart_Database"

  2. 導入數據到Dataframe中 & 打印數據data

    %pyspark
    
    from pyspark.sql.functions import expr
    from pyspark.sql.functions import from_unixtime
    
    # spark.read讀取json數據,并將表頭time轉換為date格式
    events = spark.read \
      .option("inferSchema", "true") \
      .json(inputPath) \
      .withColumn("date", expr("time")) \
      .drop("time") \
      .withColumn("date", from_unixtime("date", 'yyyy-MM-dd'))
    events.show()
    運行結果
  3. 將數據使用Delta格式寫入

    %pyspark
    events.write.format("delta").mode("overwrite").partitionBy("date").save(deltaPath)

  4. 再次讀取數據查看是否成功保存

    %pyspark
    events_delta = spark.read.format("delta").load(deltaPath)
    events_delta.printSchema()
  5. 重置數據庫

    %pyspark
    
    spark.sql("DROP DATABASE IF EXISTS {} CASCADE".format(database)) # 注意{}是在pyspark里spark.sql()中使用的變量,參數在.format中指定 (參考:https://stackoverflow.com/questions/44582450/how-to-pass-variables-in-spark-sql-using-python)
    spark.sql("CREATE DATABASE IF NOT EXISTS {}".format(database))

  6. 使用Delta創建表

    %pyspark
    
    spark.sql("USE {}".format(database))
    spark.sql("CREATE TABLE events USING DELTA LOCATION \"{}\"".format(deltaPath))

  7. 查看表中的數據

    %sql
    select * from events limit 10;
    查看表數據
  8. 對數據執行一個簡單的count

    %pyspark
    
    events_delta.count()
  9. 查看events詳情

    %sql
    DESCRIBE DETAIL events;

  10. 查看表歷史,這個功能只有Delta表中可用

    %sql
    
    DESCRIBE HISTORY events
    查看表歷史

  11. 做一個聚合(aggregation)操作并將結果保存在一個臨時表中

    %pyspark
    
    from pyspark.sql.functions import count
    events_delta.groupBy("action","date").agg(count("action").alias("action_count")).orderBy("date", "action").createOrReplaceTempView("tempDisplay")

  12. 可視化該臨時表

    %sql
    
    select * from tempDisplay
    可視化表

  13. 我們可以很方便的使用append模式追加新數據

    %pyspark
    
    # 該操作主要將數據中時間前移2天(172800秒)
    historical_events = spark.read \
      .option("inferSchema", "true") \
      .json(inputPath) \
      .withColumn("date", expr("time-172800")) \
      .drop("time") \
      .withColumn("date", from_unixtime("date", 'yyyy-MM-dd'))
      
    historical_events.write.format("delta").mode("append").partitionBy("date").save(deltaPath)
    
    historical_events.show()
    追加數據
  14. 聚合(aggregate)并查看新數據

    %pyspark
    
    events_delta.groupBy("action","date").agg(count("action").alias("action_count")).orderBy("date", "action").createOrReplaceTempView("tempDisplay2")

  15. 可視化新表

    %sql
    
    select * from tempDisplay2
    可視化新表
  16. 執行count操作查看新append數據行

    %pyspark 
    
    events_delta.count()

  17. Describe表來展示表詳情,并能夠看到表中有6個文件

    %sql
    DESCRIBE DETAIL events
    詳情展示
  18. Databricks支持優化(OPTIMIZE)合并文件以提升性能

    %pyspark
    
    spark.sql("OPTIMIZE events")

  19. 可以看到優化(OPTIMIZE)命令也在事務日志中增加了日志(z-order)

    %sql
    
    DESCRIBE HISTORY events
    優化

  20. 優化后,文件被自動合并做性能優化,表中只有5個文件

    %sql
    
    DESCRIBE DETAIL events
    優化后
  21. Describe formatted 命令也在DDI中支持

    %sql
    
    DESCRIBE FORMATTED events
    Decribe formatted

附錄

清理數據庫

%sql
/*
DROP DATABASE IF EXISTS Delta_QuickStart_Database1 CASCADE;
*/