本文帶您快速體驗Flink Python流作業和批作業的部署和啟動,以了解實時計算Flink版Python作業的操作流程。
前提條件
如果您使用RAM用戶或RAM角色等身份訪問,需要確認已具有Flink控制臺相關權限,詳情請參見權限管理。
已創建Flink工作空間,詳情請參見開通實時計算Flink版。
步驟一:準備Python代碼文件
實時計算管理控制臺不提供Python的開發環境,您需要在本地完成作業開發。有關作業調試和連接器的使用,詳情請參見Python作業開發。
本地開發依賴的Flink版本需確保與后續步驟三:部署Python作業選擇的引擎版本保持一致,在Python作業中使用其他依賴(自定義的Python虛擬環境、第三方Python包、JAR包和數據文件等)的方法請參見使用Python依賴。
為了幫助您快速熟悉Flink Python作業操作,本文已為您提供統計單詞出現頻率的測試Python文件和數據文本,您可以直接下載待后續步驟使用。
根據需要下載Python測試作業。
單擊Shakespeare,下載數據文本Shakespeare。
步驟二:上傳Python文件和數據文件
步驟三:部署Python作業
流作業
在 界面,單擊 。
填寫部署信息。
參數
說明
示例
部署模式
請選擇部署為流模式。
流模式
部署名稱
填寫對應的Python作業名稱。
flink-streaming-test-python
引擎版本
當前作業使用的Flink引擎版本。
vvr-8.0.9-flink-1.17
Python文件地址
單擊word_count_streaming.py下載測試Python文件后,再單擊右側圖標選擇文件,上傳Python文件。
-
Entry Module
程序的入口類。
如果Python作業文件為.py文件,則該項不需要填寫。
如果Python作業文件為.zip文件,則需要在此處輸入您的Entry Module,例如word_count。
無需填寫
Entry Point Main Arguments
填寫傳入參數信息,在主方法里面調用該參數。
本文填寫輸入數據文件Shakespeare的存放路徑。
部署目標
在下拉列表中,選擇目標資源隊列或者Session集群(請勿生產使用)。詳情請參見管理資源隊列和創建Session集群。
重要部署到Session集群的作業不支持顯示監控告警(或數據曲線)、配置監控告警和開啟自動調優功能。請勿將Session集群用于正式生產環境,Session集群可以作為開發測試環境。詳情請參見作業調試。
default-queue
更多配置參數詳情請參見部署作業。
單擊部署。
批作業
在 界面,單擊部署作業,選擇Python作業。
填寫部署信息。
參數
說明
示例
部署模式
請選擇部署為批模式。
批模式
部署名稱
填寫對應的Python作業名稱。
flink-batch-test-python
引擎版本
當前作業使用的Flink引擎版本。
vvr-8.0.9-flink-1.17
Python文件地址
單擊word_count_batch.py下載測試Python文件后,再單擊右側圖標選擇文件,上傳Python文件。
-
Entry Module
程序的入口類。
如果Python作業文件為.py文件,則該項不需要填寫。
如果Python作業文件為.zip文件,則需要在此處輸入您的Entry Module,例如word_count。
無需填寫
Entry Point Main Arguments
填寫傳入參數信息,在主方法里面調用該參數。
本文填寫輸入數據文件Shakespeare和輸出數據目錄batch-quickstart-test-output的存放路徑。
說明您只需指定輸出目錄的路徑名稱,無需提前在存儲服務中創建輸出目錄,輸出目錄的父目錄路徑與輸入文件保持一致即可。
存儲類型為OSS Bucket:
--input oss://<您綁定的OSS Bucket名稱>/artifacts/namespaces/<項目空間名稱>/Shakespeare
--output oss://<您綁定的OSS Bucket名稱>/artifacts/namespaces/<項目空間名稱>/python-batch-quickstart-test-output
您可以直接在文件管理中復制Shakespeare文件的完整路徑。
存儲類型為全托管存儲:
--input oss://flink-fullymanaged-<工作空間ID>/artifacts/namespaces/<項目空間名稱>/Shakespeare
--output oss://flink-fullymanaged-<工作空間ID>/artifacts/namespaces/<項目空間名稱>/python-batch-quickstart-test-output
部署目標
在下拉列表中,選擇目標資源隊列或者Session集群(請勿生產使用)。詳情請參見管理資源隊列和創建Session集群。
重要部署到Session集群的作業不支持顯示監控告警、配置監控告警和開啟自動調優功能。請勿將Session集群用于正式生產環境,Session集群可以作為開發測試環境。詳情請參見作業調試。
default-queue
更多配置參數詳情請參見部署作業。
單擊部署。
步驟四:啟動Python作業并查看Flink計算結果
流作業
在
頁面,單擊目標作業名稱操作列中的啟動。選擇無狀態啟動,單擊啟動,作業啟動詳情請參見作業啟動。
單擊啟動后,作業狀態變為運行中或已完成,則代表作業運行正常。如果您部署本文檔Python測試文件,作業最終運行狀態是已完成狀態。
作業狀態變為運行中后,查看流作業示例的計算結果。
重要如果您部署的是本文Python測試文件,流作業變為已完成狀態時會刪除作業結果,故流作業狀態為運行中才能看到計算結果。
在TaskManager中以.out結尾的日志文件中,搜索shakespeare查看Flink計算結果。
批作業
在
頁面,單擊目標作業中的啟動。在作業啟動對話框中,單擊啟動,作業啟動詳情請參見作業啟動。
作業狀態變為已完成后,查看批作業示例的計算結果。
存儲類型為OSS Bucket:登錄OSS管理控制臺,在oss://<您綁定的OSS Bucket名稱>/artifacts/namespaces/<項目空間名稱>/python-batch-quickstart-test-output目錄,單擊名稱是作業的啟動日期和時間的文件夾,然后單擊目標文件名,在彈出的面板上單擊下載。
存儲類型為全托管存儲:您可以在文件管理頁面的資源文件頁簽,單擊python-batch-quickstart-test-output.txt/yyyy-MM-dd--n/prefix-****.ext格式的文件操作列的下載,在本地進行查看。
批作業結果為ext文件,下載后可以用記事本或者Microsoft Office Word打開查看結果,計算結果如下圖所示。
(可選)步驟五:停止作業
如果您對作業進行了修改(例如更改代碼、增刪改WITH參數、更改作業版本等),且希望修改生效,則需要重新部署作業,然后停止再啟動。另外,如果作業無法復用State,希望作業全新啟動時,或者更新非動態生效的參數配置時,也需要停止后再啟動作業。作業停止詳情請參見作業停止。
相關文檔
您可以在作業啟動前配置作業資源或者作業上線后修改作業資源,支持基礎模式(粗粒度)和專家模式(細粒度)兩種資源模式,詳情請參見配置作業資源。
支持動態更新Flink作業參數,可以實現更快的參數配置生效,減少作業啟停對業務的中斷時間,詳情請參見動態擴縮容與參數動態更新。
配置作業日志級別以及配置不同級別日志分別輸出,詳情請參見配置作業日志輸出。
SQL作業完整的開發流程示例,請參見Flink SQL作業快速入門。