CSV文件
本文中含有需要您注意的重要提示信息,忽略該信息可能對您的業(yè)務(wù)造成影響,請務(wù)必仔細(xì)閱讀。
CSV意即逗號分隔符(comma-separated value),這是一種常見的文本文件格式,其中每行表示一條記錄,用逗號分隔記錄中的每個字段。
前提條件
通過主賬號登錄阿里云 Databricks控制臺。
已創(chuàng)建集群,具體請參見創(chuàng)建集群。
已使用OSS管理控制臺創(chuàng)建非系統(tǒng)目錄存儲空間,詳情請參見創(chuàng)建存儲空間。
首次使用DDI產(chǎn)品創(chuàng)建的Bucket為系統(tǒng)目錄Bucket,不建議存放數(shù)據(jù),您需要再創(chuàng)建一個Bucket來讀寫數(shù)據(jù)。
DDI訪問OSS路徑結(jié)構(gòu):oss://BucketName/Object
BucketName為您的存儲空間名稱。
Object為上傳到OSS上的文件的訪問路徑。
例:讀取在存儲空間名稱為databricks-demo-hangzhou文件路徑為demo/The_Sorrows_of_Young_Werther.txt的文件
// 從oss地址讀取文本文檔
val dataRDD = sc.textFile("oss://databricks-demo-hangzhou/demo/The_Sorrows_of_Young_Werther.txt"
csv讀取程序的可選項
實際應(yīng)用場景中遇到的數(shù)據(jù)內(nèi)容或結(jié)構(gòu)并不是那么規(guī)范,所以CSV讀取程序包含大量選項(option),通過這些選項可以幫助解決例如忽略特定字符等問題
read/write | Key | 取值范圍 | 默認(rèn)值 | 說明 |
Both | sep | 任意單個字符串字符 | , | 用作每個字段和值的分隔符的單個字符 |
Both | header | true,false | false | 一個布爾標(biāo)記符,用于聲明文件中的第一行是否為列的名稱 |
Both | escape | 任意字符竄 | \ | 用于轉(zhuǎn)譯的字符 |
Both | inferSchema | true,false | false | 指定在讀取文件時spark是否推斷列類型 |
Both | ignoreLeadingWhiteSpace | true,false | false | 聲明是否應(yīng)跳過讀取中的前導(dǎo)空格 |
Both | ignoreTrailingWhiteSpace | true,false | false | 聲明是否應(yīng)跳過讀取中的尾部 空格 |
Both | nullValue | 任意字符串字符 | "" | 聲明在文件中什么字符表示null值 |
Both | nanValue | 任意字符串字符 | NaN | 聲明什么字符表示CSV文件中的NaN或缺失字符 |
Both | positiveInf | 任意字符串字符 | Inf | 聲明什么字符表示正無窮大 |
Both | negativeInf | 任意字符串字符 | -Inf | 聲明什么字符表示負(fù)無窮大 |
Both | Compression 或Code | None,Uncompressed,bzip2,deflate,gzip,lz4,snappy | none | 聲明spark應(yīng)該用什么壓縮解碼器來讀取或?qū)懭胛募?/p> |
Both | dataFormat | 任何符合Java的SimpleDataFormat的字符串或字符 | yyyy-MM-dd | 日期類型的日期格式 |
Both | timestampFormat | 任何符合Java的SimpleDataFormat的字符串或字符 | MMdd 'T' HH:mm ss.SSSZZ | 時間戳類型,時間戳格式 |
Read | maxColumn | 任意整數(shù) | 20480 | 聲明文件中的最大列數(shù) |
Read | maxCharsPerColumn | 任意整數(shù) | 1000000 | 聲明列中最大字符數(shù) |
Read | escapeQuote | true,false | true | 聲明spark是否應(yīng)該轉(zhuǎn)義在行中找到的引號 |
Read | maxMalformadLogPerPartition | 任意整數(shù) | 10 | 設(shè)置spark將為每個分區(qū)記錄錯誤格式的行的最大數(shù)目,超出此數(shù)目的格式錯誤的記錄將被忽略 |
Write | QuoteAll | true,false | false | 指定是否將所有值括在引號中,而不是僅轉(zhuǎn)義具有引號字符竄的值 |
Read | multiline | true,false | false | 此選項用于讀取多行CSV文件,其中CSV文件中的每個邏輯行可能跨越文件本身的多行 |
實例
本實例展示了如何使用notebook讀取文件的多種方式。
與讀取其他格式一樣,要讀取CSV文件必須首先為該特定格式創(chuàng)建一個DataFrameReader這里我們將格式指定為CSV;
%spark
spark.read.format("csv")
1.hearder 選項
默認(rèn)header = false
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("mode", "FAILFAST")
.load(path)
dtDF.show(5)
數(shù)據(jù)展示
header = true
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.load(path)
dtDF.show(5)
數(shù)據(jù)展示
2.inferSchema選項
默認(rèn)inferSchema = false
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.load(path)
dtDF.show(5)
dtDF.printSchema()
當(dāng)inferSchema = true
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.option("inferSchema","true")
.load(path)
dtDF.show(5)
dtDF.printSchema()
當(dāng)深度類型轉(zhuǎn)換不是我們希望的類型是,我們可以通過自定義Schema
%spark
import org.apache.spark.sql.types._
val path="oss://databricks-data-source/datas/input.csv"
val schema = new StructType()
.add("_c0",IntegerType,true)
.add("color",StringType,true)
.add("depth",DoubleType,true)
.add("table",DoubleType,true)
.add("price",IntegerType,true)
val data_with_schema = spark.read.format("csv")
.option("header", "true")
.schema(schema)
.load(path)
data_with_schema.show(5,false)
data_with_schema.printSchema()
??自定義schema里面包含一個特殊的列_corrupt_record
,該列在數(shù)據(jù)類型解析不正確時捕獲沒有正確解析的行,方便查看
%spark
val path="oss://databricks-data-source/datas/input.csv"
val schema = new StructType()
.add("_c0",IntegerType,true)
.add("color",StringType,true)
.add("depth",IntegerType,true) //將字段自定義成整數(shù)型
.add("table",DoubleType,true)
.add("price",IntegerType,true)
.add("_corrupt_record", StringType, true) //特殊列_corrupt_record,追蹤沒有解析成功的列
val data_with_schema = spark.read.format("csv")
.option("header", "true")
.schema(schema)
.load(path)
data_with_schema.show(5,false)
data_with_schema.printSchema()
3.mode 選項
mode主要有三個值,分別是PERMISSIVE
(遇到解析不了,使用系統(tǒng)自帶轉(zhuǎn)換,實在不行就轉(zhuǎn)換成null)、DROPMALFORMED
(遇到解析不了,就放棄該記錄)和FAILFAST
(遇到解析不了,就報錯,終止代碼執(zhí)行)
CSV數(shù)據(jù)集
1,a,10000,11-03-2019,pune
2,b,10020,14-03-2019,pune
3,a,34567,15-03-2019,pune
tyui,a,fgh-03-2019,pune
4,b,10020,14-03-2019,pune
%spark
import org.apache.spark.sql.types._
val path="oss://databricks-data-source/datas/dataTest.csv"
val schema = StructType(
List(
StructField("id", DataTypes.IntegerType, false,Metadata.empty),
StructField("name", DataTypes.StringType, false,Metadata.empty),
StructField("salary", DataTypes.DoubleType, false,Metadata.empty),
StructField("dob", DataTypes.StringType, false,Metadata.empty),
StructField("loc", DataTypes.StringType, false,Metadata.empty)
)
)
val dtDF = spark.read.format("csv")
.schema(schema)
.option("mode", "DROPMALFORMED")
.load(path)
dtDF.show()
下面是注解以后的結(jié)果
如果使用FAILFAST
%spark
import org.apache.spark.sql.types._
val path="oss://databricks-data-source/datas/dataTest.csv"
val schema = StructType(
List(
StructField("id", DataTypes.IntegerType, false,Metadata.empty),
StructField("name", DataTypes.StringType, false,Metadata.empty),
StructField("salary", DataTypes.DoubleType, false,Metadata.empty),
StructField("dob", DataTypes.StringType, false,Metadata.empty),
StructField("loc", DataTypes.StringType, false,Metadata.empty)
)
)
val dtDF = spark.read.format("csv")
.schema(schema)
.option("mode", "FAILFAST")
.load(path)
dtDF.show()
寫CSV文件
%spark
val path="oss://databricks-data-source/datas/input.csv"
val dtDF = spark.read.format("csv")
.option("header","true")
.option("mode", "FAILFAST")
.load(path)
val writeDF=dtDF.withColumnRenamed("_c0","id").filter($"depth">60)
writeDF.show(5)
//寫入CSV數(shù)據(jù)到oss
writeDF.coalesce(1).write.format("csv").mode("overwrite").save("oss://databricks-data-source/datas/out")