本文介紹如何使用Databricks DDI訪問MongoDB數據源數據。
前提條件
通過主賬號登錄阿里云 Databricks控制臺。
已創(chuàng)建MongoDB實例。
已創(chuàng)建DDI集群,具體請參見DDI集群創(chuàng)建。
創(chuàng)建集群并通過knox賬號訪問NoteBook。
使用Databricks 讀寫MongoDB數據
MongoDB數據源與DDI網絡打通
登錄MongoDB管理控制臺云數據庫MongoDB管理控制臺
點擊上部選擇實例所在region
點擊實例ID進入實例詳情頁面
查看基本信息找到對應的VPV和VSwitch
登錄到databricks數據洞察集群阿里云Databricks控制臺
選擇集群所在region進入集群列表
點擊集群實例進入集群詳情頁面
點擊詳情頁面上方數據源頁簽進入數據源頁面點擊添加
選擇通用網絡,選擇對應的VPC和VSwith點擊下一步點擊確認等待創(chuàng)建成功
添加集群ENI IP至MongoDB數據庫白名單(登錄MongoDB云產品管理控制臺->白名單設置)
讀寫MongoDB數據(非SSL)
表讀取mongodb數據
%spark //讀取mongodb數據 val mongoDF = spark.read.format("com.mongodb.spark.sql").option("uri", 'your connection uri').option("database","your database").option("collection", "your collection").load() mongoDF.show(2)
使用創(chuàng)建Delta表的方式讀取MongoDB數據源
創(chuàng)建Delta表
%sql create database if not exists mongodb; use mongodb; create table mongodb_test( id string, name string )using com.mongodb.spark.sql options ( uri 'your connection uri', database 'your database', collection 'your collection' )
向表中插入數據
%spark //定義內部類 case class Student(id: String, name: String) //創(chuàng)建自定義數據源DataFrame val df = spark.createDataFrame(sc.parallelize(List(Student("2172","Torcuato"), Student("3142", "Rosalinda")))) //數據寫入到mongodb df.write.format("com.mongodb.spark.sql").mode("append").saveAsTable("mongodb_test")
SQL查詢
%sql select * from mongodb_test limit 2;
讀寫MongoDB數據源(開啟SSL)
MongoDB 數據庫SSL證書庫jks下載
說明阿里云MongoDB證書下載,可以到云數據庫MongoDB管控進行下載,證書庫的默認密碼:apsaradb
將jks證書庫分發(fā)到DDI所有服務器節(jié)點(此步驟可以聯系DDI開發(fā)運維人員協(xié)助)
在notebook讀寫數據
引入spark.conf調用jks證書庫,將證書加載到spark-session中。
%spark.conf spark.executor.extraJavaOptions -Djavax.net.ssl.trustStore=/home/hadoop/cer/ApsaraDB-CA-Chain.jks -Djavax.net.ssl.trustStorePassword=**** spark.driver.extraJavaOptions -Djavax.net.ssl.trustStore=/home/hadoop/cer/ApsaraDB-CA-Chain.jks -Djavax.net.ssl.trustStorePassword=****
執(zhí)行代碼訪問MongoDB數據源
%spark val mongoDF = spark.read.format("com.mongodb.spark.sql").option("uri", "mongodb://root:***@dds-******.mongodb.rds.aliyuncs.com:3717/admin?ssl=true&tlsAllowInvalidHostnames=true").option("collection", "****").load() mongoDF.show(1)
文檔內容是否對您有幫助?