日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Delta Lake 快速開始二

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務造成影響,請務必仔細閱讀。

本文介紹如何創(chuàng)建數(shù)據(jù)庫、表批讀寫、表流讀寫、表刪除、更新、合并以及版本控制等功能。

前提條件

警告

首次使用DDI產(chǎn)品創(chuàng)建的Bucket為系統(tǒng)目錄Bucket,不建議存放數(shù)據(jù),您需要再創(chuàng)建一個Bucket來讀寫數(shù)據(jù)。

說明

DDI訪問OSS路徑結(jié)構(gòu):oss://BucketName/Object

  • BucketName為您的存儲空間名稱。

  • Object為上傳到OSS上的文件的訪問路徑。

例:讀取在存儲空間名稱為databricks-demo-hangzhou文件路徑為demo/The_Sorrows_of_Young_Werther.txt的文件

// 從oss地址讀取文本文檔
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt")

創(chuàng)建Notebook、導入數(shù)據(jù)、進行數(shù)據(jù)分析

  1. 創(chuàng)建數(shù)據(jù)庫

    1. 不指定路徑創(chuàng)建數(shù)據(jù)庫,創(chuàng)建的數(shù)據(jù)庫會存儲在當前集群hive路徑中

      %pyspark
      # 創(chuàng)建數(shù)據(jù)庫目錄,你也可以選擇自定義目錄
      database="db_test"
      spark.sql("DROP DATABASE IF EXISTS {} CASCADE".format(database))
      spark.sql("CREATE DATABASE {}".format(database))
      spark.sql("USE {}".format(database))
    2. 指定路徑創(chuàng)建數(shù)據(jù)庫,可以使用OSS路徑創(chuàng)建

      %pyspark
      database="db_dome" 
      #指定路徑創(chuàng)建庫和表;本掩飾路徑為dome路徑,您可以使用真實的路徑
      location='oss://dome-test/case6/';
      spark.sql("DROP DATABASE IF EXISTS {} CASCADE".format(database))
      spark.sql("CREATE DATABASE {} location 'oss://dome-test/case6/' ".format(database))
      spark.sql("USE {}".format(database))
    說明

    您可以在Databricks數(shù)據(jù)洞察控制臺中的元數(shù)據(jù)管理中查看創(chuàng)建的數(shù)據(jù)庫和數(shù)據(jù)表

  2. 創(chuàng)建delta表/delta分區(qū)表/parquet格式表/元存儲表

    %pyspark
    from pyspark.sql.functions import expr
    from pyspark.sql.functions import from_unixtime, to_date
    # 定義路徑
    inputPath = "/dome-test/events_data.json"
    deltaPath = "/dome-test/delta/events"
    partitionDeltaPath = "/dome-test/delta/events_partition"
    parquetPath = "/dome-test/parquet/events"
    jsonPath = "/dome-test/json/events"
    events = spark.read \
      .option("inferSchema", "true") \
      .json(inputPath) \
      .withColumn("date", to_date('date', 'yyyy-MM-dd'))
    events.show()
    events.printSchema()
    # 創(chuàng)建delta表/delta分區(qū)表/parquet格式表/元存儲表
    events.write.format("delta").mode("overwrite").save(deltaPath)
    events.write.format("delta").mode("overwrite").partitionBy("date").save(partitionDeltaPath)
    events.write.format("parquet").mode("overwrite").save(parquetPath)
    events.write.format("delta").saveAsTable("events")

    代碼展示

  3. 將parquet文件轉(zhuǎn)換為delta

    %sql
    --可以將parquet文件轉(zhuǎn)換為delta
    SELECT * FROM  parquet.`/dome-test/parquet/events`;
    CONVERT TO DELTA parquet.`/dome-test/parquet/events`;
    SELECT * FROM  delta.`/dome-test/parquet/events`;

    將parquet文件轉(zhuǎn)換為delta

  4. 將delta文件還原為parquet文件

    1. 清空delta數(shù)據(jù)

      %sql
      -- 可以將delta表還原為parquet表,需要先vacuum操作清空delta數(shù)據(jù)文件,然后刪除_delta_log目錄
      SELECT * FROM  delta.`/dome-test/parquet/events`;
      set spark.databricks.delta.retentionDurationCheck.enabled = false;
      vacuum delta.`/dome-test/parquet/events` retain 0 hours
    2. 前往OSS路徑中找到_delta_log目錄并刪除

    3. 查詢轉(zhuǎn)換之后的delta表數(shù)據(jù)

      %sql
      -- 此操作前要刪除表對應的_delta_log目錄
      SELECT * FROM  delta.`/dome-test/parquet/events`;
  5. 查詢表

    %sql
    select * from events;

    查詢表

  6. 通過元存儲創(chuàng)建表并顯示表中內(nèi)容

    %pyspark
    # 通過路徑在元存儲中創(chuàng)建表
    spark.sql("CREATE TABLE IF NOT EXISTS events_partition USING DELTA LOCATION '/dome-test/delta/events_partition'")
    # 從元存儲和路徑中讀取表
    df1 = spark.table("events")
    df2 = spark.read.format("delta").load("/dome-test/delta/events_partition")
    df1.show()
    df2.show()
  7. 查詢表的舊快照

    %sql
    DESCRIBE HISTORY events;
    -- 您需要將以下日期時間修改為具體日期
    SELECT * FROM events TIMESTAMP AS OF '2020-12-09 19:15:30';
    SELECT * FROM events VERSION AS OF 0
    

    查詢表的日期版本

  8. 更新表中數(shù)據(jù),然后查詢表更改歷史時間戳和版本信息

    %sql
    -- 更新表中數(shù)據(jù),然后查詢表更改歷史時間戳和版本信息
    UPDATE events SET data = 'update-case1' where eventId = 1;
    UPDATE delta.`/dome-test/delta/events` SET data = 'update-case1' where eventId = 1;
    DESCRIBE HISTORY delta.`/dome-test/delta/events`;

    更新表中數(shù)據(jù)并查找

  9. 通過不同時間戳表達式讀取歷史版本

    %pyspark
    # 可以讀取歷史版本數(shù)據(jù),time travel,以下時間為對應表的歷史時間
    d1 = spark.read.format("delta").option("timestampAsOf", "2020-12-09 20:23:40").load("/dome-test/delta/events")
    d11 = spark.read.format("delta").load("/dome-test/delta/events@20201209202340000")
    d2 = spark.read.format("delta").option("versionAsOf", 0).load("/dome-test/delta/events")
    d22 = spark.read.format("delta").load("/dome-test/delta/events@v0")
    # 讀取最新版本數(shù)據(jù)或者其它版本數(shù)據(jù)
    latest_version = spark.sql("SELECT * FROM (DESCRIBE HISTORY delta.`/dome-test/delta/events`)").collect()
    df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/dome-test/delta/events")
    d1.show()
    d11.show()
    d2.show()
    d22.show()
    df.show()

    不同時間戳表達式查詢版本

  10. 向表中追加數(shù)據(jù)

    %pyspark
    # 往表中追加數(shù)據(jù)
    df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
    df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
    df1.write.format("delta").mode("append").saveAsTable("events")
    spark.table("events").show(30)
    

    向表中追加數(shù)據(jù)

  11. 替換部分數(shù)據(jù)

    %pyspark
    df = spark.createDataFrame([("case21", '2020-10-12', 21, 'INFO'),("case22", '2020-10-13', 22, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
    df1 = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
    # 可以替換分區(qū)部分數(shù)據(jù)(replaceWhere中字段必須是分區(qū)字段);此處替換大于等于10-4的數(shù)據(jù),并將10-12、10-13號的數(shù)據(jù)填入
    df1.write.format("delta").mode("overwrite").option("replaceWhere", "date >= '2020-10-4'").saveAsTable("events_partition")
    spark.table("events_partition").show(50)

    替換數(shù)據(jù)

  12. 添加列和修改列

    %sql
    ALTER TABLE events ADD COLUMNS (name string COMMENT '名稱' FIRST);
    ALTER TABLE events CHANGE COLUMN name COMMENT '姓名';

  13. 重寫表

    修改表中字段的類型或名稱需要重寫表,需要加上overwriteSchema選項

    %pyspark
    # 修改表中字段的類型或者名稱需要重寫表,需要加上overwriteSchema選項
    spark.read.table("events").withColumnRenamed("name", "fullname") \
      .write.format("delta").mode("overwrite") \
      .option("overwriteSchema", "true") \
      .saveAsTable("events")
    %pyspark
    from pyspark.sql.functions import col
    spark.read.table("events").withColumn("fullname2", col("fullname").cast("int")) \
      .write.format("delta").mode("overwrite") \
      .option("overwriteSchema", "true") \
      .saveAsTable("events")

  14. 展示歷史版本表

    %sql
    describe detail events;
    describe history events;

    展示歷史版本表

  15. 查詢版本為2的表

    %sql
    select * from events@v2;

    查詢版本為2的表

  16. 查詢?nèi)掌诖笥诘扔?0月10號的數(shù)據(jù)

    %sql
    delete from events where date >= '2020-10-10';

  17. 從結(jié)構(gòu)化的輸入流中讀取數(shù)據(jù),經(jīng)過處理后結(jié)構(gòu)化流輸出到delta文件

    %pyspark
    
    # 從結(jié)構(gòu)化的輸入流中讀取數(shù)據(jù),經(jīng)過處理后結(jié)構(gòu)化流輸出到delta文件
    spark.readStream.format("delta").table("events").groupBy("date").count() \
    .writeStream.format("delta").outputMode("complete").option("checkpointLocation", "/dome-test/delta/eventsByDate/_checkpoints/streaming-agg").start("/dome-test/delta/eventsByDate")

  18. 查詢上述文件輸出結(jié)果

    %pyspark
    # 上述流文件輸出完成之后,可以查詢生成結(jié)果
    events = spark.read.option("inferSchema", "true").format("delta").load("/dome-test/delta/eventsByDate")
    events.show()

    查詢結(jié)果

  19. 更新表

    %pyspark
    from delta.tables import *
    from pyspark.sql.functions import *
    
    deltaTable = DeltaTable.forPath(spark, "/dome-test/delta/events/")
    
    deltaTable.delete("date > '2020-10-03'") 
    deltaTable.toDF().show()
    deltaTable.update("eventType = 'Error'", { "eventType": "'error'" })
    deltaTable.toDF().show()

    更新表

  20. 合并表

    %pyspark
    from delta.tables import *
    
    
    deltaTable = DeltaTable.forPath(spark, "/dome-test/delta/events/")
    df = spark.createDataFrame([("update-case2", '2020-10-12', 2, 'INFO'),("case25", '2020-10-13', 25, 'INFO')], ['data', 'date', 'eventId', 'eventType'])
    updatesDF = df.select('data', to_date('date', 'yyyy-MM-dd').alias('date'), 'eventId', 'eventType')
    
    # 合并表,eventId相等則更新數(shù)據(jù),不相等就插入數(shù)據(jù)
    deltaTable.alias("events").merge(
        updatesDF.alias("updates"),
        "events.eventId = updates.eventId") \
      .whenMatchedUpdate(set = { "data" : "updates.data" } ) \
      .whenNotMatchedInsert(values =
        {
          "date": "updates.date",
          "eventId": "updates.eventId",
          "data": "updates.data",
          "eventType": "updates.eventType"
        }
      ).execute()
    deltaTable.toDF().show()

    合并表