通過Serverless Spark提交PySpark流任務(wù)
在大數(shù)據(jù)快速發(fā)展的時(shí)代,流式處理技術(shù)對(duì)于實(shí)時(shí)數(shù)據(jù)分析至關(guān)重要。EMR Serverless Spark提供了一個(gè)強(qiáng)大而可擴(kuò)展的平臺(tái),它不僅簡(jiǎn)化了實(shí)時(shí)數(shù)據(jù)處理流程,還免去了服務(wù)器管理的煩惱,提升了效率。本文將指導(dǎo)您使用EMR Serverless Spark提交PySpark流式任務(wù),展示其在流處理方面的易用性和可運(yùn)維性。
前提條件
已創(chuàng)建工作空間,詳情請(qǐng)參見創(chuàng)建工作空間。
操作流程
步驟一:創(chuàng)建實(shí)時(shí)數(shù)據(jù)流集群并產(chǎn)生消息
在EMR on ECS頁(yè)面,創(chuàng)建包含Kafka服務(wù)的實(shí)時(shí)數(shù)據(jù)流集群,詳情請(qǐng)參見創(chuàng)建集群。
登錄EMR集群的Master節(jié)點(diǎn),詳情請(qǐng)參見登錄集群。
執(zhí)行以下命令,切換目錄。
cd /var/log/emr/taihao_exporter
執(zhí)行以下命令,創(chuàng)建Topic。
# 創(chuàng)建名為taihaometrics的Topic,分區(qū)數(shù)10,副本因子2。 kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create
執(zhí)行以下命令,發(fā)送消息。
# 使用kafka-console-producer發(fā)送消息到taihaometrics Topic。 tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics
步驟二:新增網(wǎng)絡(luò)連接
進(jìn)入網(wǎng)絡(luò)連接頁(yè)面。
在EMR控制臺(tái)的左側(cè)導(dǎo)航欄,選擇
。在Spark頁(yè)面,單擊目標(biāo)工作空間名稱。
在EMR Serverless Spark頁(yè)面,單擊左側(cè)導(dǎo)航欄中的網(wǎng)絡(luò)連接。
在網(wǎng)絡(luò)連接頁(yè)面,單擊新增網(wǎng)絡(luò)連接。
在新增網(wǎng)絡(luò)連接對(duì)話框中,配置以下信息,單擊確定。
參數(shù)
說明
連接名稱
輸入新增連接的名稱。例如,connection_to_emr_kafka。
專有網(wǎng)絡(luò)
選擇與EMR集群相同的專有網(wǎng)絡(luò)。
如果當(dāng)前沒有可選擇的專有網(wǎng)絡(luò),請(qǐng)單擊創(chuàng)建專有網(wǎng)絡(luò),前往專有網(wǎng)絡(luò)控制臺(tái)創(chuàng)建,詳情請(qǐng)參見創(chuàng)建和管理專有網(wǎng)絡(luò)。
交換機(jī)
選擇與EMR集群部署在同一專有網(wǎng)絡(luò)下的相同交換機(jī)。
如果當(dāng)前可用區(qū)沒有交換機(jī),請(qǐng)單擊虛擬交換機(jī),前往專有網(wǎng)絡(luò)控制臺(tái)創(chuàng)建,詳情請(qǐng)參見創(chuàng)建和管理交換機(jī)。
當(dāng)狀態(tài)顯示為已成功時(shí),表示新增網(wǎng)絡(luò)連接成功。
步驟三:為EMR集群添加安全組規(guī)則
獲取集群節(jié)點(diǎn)交換機(jī)的網(wǎng)段。
您可以在節(jié)點(diǎn)管理頁(yè)面,單擊節(jié)點(diǎn)組名稱,查看關(guān)聯(lián)的交換機(jī)信息,然后登錄專有網(wǎng)絡(luò)管理控制臺(tái),在交換機(jī)頁(yè)面獲取交換機(jī)的網(wǎng)段。
添加安全組規(guī)則。
在集群管理頁(yè)面,單擊目標(biāo)集群的集群ID。
在基礎(chǔ)信息頁(yè)面,單擊集群安全組后面的鏈接。
在安全組規(guī)則頁(yè)面,單擊手動(dòng)添加,填寫端口范圍和授權(quán)對(duì)象,然后單擊保存。
參數(shù)
說明
端口范圍
填寫9092端口。
授權(quán)對(duì)象
填寫前一步驟中獲取的指定交換機(jī)的網(wǎng)段。
重要為防止被外部的用戶攻擊導(dǎo)致安全問題,授權(quán)對(duì)象禁止填寫為0.0.0.0/0。
步驟四:上傳JAR包至OSS
解壓文件kafka.zip,并將文件中的所有JAR包上傳至OSS,上傳操作請(qǐng)參見簡(jiǎn)單上傳。
步驟五:上傳資源文件
在EMR Serverless Spark頁(yè)面,單擊左側(cè)導(dǎo)航欄中的文件管理。
在文件管理頁(yè)面,單擊上傳文件。
在上傳文件對(duì)話框中,單擊待上傳文件區(qū)域選擇pyspark_ss_demo.py文件。
步驟六:新建并啟動(dòng)流任務(wù)
在EMR Serverless Spark頁(yè)面,單擊左側(cè)的數(shù)據(jù)開發(fā)。
單擊新建。
輸入名稱,任務(wù)類型選擇
,然后單擊確定。在新建的開發(fā)中,配置以下信息,其余參數(shù)無需配置,然后單擊保存。
參數(shù)
說明
主Python資源
選擇前一個(gè)步驟中在資源上傳頁(yè)面上傳的pyspark_ss_demo.py文件。
引擎版本
Spark的版本,詳情請(qǐng)參見引擎版本介紹。
運(yùn)行參數(shù)
EMR集群core-1-1節(jié)點(diǎn)的內(nèi)網(wǎng)IP地址。您可以在EMR集群的節(jié)點(diǎn)管理頁(yè)面的Core節(jié)點(diǎn)組下查看。
Spark配置
Spark的配置信息。本文示例如下。
spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar spark.emr.serverless.network.service.name connection_to_emr_kafka
說明spark.jars
:指定Spark任務(wù)運(yùn)行時(shí)需要加載的外部JAR包路徑。請(qǐng)根據(jù)實(shí)際情況替換為步驟四中上傳的所有JAR包文件路徑。spark.emr.serverless.network.service.name
:指定網(wǎng)絡(luò)連接的名稱。請(qǐng)根據(jù)實(shí)際情況替換為步驟二中創(chuàng)建的網(wǎng)絡(luò)連接名稱。
單擊發(fā)布。
在發(fā)布任務(wù)對(duì)話框中,單擊確定。
啟動(dòng)流任務(wù)。
單擊前往運(yùn)維。
單擊啟動(dòng)。
步驟七:查看日志
單擊日志探查頁(yè)簽。
在Driver日志列表中,單擊stdOut.log。
在打開的日志文件中,您可以看到應(yīng)用程序執(zhí)行的相關(guān)信息以及返回的結(jié)果。
相關(guān)文檔
PySpark的開發(fā)流程示例,請(qǐng)參見PySpark開發(fā)快速入門。