
RDS (MySQL)

This topic describes how to use Databricks to read data from and write data to an Alibaba Cloud RDS (MySQL) data source.

Prerequisites

Use Databricks to read and write MySQL data

  1. Connect the DDI cluster and the MySQL instance networks.

    1. Log on to the RDS console.

    2. Click Instances in the navigation pane and select the region where the instance resides.

    3. Click the instance ID to open the instance details page.

    4. On the instance details page, click Database Connection in the navigation pane.

    5. Note the VPC and vSwitch of the RDS instance.

    6. Log on to the Alibaba Cloud Databricks DataInsight console.

    7. Select the region where the cluster resides to open the cluster list.

    8. Click the cluster instance to open the cluster details page.

    9. Click the Data Sources tab at the top of the details page, then click Add.

    10. Select General Network, choose the corresponding VPC and vSwitch, click Next, and then click Confirm. Wait for the data source to be created.

  2. Add the ENI IP of the data source to the RDS whitelist.

    1. After the data source instance created in step 1 is ready, find its ENI IP.

    2. Open the RDS instance in the console, click Database Connection, and select Whitelist.

    3. Add the IP to the whitelist according to the whitelist rules.

    4. Set the whitelist options and click Confirm.

3. Log on to the RDS database and execute the table creation statements.

Table creation statements:

CREATE DATABASE case_demos;
USE case_demos;
CREATE TABLE `word_count_demo` (
  `word` text,
  `count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

4. Log on to the Databricks DataInsight cluster, open a Notebook, and read and write MySQL data in code.

Sample text download: The_Sorrows_of_Young_Werther.txt

%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
// Database connection settings
val dbName = "your dbName"
val tbName = "word_count_demo"
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your dbUrl"
val dbPort = "3306"
val inputPath = "oss://ddi-test/The Sorrows of Young Werther"
val numPartitions = 3
// Read the OSS file in partitions and compute the word count
val input = sc.textFile(inputPath, numPartitions)
val counts = input.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).map(e => Row.apply(e._1, e._2))
// Define a custom schema
lazy val schema = StructType(
    StructField("word", StringType) ::
    StructField("count", IntegerType) :: Nil)
// MySQL connection properties
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
// Create a DataFrame from the counts RDD
val df = sqlContext.createDataFrame(counts, schema)
// Write the word-count result DataFrame to MySQL
df.write.mode("append").option("driver", "com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
Note

When connecting to MySQL 8.x, you must add the option option("driver", "com.mysql.cj.jdbc.Driver"). Without it, the default com.mysql.jdbc.Driver is used, which can only connect to MySQL 5.x.
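The word-count step in the snippet above can be sketched with plain Scala collections, with no Spark cluster needed: splitting on spaces and grouping equal words plays the role of the flatMap plus reduceByKey pipeline.

```scala
// Plain-Scala sketch of the word-count logic used in the Spark snippet above.
object WordCountSketch {
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))                     // same tokenization as input.flatMap(_.split(" "))
      .groupBy(identity)                         // group equal words, like reduceByKey(_ + _)
      .map { case (word, ws) => (word, ws.size) }

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("to be or not to be", "to be")))
}
```

This is only an illustration of the transformation; in the notebook the same logic runs distributed over the OSS file partitions.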

Query the database to verify that the data was inserted.

5. Read data from the database

%spark
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
// Database connection settings
val dbName = "your dbName"
val tbName = "word_count_demo"  // table to read
val dbUser = "your dbUser"
val dbPwd = "your dbPwd"
val dbUrl = "your dbUrl"
val dbPort = "3306"
// MySQL connection properties
val properties = new Properties()
properties.setProperty("user", dbUser)
properties.setProperty("password", dbPwd)
// Read the data
val rds_data = spark.read.option("driver", "com.mysql.cj.jdbc.Driver").jdbc(s"jdbc:mysql://$dbUrl:$dbPort/$dbName", tbName, properties)
// Display the data
rds_data.show()
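For reference, the JDBC URL that both snippets interpolate can be sketched as a small helper; the host in the usage line below is a hypothetical placeholder, not a real endpoint.

```scala
// Sketch of how the JDBC URL in both snippets is assembled from the settings.
object JdbcUrlSketch {
  def mysqlUrl(host: String, port: String, db: String): String =
    s"jdbc:mysql://$host:$port/$db"

  def main(args: Array[String]): Unit =
    // Hypothetical RDS endpoint for illustration only
    println(mysqlUrl("rm-example.mysql.rds.aliyuncs.com", "3306", "case_demos"))
}
```

In the notebook, dbUrl should be the internal endpoint shown on the RDS instance's Database Connection page, so that traffic stays inside the VPC configured in step 1.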