日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Spark Structured Streaming實(shí)時(shí)處理Kafka數(shù)據(jù)

更新時(shí)間:

本文介紹如何使用阿里云 Databricks 數(shù)據(jù)洞察創(chuàng)建的集群去訪問(wèn)外部數(shù)據(jù)源 E-MapReduce,并運(yùn)行Spark Structured Streaming作業(yè)以消費(fèi)Kafka數(shù)據(jù)。

前提條件

  • 已注冊(cè)阿里云賬號(hào),詳情請(qǐng)參見(jiàn)阿里云賬號(hào)注冊(cè)流程

  • 已開(kāi)通 E-MapReduce服務(wù)。

  • 已開(kāi)通對(duì)象存儲(chǔ) OSS服務(wù)。

  • 已開(kāi)通 Databricks數(shù)據(jù)洞察服務(wù)。

步驟一:創(chuàng)建Kafka集群和Databricks 數(shù)據(jù)洞察集群

  1. 登錄阿里云E-MapReduce控制臺(tái)

  2. 創(chuàng)建Kafka集群。

  3. 登錄Databricks數(shù)據(jù)洞察控制臺(tái)

  4. 創(chuàng)建集群,詳情參見(jiàn)創(chuàng)建集群

步驟二:Databricks 數(shù)據(jù)洞察集群添加外部數(shù)據(jù)源

  1. 登錄Databricks數(shù)據(jù)洞察控制臺(tái)

  2. 單擊左側(cè)集群按鈕,選擇已創(chuàng)建的集群。

  3. 進(jìn)入集群詳情頁(yè)面,單擊上方數(shù)據(jù)源按鈕。

  4. 在數(shù)據(jù)源頁(yè)面,單擊添加按鈕,選擇Aliyun EMR KAFKA

  5. 填入描述,選擇kafka集群

添加數(shù)據(jù)源

步驟三:獲取JAR包并上傳到對(duì)象存儲(chǔ) OSS

  1. 登錄OSS管理控制臺(tái)

  2. 創(chuàng)建Bucket存儲(chǔ)空間,詳情請(qǐng)參見(jiàn)控制臺(tái)創(chuàng)建存儲(chǔ)空間

  3. 針對(duì)創(chuàng)建Databricks版本請(qǐng)選擇性獲取對(duì)應(yīng)的Jar(DBR7+兼容高版本DBR)。

  4. 上傳JAR,詳情請(qǐng)參見(jiàn)控制臺(tái)上傳文件

步驟四:在Kafka集群上創(chuàng)建Topic

本示例將創(chuàng)建一個(gè)分區(qū)數(shù)為10、副本數(shù)為2、名稱為test的Topic。

  1. 登錄Kafka集群的Master節(jié)點(diǎn),詳情請(qǐng)參見(jiàn)登錄集群

  2. 通過(guò)如下命令創(chuàng)建Topic。

/usr/lib/kafka-current/bin/kafka-topics.sh --partitions 10 --replication-factor 2 --zookeeper emr-header-1:2181 /kafka-1.0.0 --topic log_test --create
說(shuō)明

創(chuàng)建Topic后,請(qǐng)保留該登錄窗口,后續(xù)步驟仍將使用。

步驟五:運(yùn)行Spark Structured Streaming作業(yè)

  1. 登錄Databricks數(shù)據(jù)洞察控制臺(tái)

  2. 新建項(xiàng)目空間,詳情請(qǐng)參見(jiàn)項(xiàng)目管理

  3. 在所屬項(xiàng)目空間創(chuàng)建作業(yè),詳情請(qǐng)參見(jiàn)管理作業(yè)

  4. 執(zhí)行如下作業(yè)命令,進(jìn)行流式單詞統(tǒng)計(jì)。

--deploy-mode cluster   --class demo.action.AppKafkaSparkStreaming oss://xxx/xxx/SparkStreaming-1.0-SNAPSHOT.jar 
"oss://your bucket/checkpoint_dir" "192.168.xxx.xxx:9092" "log_test" "oss://your bucket/dataOutputPath"

關(guān)鍵參數(shù)說(shuō)明如下:

參數(shù)

說(shuō)明

oss://xxx/xxx/spark-kafka-sample-1.0-SNAPSHOT-jar-with-dependencies.jar

oss對(duì)象存儲(chǔ)上JAR包的位置

192.168.xxx.xxx:9092

Kafka集群中任一Kafka Broker組件的內(nèi)網(wǎng)IP地址。IP地址如下圖所示。

log_test

Topic名稱

oss://your bucket/dataOutputPath

oss對(duì)象存儲(chǔ)上數(shù)據(jù)寫(xiě)入目錄

查看Kafka集群IP

1602211879422_C927F65E-7358-45DA-AD21-B8C7A06E2A9E

步驟六:使用Kafka發(fā)布消息

  1. 在Kafka集群的命令行窗口,執(zhí)行如下命令運(yùn)行Kafka的生產(chǎn)者。

#日志結(jié)構(gòu)
40.198.97.68 - - [19/Mar/2021:18:26:12 +0800] "GET /category/office HTTP/1.1" 200 58 "/item/electronics/975" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.7 (KHTML, like Gecko) Chrome/16.0.912.77 Safari/535.7"
104.75.85.35 - - [19/Mar/2021:18:26:12 +0800] "GET /item/networking/4019 HTTP/1.1" 200 118 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"
/usr/lib/kafka-current/bin/kafka-console-producer.sh --topic log_test --broker-list emr-worker-1:9092

在命令行中輸入4條數(shù)據(jù)

data

步驟七:查看Spark作業(yè)執(zhí)行情況

通過(guò)Yarn UI查看Spark Structured Streaming作業(yè)的信息,詳情請(qǐng)參見(jiàn)訪問(wèn)Web UI

  1. 在Hadoop控制臺(tái),單擊作業(yè)的ID。

1602209477597_2F740886-8F2F-44F2-AAFB-E6E702922808

詳細(xì)信息如下。

1602209918260_9A7A860D-E41F-4A45-878D-CA1D1E6ED122

點(diǎn)擊Logs,查看詳細(xì)信息。

步驟八:查看數(shù)據(jù)寫(xiě)入情況

1. 進(jìn)入Databricks 數(shù)據(jù)洞察所在集群的Notebook頁(yè)面,Notebook使用詳情參見(jiàn)(Notebook使用說(shuō)明

2. 執(zhí)行代碼查詢數(shù)據(jù)的寫(xiě)入情況

%spark
for( i <- 1 to 10 ) {
    Thread.sleep(5000)
    spark.sql("select count(*) from Delta.`oss://your bucket/dataOutputPath`").show()
}

查詢數(shù)據(jù)成功寫(xiě)入4條數(shù)據(jù)data