本文介紹如何使用Databricks 讀寫Redis數據。
前提條件
通過主賬號登錄阿里云 Databricks控制臺。
已創建Redis實例,具體參見創建Redis實例。
已創建DDI集群,具體請參見DDI集群創建。
創建集群并通過Knox賬號訪問NoteBook。
目前暫不支持Spark3.0和DBR7及以上的版本讀寫Redis。
打通網絡環境
進入DDI數據源點擊添加選擇通用網絡打通,選擇Redis數據庫所在的VPC和vsw。
登錄Redis控制臺添加DDI集群各個機器IP至訪問白名單,或者在VPC登錄處設置允許VPC內免密訪問。
使用Databricks 讀寫Redis數據
鏈接Redis數據庫代碼實現
%spark
import com.redislabs.provider.redis._
val redisServerDnsAddress = "your address"
val redisPortNumber = 6379
val redisPassword = "your password"
//獲取RedisConfig
val redisConfig = new RedisConfig(new RedisEndpoint(redisServerDnsAddress, redisPortNumber, redisPassword))
Redis數據庫中相關String,List,Set,Hash的數據類型讀寫代碼如下:
String 讀寫
%spark
//String 讀寫
val stringRDD = sc.parallelize(Seq(("000001", "Jack"), ("000002", "Rose")))
sc.toRedisKV(stringRDD)(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("000*", 5)(redisConfig)
val stringRDD2 = keysRDD.getKV
stringRDD2.collect().foreach(println)
List讀寫
%spark
//List 讀寫
val stringListRDD = sc.parallelize(Seq("dog", "cat", "pig"))
sc.toRedisLIST(stringListRDD, "animal")(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("animal*")(redisConfig)
val listRDD = keysRDD.getList
listRDD.collect().foreach(println)
HASH讀寫
%spark
//Hash讀寫
val stringHashRDD = sc.parallelize(Seq(("jack","22"), ("rose","23"),("sir","24"),("jack","24")))
sc.toRedisHASH(stringHashRDD, "message")(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("message*")(redisConfig)
val hashRDD = keysRDD.getHash
hashRDD.collect().foreach(println)
SET讀寫
%spark
//Set 讀寫
val stringHashRDD = sc.parallelize(Seq("hello", "hello", "word"))
sc.toRedisSET(stringHashRDD, "setKey")(redisConfig)
val keysRDD = sc.fromRedisKeyPattern("setKey*")(redisConfig)
val setRDD = keysRDD.getSet
setRDD.collect().foreach(println)
文檔內容是否對您有幫助?