日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

CSV文件

重要

本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務(wù)造成影響,請務(wù)必仔細(xì)閱讀。

CSV意即逗號分隔符(comma-separated value),這是一種常見的文本文件格式,其中每行表示一條記錄,用逗號分隔記錄中的每個字段。

前提條件

警告

首次使用DDI產(chǎn)品創(chuàng)建的Bucket為系統(tǒng)目錄Bucket,不建議存放數(shù)據(jù),您需要再創(chuàng)建一個Bucket來讀寫數(shù)據(jù)。

說明

DDI訪問OSS路徑結(jié)構(gòu):oss://BucketName/Object

  • BucketName為您的存儲空間名稱。

  • Object為上傳到OSS上的文件的訪問路徑。

例:讀取在存儲空間名稱為databricks-demo-hangzhou文件路徑為demo/The_Sorrows_of_Young_Werther.txt的文件

// 從oss地址讀取文本文檔
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt"

csv讀取程序的可選項

說明

實際應(yīng)用場景中遇到的數(shù)據(jù)內(nèi)容或結(jié)構(gòu)并不是那么規(guī)范,所以CSV讀取程序包含大量選項(option),通過這些選項可以幫助解決例如忽略特定字符等問題

read/write

Key

取值范圍

默認(rèn)值

說明

Both

sep

任意單個字符串字符

用作每個字段和值的分隔符的單個字符

Both

header

true,false

false

一個布爾標(biāo)記符,用于聲明文件中的第一行是否為列的名稱

Both

escape

任意字符竄

\

用于轉(zhuǎn)譯的字符

Both

inferSchema

true,false

false

指定在讀取文件時spark是否推斷列類型

Both

ignoreLeadingWhiteSpace

true,false

false

聲明是否應(yīng)跳過讀取中的前導(dǎo)空格

Both

ignoreTrailingWhiteSpace

true,false

false

聲明是否應(yīng)跳過讀取中的尾部 空格

Both

nullValue

任意字符串字符

""

聲明在文件中什么字符表示null值

Both

nanValue

任意字符串字符

NaN

聲明什么字符表示CSV文件中的NaN或缺失字符

Both

positiveInf

任意字符串字符

Inf

聲明什么字符表示正無窮大

Both

negativeInf

任意字符串字符

-Inf

聲明什么字符表示負(fù)無窮大

Both

Compression 或Code

None,Uncompressed,bzip2,deflate,gzip,lz4,snappy

none

聲明spark應(yīng)該用什么壓縮解碼器來讀取或?qū)懭胛募?/p>

Both

dataFormat

任何符合Java的SimpleDataFormat的字符串或字符

yyyy-MM-dd

日期類型的日期格式

Both

timestampFormat

任何符合Java的SimpleDataFormat的字符串或字符

MMdd 'T' HH:mm ss.SSSZZ

時間戳類型,時間戳格式

Read

maxColumn

任意整數(shù)

20480

聲明文件中的最大列數(shù)

Read

maxCharsPerColumn

任意整數(shù)

1000000

聲明列中最大字符數(shù)

Read

escapeQuote

true,false

true

聲明spark是否應(yīng)該轉(zhuǎn)義在行中找到的引號

Read

maxMalformadLogPerPartition

任意整數(shù)

10

設(shè)置spark將為每個分區(qū)記錄錯誤格式的行的最大數(shù)目,超出此數(shù)目的格式錯誤的記錄將被忽略

Write

QuoteAll

true,false

false

指定是否將所有值括在引號中,而不是僅轉(zhuǎn)義具有引號字符竄的值

Read

multiline

true,false

false

此選項用于讀取多行CSV文件,其中CSV文件中的每個邏輯行可能跨越文件本身的多行

實例

說明

本實例展示了如何使用notebook讀取文件的多種方式。

重要

與讀取其他格式一樣,要讀取CSV文件必須首先為該特定格式創(chuàng)建一個DataFrameReader這里我們將格式指定為CSV;

%spark
spark.read.format("csv")

1.hearder 選項

默認(rèn)header = false

%spark
val path="oss://databricks-data-source/datas/input.csv" 
val dtDF = spark.read.format("csv")
        .option("mode", "FAILFAST")
        .load(path)
dtDF.show(5)

數(shù)據(jù)展示

dataheader = true

%spark
val path="oss://databricks-data-source/datas/input.csv" 
val dtDF = spark.read.format("csv")
        .option("header","true")
        .option("mode", "FAILFAST")
        .load(path)
dtDF.show(5)

數(shù)據(jù)展示

