本文匯總了Spark使用時的常見問題。
Spark Core
Spark SQL
PySpark
Spark Streaming
在哪里可以查看Spark歷史作業?
您可以在EMR控制臺目標集群的訪問鏈接與端口頁簽,單擊Spark UI鏈接,即查看到Spark歷史作業運行信息。訪問UI詳情,請參見訪問鏈接與端口。
是否支持Standalone模式提交Spark作業?
不支持。E-MapReduce支持使用Spark on YARN以及Spark on Kubernetes模式提交作業,不支持Standalone和Mesos模式。
如何減少Spark2命令行工具的日志輸出?
EMR DataLake集群選擇Spark2服務后,使用spark-sql和spark-shell等命令行工具時默認輸出INFO級別日志,如果想減少日志輸出,可以修改log4j日志級別。具體操作如下:
在運行命令行工具的節點(例如,master節點)新建一個log4j.properties配置文件,也可以從默認配置文件復制,復制命令如下所示。
cp /etc/emr/spark-conf/log4j.properties /new/path/to/log4j.properties
修改新配置文件的日志級別。
log4j.rootCategory=WARN, console
修改Spark服務spark-defaults.conf配置文件中的配置項spark.driver.extraJavaOptions,將參數值中的-Dlog4j.configuration=file:/etc/emr/spark-conf/log4j.properties替換為-Dlog4j.configuration=file:/new/path/to/log4j.properties。
重要路徑需要添加file:前綴。
如何使用Spark3的小文件合并功能?
您可以通過設置參數spark.sql.adaptive.merge.output.small.files.enabled為true,來自動合并小文件。由于合并后的文件會壓縮,如果您覺得合并后的文件太小,可以適當調大參數spark.sql.adaptive.advisoryOutputFileSizeInBytes的值,默認值為256 MB。
如何處理SparkSQL數據傾斜?
針對Spark2,處理方式如下:
讀取表時過濾無關數據,例如null。
廣播小表(Broadcast)。
select /*+ BROADCAST (table1) */ * from table1 join table2 on table1.id = table2.id
根據傾斜key,分離傾斜數據。
select * from table1_1 join table2 on table1_1.id = table2.id union all select /*+ BROADCAST (table1_2) */ * from table1_2 join table2 on table1_2.id = table2.id
傾斜key已知時,打散數據。
select id, value, concat(id, (rand() * 10000) % 3) as new_id from A select id, value, concat(id, suffix) as new_id from ( select id, value, suffix from B Lateral View explode(array(0, 1, 2)) tmp as suffix)
傾斜key未知時,打散數據。
select t1.id, t1.id_rand, t2.name from ( select id , case when id = null then concat(‘SkewData_’, cast(rand() as string)) else id end as id_rand from test1 where statis_date = ‘20221130’) t1 left join test2 t2 on t1.id_rand = t2.id
針對Spark3,可以在EMR控制臺Spark3服務的配置頁簽,修改spark.sql.adaptive.enabled和spark.sql.adaptive.skewJoin.enabled的參數值為true。
如何指定PySpark使用Python 3版本?
下面內容以可選服務為Spark2,EMR-5.7.0版本的DataLake集群為例,介紹如何指定PySpark使用Python 3版本。
您可以通過以下兩種方式修改Python的版本:
臨時生效方式
通過SSH方式登錄集群,詳情請參見登錄集群。
執行以下命令,修改Python的版本。
export PYSPARK_PYTHON=/usr/bin/python3
執行以下命令,查看Python的版本。
pyspark
當返回信息中包含如下信息時,表示已修改Python版本為Python 3。
Using Python version 3.6.8
永久生效方式
通過SSH方式登錄集群,詳情請參見登錄集群。
修改配置文件。
執行以下命令,打開文件profile。
vi /etc/profile
按下
i
鍵進入編輯模式。在profile文件末尾添加以下信息,以修改Python的版本。
export PYSPARK_PYTHON=/usr/bin/python3
按下
Esc
鍵退出編輯模式,輸入:wq
保存并關閉文件。
執行以下命令,重新執行剛修改的配置文件,使之立即生效。
source /etc/profile
執行以下命令,查看Python的版本。
pyspark
當返回信息中包含如下信息時,表示已修改Python版本為Python 3。
Using Python version 3.6.8
為什么Spark Streaming作業運行一段時間后無故結束?
首先檢查Spark版本是否是1.6之前版本,如果是的話更新Spark版本。
Spark 1.6之前版本存在內存泄漏的問題,會導致Container被中止掉。
檢查自己的代碼在內存使用上有沒有做好優化。
為什么Spark Streaming作業已經結束,但是E-MapReduce控制臺顯示作業狀態還處于“運行中”?
檢查作業提交方式是否為Yarn-Client模式,因為E-MapReduce對Yarn-Client模式的Spark Streaming作業的狀態監控存在問題,所以請修改為Yarn-Cluster模式。