日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Serverless Spark提交PySpark流任務(wù)

更新時(shí)間:

在大數(shù)據(jù)快速發(fā)展的時(shí)代,流式處理技術(shù)對(duì)于實(shí)時(shí)數(shù)據(jù)分析至關(guān)重要。EMR Serverless Spark提供了一個(gè)強(qiáng)大而可擴(kuò)展的平臺(tái),它不僅簡(jiǎn)化了實(shí)時(shí)數(shù)據(jù)處理流程,還免去了服務(wù)器管理的煩惱,提升了效率。本文將指導(dǎo)您使用EMR Serverless Spark提交PySpark流式任務(wù),展示其在流處理方面的易用性和可運(yùn)維性。

前提條件

已創(chuàng)建工作空間,詳情請(qǐng)參見創(chuàng)建工作空間

操作流程

步驟一:創(chuàng)建實(shí)時(shí)數(shù)據(jù)流集群并產(chǎn)生消息

  1. 在EMR on ECS頁(yè)面,創(chuàng)建包含Kafka服務(wù)的實(shí)時(shí)數(shù)據(jù)流集群,詳情請(qǐng)參見創(chuàng)建集群

  2. 登錄EMR集群的Master節(jié)點(diǎn),詳情請(qǐng)參見登錄集群

  3. 執(zhí)行以下命令,切換目錄。

    cd /var/log/emr/taihao_exporter
  4. 執(zhí)行以下命令,創(chuàng)建Topic。

    # 創(chuàng)建名為taihaometrics的Topic,分區(qū)數(shù)10,副本因子2。
    kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic taihaometrics --create
  5. 執(zhí)行以下命令,發(fā)送消息。

    # 使用kafka-console-producer發(fā)送消息到taihaometrics Topic。
    tail -f metrics.log | kafka-console-producer.sh --broker-list core-1-1:9092 --topic taihaometrics

步驟二:新增網(wǎng)絡(luò)連接

  1. 進(jìn)入網(wǎng)絡(luò)連接頁(yè)面。

    1. 在EMR控制臺(tái)的左側(cè)導(dǎo)航欄,選擇EMR Serverless > Spark

    2. Spark頁(yè)面,單擊目標(biāo)工作空間名稱。

    3. EMR Serverless Spark頁(yè)面,單擊左側(cè)導(dǎo)航欄中的網(wǎng)絡(luò)連接

  2. 網(wǎng)絡(luò)連接頁(yè)面,單擊新增網(wǎng)絡(luò)連接

  3. 新增網(wǎng)絡(luò)連接對(duì)話框中,配置以下信息,單擊確定

    參數(shù)

    說明

    連接名稱

    輸入新增連接的名稱。例如,connection_to_emr_kafka。

    專有網(wǎng)絡(luò)

    選擇與EMR集群相同的專有網(wǎng)絡(luò)。

    如果當(dāng)前沒有可選擇的專有網(wǎng)絡(luò),請(qǐng)單擊創(chuàng)建專有網(wǎng)絡(luò),前往專有網(wǎng)絡(luò)控制臺(tái)創(chuàng)建,詳情請(qǐng)參見創(chuàng)建和管理專有網(wǎng)絡(luò)

    交換機(jī)

    選擇與EMR集群部署在同一專有網(wǎng)絡(luò)下的相同交換機(jī)。

    如果當(dāng)前可用區(qū)沒有交換機(jī),請(qǐng)單擊虛擬交換機(jī),前往專有網(wǎng)絡(luò)控制臺(tái)創(chuàng)建,詳情請(qǐng)參見創(chuàng)建和管理交換機(jī)

    當(dāng)狀態(tài)顯示為已成功時(shí),表示新增網(wǎng)絡(luò)連接成功。