data1

2.inferSchema選項

默認(rèn)inferSchema = false

%spark
val path="oss://databricks-data-source/datas/input.csv" 
val dtDF = spark.read.format("csv")
        .option("header","true")
        .option("mode", "FAILFAST")
        .load(path)
     
dtDF.show(5)    
dtDF.printSchema()
data2

當(dāng)inferSchema = true

%spark
val path="oss://databricks-data-source/datas/input.csv" 
val dtDF = spark.read.format("csv")
        .option("header","true")
        .option("mode", "FAILFAST")
        .option("inferSchema","true")
        .load(path) 
dtDF.show(5)    
dtDF.printSchema()
data3

當(dāng)深度類型轉(zhuǎn)換不是我們希望的類型是,我們可以通過自定義Schema

%spark
import org.apache.spark.sql.types._

val path="oss://databricks-data-source/datas/input.csv" 
val schema = new StructType()
  .add("_c0",IntegerType,true)
  .add("color",StringType,true)
  .add("depth",DoubleType,true)
  .add("table",DoubleType,true)
  .add("price",IntegerType,true)
val data_with_schema = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load(path)
  data_with_schema.show(5,false)
  data_with_schema.printSchema()
data4

??自定義schema里面包含一個特殊的列_corrupt_record,該列在數(shù)據(jù)類型解析不正確時捕獲沒有正確解析的行,方便查看

%spark
val path="oss://databricks-data-source/datas/input.csv" 

val schema = new StructType()
  .add("_c0",IntegerType,true)
  .add("color",StringType,true)
  .add("depth",IntegerType,true) //將字段自定義成整數(shù)型
  .add("table",DoubleType,true)
  .add("price",IntegerType,true)
  .add("_corrupt_record", StringType, true) //特殊列_corrupt_record,追蹤沒有解析成功的列
  
val data_with_schema = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load(path)
  data_with_schema.show(5,false)
  data_with_schema.printSchema()
data5

3.mode 選項

說明

mode主要有三個值,分別是PERMISSIVE(遇到解析不了,使用系統(tǒng)自帶轉(zhuǎn)換,實在不行就轉(zhuǎn)換成null)、DROPMALFORMED(遇到解析不了,就放棄該記錄)和FAILFAST(遇到解析不了,就報錯,終止代碼執(zhí)行)

CSV數(shù)據(jù)集

1,a,10000,11-03-2019,pune
2,b,10020,14-03-2019,pune
3,a,34567,15-03-2019,pune
tyui,a,fgh-03-2019,pune
4,b,10020,14-03-2019,pune
%spark
import org.apache.spark.sql.types._

val path="oss://databricks-data-source/datas/dataTest.csv" 
val schema = StructType(
        List(
          StructField("id", DataTypes.IntegerType, false,Metadata.empty),
          StructField("name", DataTypes.StringType, false,Metadata.empty),
          StructField("salary", DataTypes.DoubleType, false,Metadata.empty),
          StructField("dob", DataTypes.StringType, false,Metadata.empty),
          StructField("loc", DataTypes.StringType, false,Metadata.empty)
        )
      )
val dtDF = spark.read.format("csv")
        .schema(schema)
        .option("mode", "DROPMALFORMED")
        .load(path)
        
dtDF.show()
data5

下面是注解以后的結(jié)果

data6

如果使用FAILFAST

%spark
import org.apache.spark.sql.types._

val path="oss://databricks-data-source/datas/dataTest.csv" 
val schema = StructType(
        List(
          StructField("id", DataTypes.IntegerType, false,Metadata.empty),
          StructField("name", DataTypes.StringType, false,Metadata.empty),
          StructField("salary", DataTypes.DoubleType, false,Metadata.empty),
          StructField("dob", DataTypes.StringType, false,Metadata.empty),
          StructField("loc", DataTypes.StringType, false,Metadata.empty)
        )
      )
val dtDF = spark.read.format("csv")
        .schema(schema)
        .option("mode", "FAILFAST") 
        .load(path)        
dtDF.show()
data6

寫CSV文件

%spark
val path="oss://databricks-data-source/datas/input.csv" 
val dtDF = spark.read.format("csv")
        .option("header","true")
        .option("mode", "FAILFAST")
        .load(path)
val writeDF=dtDF.withColumnRenamed("_c0","id").filter($"depth">60)
writeDF.show(5)
//寫入CSV數(shù)據(jù)到oss
writeDF.coalesce(1).write.format("csv").mode("overwrite").save("oss://databricks-data-source/datas/out")
data