首頁(yè)
云數(shù)據(jù)庫(kù)HBase版
操作指南
HBase 增強(qiáng)版(推薦)
數(shù)據(jù)通道
通過(guò)Spark訪問(wèn)HBase增強(qiáng)版
通過(guò)Spark訪問(wèn)HBase增強(qiáng)版
更新時(shí)間:
本文介紹如何通過(guò)Spark訪問(wèn)HBase增強(qiáng)版集群。
前提條件
HBase增強(qiáng)版集群為2.4.3及以上版本。如何查看或升級(jí)當(dāng)前版本,請(qǐng)參見(jiàn)小版本升級(jí)。
已將客戶端IP地址添加至HBase增強(qiáng)版集群的白名單。如何添加,請(qǐng)參見(jiàn)設(shè)置白名單和安全組。
已在控制臺(tái)獲取HBase增強(qiáng)版集群的連接地址(Java API訪問(wèn)地址)。
注意事項(xiàng)
如果您想要通過(guò)公網(wǎng)訪問(wèn),在執(zhí)行文本操作前,需要將HBase社區(qū)版客戶端更換為阿里云HBase客戶端。具體操作,請(qǐng)參見(jiàn)升級(jí)HBase Java SDK。
如果應(yīng)用部署在ECS實(shí)例,通過(guò)專(zhuān)有網(wǎng)絡(luò)訪問(wèn)HBase增強(qiáng)版集群前,需要確保HBase增強(qiáng)版集群和ECS實(shí)例滿足以下條件,以保證網(wǎng)絡(luò)的連通性。
所在地域相同,并建議所在可用區(qū)相同(以減少網(wǎng)絡(luò)延時(shí))。
ECS實(shí)例與HBase增強(qiáng)版集群屬于同一專(zhuān)有網(wǎng)絡(luò)。
添加HBase增強(qiáng)版訪問(wèn)配置
方式一:通過(guò)配置文件添加訪問(wèn)配置。
在配置文件
hbase-site.xml
中增加下列配置項(xiàng):<configuration> <!-- 集群的Java API連接地址,在控制臺(tái)頁(yè)面的數(shù)據(jù)庫(kù)連接界面獲取 --> <property> <name>hbase.zookeeper.quorum</name> <value>ld-bp150tns0sjxs****-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30020</value> </property> </configuration>
方式二:通過(guò)代碼在Configuration中添加參數(shù)。
// 新建一個(gè)Configuration Configuration conf = HBaseConfiguration.create(); // 集群的Java API連接地址,在控制臺(tái)頁(yè)面的數(shù)據(jù)庫(kù)連接界面獲取 conf.set("hbase.zookeeper.quorum", "ld-bp150tns0sjxs****-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30020");
Spark訪問(wèn)示例
test(" test the spark sql count result") {
//1. 添加HBaseue訪問(wèn)配置
var conf = HBaseConfiguration.create
conf.set("hbase.zookeeper.quorum", "ld-bp150tns0sjxs****-proxy-hbaseue.hbaseue.rds.aliyuncs.com:30020")
//2. 創(chuàng)建表
val hbaseTableName = "testTable"
val cf = "f"
val column1 = cf + ":a"
val column2 = cf + ":b"
var rowsCount: Int = -1
var namespace = "spark_test"
val admin = ConnectionFactory.createConnection(conf).getAdmin()
val tableName = TableName.valueOf(namespace, hbaseTableName)
val htd = new HTableDescriptor(tableName)
htd.addFamily(new HColumnDescriptor(cf))
admin.createTable(htd)
//3. 插入測(cè)試數(shù)據(jù)
val rng = new Random()
val k: Array[Byte] = new Array[Byte](3)
val famAndQf = KeyValue.parseColumn(Bytes.toBytes(column))
val puts = new util.ArrayList[Put]()
var i = 0
for (b1 <- ('a' to 'z')) {
for (b2 <- ('a' to 'z')) {
for (b3 <- ('a' to 'z')) {
if(i < 10) {
k(0) = b1.toByte
k(1) = b2.toByte
k(2) = b3.toByte
val put = new Put(k)
put.addColumn(famAndQf(0), famAndQf(1), ("value_" + b1 + b2 + b3).getBytes())
puts.add(put)
i = i + 1
}
}
}
}
val conn = ConnectionFactory.createConnection(conf)
val table = conn.getTable(tableName)
table.put(puts)
//4. 創(chuàng)建spark表
val sparkTableName = "spark_hbase"
val createCmd = s"""CREATE TABLE ${sparkTableName} USING org.apache.hadoop.hbase.spark
| OPTIONS ('catalog'=
| '{"table":{"namespace":"$${hbaseTableName}", "name":"${hbaseTableName}"},"rowkey":"rowkey",
| "columns":{
| "col0":{"cf":"rowkey", "col":"rowkey", "type":"string"},
| "col1":{"cf":"cf1", "col":"a", "type":"string"},
| "col2":{"cf":"cf1", "col":"b", "type":"String"}}}'
| )""".stripMargin
println(" createCmd: \n" + createCmd + " rows : " + rowsCount)
sparkSession.sql(createCmd)
//5. 執(zhí)行count sql
val result = sparkSession.sql("select count(*) from " + sparkTableName)
val sparkCounts = result.collect().apply(0).getLong(0)
println(" sparkCounts : " + sparkCounts)
文檔內(nèi)容是否對(duì)您有幫助?