步驟三:為EMR集群添加安全組規(guī)則

  1. 獲取集群節(jié)點(diǎn)交換機(jī)的網(wǎng)段。

    您可以在節(jié)點(diǎn)管理頁(yè)面,單擊節(jié)點(diǎn)組名稱,查看關(guān)聯(lián)的交換機(jī)信息,然后登錄專有網(wǎng)絡(luò)管理控制臺(tái),在交換機(jī)頁(yè)面獲取交換機(jī)的網(wǎng)段。

    image

  2. 添加安全組規(guī)則。

    1. 集群管理頁(yè)面,單擊目標(biāo)集群的集群ID。

    2. 基礎(chǔ)信息頁(yè)面,單擊集群安全組后面的鏈接。

    3. 在安全組規(guī)則頁(yè)面,單擊手動(dòng)添加,填寫端口范圍和授權(quán)對(duì)象,然后單擊保存

      參數(shù)

      說明

      端口范圍

      填寫9092端口。

      授權(quán)對(duì)象

      填寫前一步驟中獲取的指定交換機(jī)的網(wǎng)段。

      重要

      為防止被外部的用戶攻擊導(dǎo)致安全問題,授權(quán)對(duì)象禁止填寫為0.0.0.0/0。

步驟四:上傳JAR包至OSS

解壓文件kafka.zip,并將文件中的所有JAR包上傳至OSS,上傳操作請(qǐng)參見簡(jiǎn)單上傳

步驟五:上傳資源文件

  1. 在EMR Serverless Spark頁(yè)面,單擊左側(cè)導(dǎo)航欄中的文件管理

  2. 文件管理頁(yè)面,單擊上傳文件

  3. 上傳文件對(duì)話框中,單擊待上傳文件區(qū)域選擇pyspark_ss_demo.py文件。

步驟六:新建并啟動(dòng)流任務(wù)

  1. 在EMR Serverless Spark頁(yè)面,單擊左側(cè)的數(shù)據(jù)開發(fā)

  2. 單擊新建

  3. 輸入名稱,任務(wù)類型選擇Application(流任務(wù)) > PySpark,然后單擊確定

  4. 在新建的開發(fā)中,配置以下信息,其余參數(shù)無需配置,然后單擊保存

    參數(shù)

    說明

    主Python資源

    選擇前一個(gè)步驟中在資源上傳頁(yè)面上傳的pyspark_ss_demo.py文件。

    引擎版本

    Spark的版本,詳情請(qǐng)參見引擎版本介紹

    運(yùn)行參數(shù)

    EMR集群core-1-1節(jié)點(diǎn)的內(nèi)網(wǎng)IP地址。您可以在EMR集群的節(jié)點(diǎn)管理頁(yè)面的Core節(jié)點(diǎn)組下查看。

    Spark配置

    Spark的配置信息。本文示例如下。

    spark.jars oss://path/to/commons-pool2-2.11.1.jar,oss://path/to/kafka-clients-2.8.1.jar,oss://path/to/spark-sql-kafka-0-10_2.12-3.3.1.jar,oss://path/to/spark-token-provider-kafka-0-10_2.12-3.3.1.jar
    spark.emr.serverless.network.service.name connection_to_emr_kafka
    說明
    • spark.jars:指定Spark任務(wù)運(yùn)行時(shí)需要加載的外部JAR包路徑。請(qǐng)根據(jù)實(shí)際情況替換為步驟四中上傳的所有JAR包文件路徑。

    • spark.emr.serverless.network.service.name:指定網(wǎng)絡(luò)連接的名稱。請(qǐng)根據(jù)實(shí)際情況替換為步驟二中創(chuàng)建的網(wǎng)絡(luò)連接名稱。

  5. 單擊發(fā)布

  6. 發(fā)布任務(wù)對(duì)話框中,單擊確定

  7. 啟動(dòng)流任務(wù)。

    1. 單擊前往運(yùn)維

    2. 單擊啟動(dòng)

步驟七:查看日志

  1. 單擊日志探查頁(yè)簽。

  2. Driver日志列表中,單擊stdOut.log

    在打開的日志文件中,您可以看到應(yīng)用程序執(zhí)行的相關(guān)信息以及返回的結(jié)果。

    image

相關(guān)文檔

PySpark的開發(fā)流程示例,請(qǐng)參見PySpark開發(fā)快速入門