訪問RDS MySQL數(shù)據(jù)源
云原生數(shù)據(jù)倉庫 AnalyticDB MySQL 版支持訪問同賬號(hào)或其他阿里云賬號(hào)(跨賬號(hào))下RDS MySQL中的數(shù)據(jù)。您可以通過ENI和SSL鏈路兩種方式訪問RDS MySQL數(shù)據(jù),SSL鏈路訪問RDS MySQL數(shù)據(jù)時(shí)可以加密網(wǎng)絡(luò)連接,保證數(shù)據(jù)的安全性,對(duì)比ENI訪問方式更加安全。本文主要介紹通過ENI和SSL鏈路訪問RDS MySQL數(shù)據(jù)的具體方法。
前提條件
AnalyticDB for MySQL集群的產(chǎn)品系列為企業(yè)版、基礎(chǔ)版或湖倉版。
AnalyticDB for MySQL集群與RDS MySQL實(shí)例位于同一地域。具體操作,請(qǐng)參見創(chuàng)建集群和創(chuàng)建RDS MySQL實(shí)例。
已在AnalyticDB for MySQL集群中創(chuàng)建Job型資源組。具體操作,請(qǐng)參見新建資源組。
已創(chuàng)建AnalyticDB for MySQL集群的數(shù)據(jù)庫賬號(hào)。
如果是通過阿里云賬號(hào)訪問,只需創(chuàng)建高權(quán)限賬號(hào)。具體操作,請(qǐng)參見創(chuàng)建高權(quán)限賬號(hào)。
如果是通過RAM用戶訪問,需要?jiǎng)?chuàng)建高權(quán)限賬號(hào)和普通賬號(hào)并且將RAM用戶綁定到普通賬號(hào)上。具體操作,請(qǐng)參見創(chuàng)建數(shù)據(jù)庫賬號(hào)和綁定或解綁RAM用戶與數(shù)據(jù)庫賬號(hào)。
已將RDS MySQL實(shí)例添加到安全組中,且安全組規(guī)則的入方向與出方向放行RDS MySQL端口的訪問請(qǐng)求。具體操作,請(qǐng)參見設(shè)置安全組和添加安全組規(guī)則。
已開通OSS服務(wù),并創(chuàng)建與AnalyticDB for MySQL集群位于相同地域的存儲(chǔ)空間。具體操作,請(qǐng)參見開通OSS服務(wù)和創(chuàng)建存儲(chǔ)空間。
已完成授權(quán)操作。具體操作,請(qǐng)參見賬號(hào)授權(quán)。
重要同賬號(hào)訪問時(shí),需具備AliyunADBSparkProcessingDataRole權(quán)限;跨賬號(hào)訪問時(shí),需要對(duì)其他阿里云賬號(hào)授權(quán)。
數(shù)據(jù)準(zhǔn)備
在RDS MySQL中創(chuàng)建數(shù)據(jù)庫和表,并插入數(shù)據(jù)。示例語句如下:
CREATE DATABASE `test`;
CREATE TABLE `test`.`persons` (
`id` int(11) DEFAULT NULL,
`first_name` varchar(32) DEFAULT NULL,
`laster_name` varchar(32) DEFAULT NULL,
`age` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
;
INSERT INTO persons VALUES(1,'a','b',5);
INSERT INTO persons VALUES(2,'c','d',6);
INSERT INTO persons VALUES(3,'e','f',7);
通過ENI訪問RDS MySQL數(shù)據(jù)
上傳驅(qū)動(dòng)程序及Spark作業(yè)依賴的Jar包
編寫訪問RDS MySQL表的示例程序(即Spark作業(yè)依賴的Jar包),并進(jìn)行編譯打包。本文生成的Jar包名稱為
rds_test.jar
。示例代碼如下:package com.aliyun.spark import org.apache.spark.sql.SparkSession object SparkRDS { def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder() .appName("rds mysql test") .getOrCreate() // RDS MySQL實(shí)例的內(nèi)網(wǎng)地址。查看方法,請(qǐng)參見查看或修改內(nèi)外網(wǎng)地址和端口。 val url = "jdbc:mysql://rm-bp11mpql1e01****.mysql.rds.aliyuncs.com" // RDS MySQL的表名。格式為"db_name.table_name"。 val dbtable = "test.persons" // 連接RDS MySQL數(shù)據(jù)庫的賬號(hào)。 val user = "mysql_username" // RDS MySQL數(shù)據(jù)庫賬號(hào)的密碼。 val password = "mysql_password" val jdbcDF = sparkSession.read .format("jdbc") .option("url", url) .option("driver", "com.mysql.jdbc.Driver") .option("dbtable", dbtable) .option("user", user) .option("password", password) .load() jdbcDF.show() } }
在官方網(wǎng)站下載適配RDS MySQL版本的驅(qū)動(dòng)程序。下載地址,請(qǐng)參見https://dev.mysql.com/downloads/connector/j/。
本文以mysql-connector-java-8.0.11.jar為例。
將Spark作業(yè)依賴的Jar包及RDS MySQL驅(qū)動(dòng)程序上傳至OSS中。具體操作,請(qǐng)參見上傳文件。
同賬號(hào)訪問RDS MySQL數(shù)據(jù)
登錄云原生數(shù)據(jù)倉庫AnalyticDB MySQL控制臺(tái),在左上角選擇集群所在地域。在左側(cè)導(dǎo)航欄,單擊集群列表。在集群列表上方,選擇產(chǎn)品系列,然后單擊目標(biāo)集群ID。
在左側(cè)導(dǎo)航欄,單擊
。在編輯器窗口上方,選擇Job型資源組和作業(yè)類型。本文以Batch類型為例。
在編輯器中輸入以下作業(yè)內(nèi)容。
{ "name": "rds-mysql-example", "jars": [ "oss://testBucketName/mysql-connector-java-8.0.11.jar" ], "file": "oss://testBucketName/rds_test.jar", "className": "com.aliyun.spark.SparkRDS", "conf": { "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" } }
參數(shù)說明:
參數(shù)
說明
name
Spark作業(yè)名稱。
jars
RDS MySQL驅(qū)動(dòng)程序所在的OSS路徑。
本文示例為mysql-connector-java-8.0.11.jar包所在的OSS路徑。
file
Spark作業(yè)依賴的Jar包所在的OSS路徑。
className
Java或者Scala程序入口類名稱。
本文示例為com.aliyun.spark.SparkRDS。
spark.adb.eni.enabled
開啟ENI訪問。
spark.adb.eni.vswitchId
交換機(jī)ID。在RDS MySQL實(shí)例的數(shù)據(jù)庫連接頁面,將鼠標(biāo)移動(dòng)至VPC處,獲取交換機(jī)ID。
spark.adb.eni.securityGroupId
RDS MySQL實(shí)例中添加的安全組ID。如未添加安全組,請(qǐng)參見設(shè)置安全組。
conf其他參數(shù)
與開源Spark中的配置項(xiàng)基本一致,參數(shù)格式為
key:value
形式,多個(gè)參數(shù)之間以英文逗號(hào)(,)分隔。更多應(yīng)用配置參數(shù),請(qǐng)參見Spark應(yīng)用配置參數(shù)說明。單擊立即執(zhí)行。
Spark作業(yè)執(zhí)行成功后,您可以在Spark日志中查看RDS MySQL表的數(shù)據(jù)。如何查看日志,請(qǐng)參見查看Spark應(yīng)用信息。
跨賬號(hào)訪問RDS MySQL數(shù)據(jù)
登錄云原生數(shù)據(jù)倉庫AnalyticDB MySQL控制臺(tái),在左上角選擇集群所在地域。在左側(cè)導(dǎo)航欄,單擊集群列表。在集群列表上方,選擇產(chǎn)品系列,然后單擊目標(biāo)集群ID。
在左側(cè)導(dǎo)航欄,單擊
。在編輯器窗口上方,選擇Job型資源組和作業(yè)類型。本文以Batch類型為例。
在編輯器中輸入以下作業(yè)內(nèi)容。
{ "name": "rds-mysql-example", "jars": [ "oss://testBucketName/mysql-connector-java-8.0.11.jar" ], "file": "oss://testBucketName/rds_test.jar", "className": "com.aliyun.spark.SparkRDS", "conf": { "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small" "spark.adb.eni.roleArn":"acs:ram::testAccountID:role/testUserName" } }
參數(shù)說明:
參數(shù)
說明
spark.adb.eni.roleArn
跨賬號(hào)訪問RDS數(shù)據(jù)源時(shí)使用的RAM角色。多個(gè)角色之間使用英文逗號(hào)(,)隔開。格式為
acs:ram::testAccountID:role/testUserName
。testAccountID
:RDS數(shù)據(jù)源所屬的阿里云賬號(hào)ID。testUserName
:跨賬號(hào)授權(quán)時(shí)創(chuàng)建的RAM角色。詳細(xì)信息,請(qǐng)參見跨賬號(hào)授權(quán)。
更多參數(shù),請(qǐng)參見參數(shù)說明。
單擊立即執(zhí)行。
Spark作業(yè)執(zhí)行成功后,您可以在Spark日志中查看RDS MySQL表的數(shù)據(jù)。如何查看日志,請(qǐng)參見查看Spark應(yīng)用信息。
通過SSL鏈路訪問RDS MySQL數(shù)據(jù)
通過SSL鏈路訪問RDS MySQL數(shù)據(jù)時(shí),RDS MySQL實(shí)例需開啟SSL加密,且開啟SSL加密時(shí)必須選擇加密內(nèi)網(wǎng)鏈路。具體操作,請(qǐng)參見使用云端證書快速開啟SSL鏈路加密。
下載CA證書并上傳至OSS
登錄RDS管理控制臺(tái),在左上角選擇集群所在地域。在左側(cè)導(dǎo)航欄,單擊實(shí)例列表,單擊目標(biāo)集群ID。
在左側(cè)導(dǎo)航欄單擊數(shù)據(jù)安全性。
單擊下載CA證書。
重要CA證書的默認(rèn)有效期為1年,過期需要重新生成。 使用過期的CA證書則無法通過SSL鏈路訪問RDS的數(shù)據(jù)。
解壓CA證書壓縮包,并將JKS文件上傳至OSS中。具體操作,請(qǐng)參見上傳文件。
上傳驅(qū)動(dòng)程序及Spark作業(yè)依賴的Jar包
編寫訪問RDS MySQL表的示例程序,并進(jìn)行編譯打包。本文生成的Jar包名稱為
test.jar
。示例代碼如下:package org.example import org.apache.spark.sql.SparkSession object Test { def main(args: Array[String]): Unit = { // JKS文件所在的OSS路徑oss://testBucketName/folder/ApsaraDB-CA-Chain.jks。 val JKS_FILE_PATH = args(0) // 連接RDS MySQL數(shù)據(jù)庫的賬號(hào)。 val USERNAME = args(1) // RDS MySQL數(shù)據(jù)庫賬號(hào)的密碼。 val PASSWORD = args(2) // RDS MySQL的數(shù)據(jù)庫名。 val DATABASE_NAME = args(3) // RDS MySQL的表名。 val TABLE_NAME = args(4) // RDS MySQL實(shí)例的內(nèi)網(wǎng)地址。 val mysqlUrl = "jdbc:mysql://rm-bp11mpql1e01****.mysql.rds.aliyuncs.com:3306/?" + "useSSL=true" + s"&trustCertificateKeyStoreUrl=file:///tmp/testBucketName/folder/ApsaraDB-CA-Chain.jks" + "&trustCertificateKeyStorePassword=apsaradb" + "&trustCertificateKeyStoreType=JKS" + val spark = SparkSession.builder().getOrCreate() spark.read.format("jdbc") .option("driver", "com.mysql.cj.jdbc.Driver") .option("url", mysqlUrl) .option("user", USERNAME) .option("password", PASSWORD) .option("dbtable", s"${DATABASE_NAME}.${TABLE_NAME}") .load() .show() } }
參數(shù)說明:
參數(shù)
說明
useSSL
是否使用SSL加密鏈接。取值:
true:是。
false(默認(rèn)值):否。
本文示例需選擇true。
trustCertificateKeyStoreUrl
JKS證書所在的本地路徑,格式為
file:///tmp/<JKS_FILE_PATH>
,其中,JKS_FILE_PATH
為JKS證書所在的OSS路徑。例如:JKS證書所在的OSS路徑為
oss://testBucketName/folder/ApsaraDB-CA-Chain.jks
,則JKS證書所在的本地路徑為file:///tmp/testBucketName/folder/ApsaraDB-CA-Chain.jks
。trustCertificateKeyStorePassword
JKS證書的密碼,固定為apsaradb。
trustCertificateKeyStoreType
證書的存儲(chǔ)格式,固定為JKS。
將
test.jar
包上傳至OSS中。具體操作,請(qǐng)參見上傳文件。
同賬號(hào)訪問RDS MySQL數(shù)據(jù)
登錄云原生數(shù)據(jù)倉庫AnalyticDB MySQL控制臺(tái),在左上角選擇集群所在地域。在左側(cè)導(dǎo)航欄,單擊集群列表。在集群列表上方,選擇產(chǎn)品系列,然后單擊目標(biāo)集群ID。
在左側(cè)導(dǎo)航欄,單擊
。在編輯器窗口上方,選擇Job型資源組和作業(yè)類型。本文以Batch類型為例。
在編輯器中輸入以下作業(yè)內(nèi)容。
{ "file": "oss://testBucketName/test.jar", "className": "org.example.Test", "name": "MYSQL PEM Test", "conf": { "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks", "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****" } }
參數(shù)說明:
參數(shù)
說明
name
Spark作業(yè)名稱。
file
Spark作業(yè)依賴的Jar包所在的OSS路徑。
className
Java或者Scala程序入口類名稱。
本文示例為org.example.Test。
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES
Spark Driver節(jié)點(diǎn)參數(shù),用于指定JKS證書所在的OSS路徑。多個(gè)JKS證書中間用英文逗號(hào)(,)分隔。例如:
spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES: "oss://testBucketName/a.jks,oss://testBucketName/b.jks"
。spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES
Spark Executor節(jié)點(diǎn)參數(shù),用于指定JKS證書所在的OSS路徑。多個(gè)JKS證書中間用英文逗號(hào)(,)分隔。例如:
spark.executor.ADB_SPARK_DOWNLOAD_FILES: "oss://testBucketName/a.jks,oss://testBucketName/b.jks"
。spark.adb.eni.enabled
開啟ENI訪問。
spark.adb.eni.vswitchId
交換機(jī)ID。在RDS MySQL實(shí)例的數(shù)據(jù)庫連接頁面,將鼠標(biāo)移動(dòng)至VPC處,獲取交換機(jī)ID。
spark.adb.eni.securityGroupId
RDS MySQL實(shí)例中添加的安全組ID。如未添加安全組,請(qǐng)參見設(shè)置安全組。
conf其他參數(shù)
與開源Spark中的配置項(xiàng)基本一致,參數(shù)格式為
key:value
形式,多個(gè)參數(shù)之間以英文逗號(hào)(,)分隔。更多應(yīng)用配置參數(shù),請(qǐng)參見Spark應(yīng)用配置參數(shù)說明。單擊立即執(zhí)行。
Spark作業(yè)執(zhí)行成功后,您可以在Spark日志中查看RDS MySQL表的數(shù)據(jù)。如何查看日志,請(qǐng)參見查看Spark應(yīng)用信息。
跨賬號(hào)訪問RDS MySQL數(shù)據(jù)
登錄云原生數(shù)據(jù)倉庫AnalyticDB MySQL控制臺(tái),在左上角選擇集群所在地域。在左側(cè)導(dǎo)航欄,單擊集群列表。在集群列表上方,選擇產(chǎn)品系列,然后單擊目標(biāo)集群ID。
在左側(cè)導(dǎo)航欄,單擊
。在編輯器窗口上方,選擇Job型資源組和作業(yè)類型。本文以Batch類型為例。
在編輯器中輸入以下作業(yè)內(nèi)容。
{ "file": "oss://testBucketName/test.jar", "className": "org.example.Test", "name": "MYSQL PEM Test", "conf": { "spark.kubernetes.driverEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks", "spark.executorEnv.ADB_SPARK_DOWNLOAD_FILES": "oss://testBucketName/folder/ApsaraDB-CA-Chain.jks", "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx****", "spark.adb.eni.roleArn":"acs:ram::testAccountID:role/testUserName", } }
參數(shù)說明:
參數(shù)
說明
spark.adb.eni.roleArn
跨賬號(hào)訪問RDS數(shù)據(jù)源時(shí)使用的RAM角色。多個(gè)角色之間使用英文逗號(hào)(,)隔開。格式為
acs:ram::testAccountID:role/testUserName
。testAccountID
:RDS數(shù)據(jù)源所屬的阿里云賬號(hào)ID。testUserName
:跨賬號(hào)授權(quán)時(shí)創(chuàng)建的RAM角色。詳細(xì)信息,請(qǐng)參見跨賬號(hào)授權(quán)。
更多參數(shù),請(qǐng)參見參數(shù)說明。
單擊立即執(zhí)行。
Spark作業(yè)執(zhí)行成功后,您可以在Spark日志中查看RDS MySQL表的數(shù)據(jù)。如何查看日志,請(qǐng)參見查看Spark應(yīng)用信息。