RDS (MySQL)
本文介紹如何使用Databricks 讀寫阿里云RDS(MySQL)數(shù)據(jù)源數(shù)據(jù)。
前提條件
通過主賬號登錄阿里云 Databricks控制臺。
已創(chuàng)建MySQL實例,具體參見創(chuàng)建MySQL實例。
已創(chuàng)建DDI集群,具體請參見DDI集群創(chuàng)建。
創(chuàng)建集群并通過knox賬號訪問NoteBook。
使用Databricks 讀寫MySQL數(shù)據(jù)
DDI集群與MySQL實例網(wǎng)絡打通。
登錄RDS管理控制臺RDS管理控制臺
點擊右側導航欄實例列表選擇實例所在region
點擊實例ID進入實例詳情頁面
點擊實例詳情右側導航欄數(shù)據(jù)庫連接
如圖所示查看RDS實例所在的VPC和VSwitch
登錄到databricks數(shù)據(jù)洞察集群阿里云Databricks控制臺
選擇集群所在region進入集群列表
點擊集群實例進入集群詳情頁面
點擊詳情頁面上方數(shù)據(jù)源頁簽進入數(shù)據(jù)源頁面點擊添加
選擇通用網(wǎng)絡,選擇對應的VPC和VSwith點擊下一步點擊確認等待創(chuàng)建成功
將數(shù)據(jù)源ENI IP添加至RDS白名單
等待1-j步驟的數(shù)據(jù)源實例創(chuàng)建好以后找到ENI IP
進入RDS管控實例點擊數(shù)據(jù)庫連接,選擇白名單
按照白名單添加的規(guī)則將IP白名單列表添加進去。
添加白名單選配項,點擊確認
3.登錄到RDS數(shù)據(jù)庫并執(zhí)行創(chuàng)表語句
建表語句:
CREATE DATABASE case_demos;
USE case_demos;
CREATE TABLE `word_count_demo` (
`word` text,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
4.登錄Databricks數(shù)據(jù)洞察集群進入Notebook,代碼實現(xiàn)MySQL數(shù)據(jù)讀寫。
示例文本下載:The_Sorrows_of_Young_Werther.txt
%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
//鏈接數(shù)據(jù)庫配置信息;
val dbName = "your dbName"
val tbName = "word_count_demo"
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your bdUrl"
val dbPort = "3306"
val inputPath = "oss://ddi-test/The Sorrows of Young Werther"
val numPartitions = 3
//分區(qū)讀取OSS文件,并計算WordCount;
val input = sc.textFile(inputPath, numPartitions)
val counts = input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).map(e => Row.apply(e._1, e._2))
//創(chuàng)建自定義schema;
lazy val schema = StructType(
StructField("word", StringType) ::
StructField("count", IntegerType) :: Nil)
//讀取MySQL配置信息;
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
//創(chuàng)建的DataFrame;
val df = sqlContext.createDataFrame(counts, schema)
//將wordCount計算好結果的DF寫入到MySQL;
df.write.mode("append").option("driver","com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
當鏈接MySQL8.x需要加入配選項option("driver","com.mysql.cj.jdbc.Driver")
如果不加默認會用com.mysql.jdbc.Driver
只能鏈接MySQL5.x版本。
查詢數(shù)據(jù)庫是否插入成功
5.讀取數(shù)據(jù)庫數(shù)據(jù)
%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
//鏈接數(shù)據(jù)庫配置信息;
val dbName = "your dbName"
val tbName = "word_count_demo" //選擇過濾條件
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your bdUrl"
val dbPort = "3306"
//讀取MySQL配置信息;
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
//讀取數(shù)據(jù)
val rds_data=spark.read.option("driver","com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
//數(shù)據(jù)展示
rds_data.show()