Tablestore
本文介紹如何使用Databricks 讀寫Tablestore數(shù)據(jù)。
前提條件
已創(chuàng)建Tablestore實(shí)例
通過(guò)主賬號(hào)登錄阿里云 Databricks控制臺(tái)。
已創(chuàng)建DDI集群,具體請(qǐng)參見DDI集群創(chuàng)建。
創(chuàng)建集群并通過(guò)knox賬號(hào)訪問NoteBook。
使用Databricks 讀寫Tablestore數(shù)據(jù)
1.通過(guò)創(chuàng)建表的方式讀取Tablestore數(shù)據(jù);
%sql
--創(chuàng)建數(shù)據(jù)庫(kù)
CREATE DATABASE IF NOT EXISTS table_store;
USE table_store;
--創(chuàng)建表
DROP TABLE IF EXISTS delta_order_source;
CREATE TABLE delta_order_source
USING tablestore
-- 配置項(xiàng)信息鏈接tablestore,定義schema
OPTIONS(
endpoint="your endpoint",
access.key.id="your akId",
access.key.secret="your ads",
instance.name="your instanceName",
table.name="your tableName",
catalog='{"columns": {"user_id": {"type": "string"}, "order_id": {"type": "string"},"price": {"type": "double"}, "name": {"type": "string"}}}'
);
-- 數(shù)據(jù)查詢
SELECT * FROM delta_order_source;
2.使用spark API讀取Tablestore數(shù)據(jù);
%spark
//讀取配置
val df = spark.read.format("tablestore")
.option("endpoint", "your endpoint")
.option("access.key.id", "your akId")
.option("access.key.secret", "your ads")
.option("instance.name", "your instanceName")
.option("table.name", "your tableName")
.option("catalog", """{"columns": {"user_id": {"type": "string"}, "order_id": {"type": "string"},"price": {"type": "double"}, "name": {"type": "string"}}}""")
.load()
df.show()
catalog為Tablestore中表的schema,當(dāng)catalog聲明的列名在Tablestore表中不存在時(shí),Tablestore表會(huì)自動(dòng)增加一列,默認(rèn)為NULL
如果報(bào)Tablestore的DataSource找不到的錯(cuò)誤,說(shuō)明依賴的數(shù)據(jù)源jar包還沒有安裝生效
注意明確元數(shù)據(jù)庫(kù)的Location,推薦使用oss,`desc database default;`命令查看
3.將數(shù)據(jù)寫入到Tablestore
%spark
//定義將要插入Tablestore的DataFrame;
val add_df = spark.createDataFrame(
Seq(
("1086", "20191118-10", 2250.0, "jack"),
("1010", "20191118-11", 2200.0, "rose")
)
).toDF("user_id", "order_id", "price", "name")
// 將定義好的DF寫入到創(chuàng)建的數(shù)據(jù)表中,同時(shí)也會(huì)寫入到Tablestore中;
add_df.write.format("Tablestore").mode("append").saveAsTable("delta_order_source")
//數(shù)據(jù)查詢
spark.table("delta_order_source").where("user_id == '1086'").show()
結(jié)果已經(jīng)寫入Tablestore
Tablestore結(jié)合spark 參考文檔:Tablestore結(jié)合spark的流批一體SQL實(shí)戰(zhàn)
Jar包Java方式參考文檔:https://github.com/aliyun/aliyun-emapreduce-datasources/blob/main/docs/Spark-on-TableStore.md