您可以在Flink Python作業中使用自定義的Python虛擬環境、第三方Python包、JAR包和數據文件等,本文為您介紹如何在Python作業中使用這些依賴。
背景信息
本文通過以下場景為您介紹如何使用Python依賴:
使用自定義的Python虛擬環境
VVR 4.x僅支持3.7版本的Python虛擬環境,VVR 6.x及以上的版本無此限制,您可以使用更高版本的Python虛擬環境。
Python支持構建虛擬環境,每個Python虛擬環境都有一套完整的Python運行環境,并且可以在這套虛擬環境中安裝一系列的Python依賴包。關于Python虛擬環境更詳細的介紹,請參見Python文檔創建虛擬環境。下文為您介紹如何準備Python的虛擬環境。
準備Python的虛擬環境。
在本地準備setup-pyflink-virtual-env.sh腳本,其內容如下。
miniconda.sh腳本信息:修改為您的目標版本地址信息。
apache-flink:修改為您作業使用的VVR版本對應的Flink版本,Flink版本查看方法請參見如何查看當前作業的Flink版本?。
在本地準備build.sh腳本,其內容如下。
#!/bin/bash set -e -x sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* yum install -y zip wget cd /root/ bash /build/setup-pyflink-virtual-env.sh mv venv.zip /build/
在命令行,執行如下命令,完成python虛擬環境的安裝。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh
執行完該命令后,會生成一個名字為venv.zip的文件,本示例為Python 3.10的虛擬環境。
您也可以修改上述腳本,在虛擬環境中安裝所需的第三方Python包。
set -e # 下載Python 3.10 miniconda.sh腳本。 wget "https://repo.continuum.io/miniconda/Miniconda3-py310_24.7.1-0-Linux-x86_64.sh" -O "miniconda.sh" # 為Python 3.10 miniconda.sh腳本添加執行權限。 chmod +x miniconda.sh # 創建Python的虛擬環境。 ./miniconda.sh -b -p venv # 激活Conda Python虛擬環境。 source venv/bin/activate "" # 安裝PyFlink依賴。 # update the PyFlink version if needed pip install "apache-flink==1.17.0" # 關閉Conda Python虛擬環境。 conda deactivate # 刪除緩存的包。 rm -rf venv/pkgs # 將準備好的Conda Python虛擬環境打包。 zip -r venv.zip venv
說明本文以作業為VVR 8.x,Python 3.10為例為您介紹,如果您需要使用其他VVR版本或安裝其他版本的Python的虛擬環境,則需要修改以下兩個參數:
在Python作業中使用Python虛擬環境。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊文件管理,上傳venv.zip文件。
在
頁面,單擊目標作業名稱。在部署詳情頁簽基礎配置區域Python Archives,選擇venv.zip文件。
如果SQL作業在虛擬環境中使用Python UDF,需要在運行參數配置區域的其他配置項,添加如下配置信息。
python.archives: oss://.../venv.zip
在運行參數配置區域其他配置項,按照您作業的VVR版本添加對應的指定Python虛擬環境的安裝路徑的配置信息。
vvr-6.x及以上版本
python.executable: venv.zip/venv/bin/python python.client.executable: venv.zip/venv/bin/python
vvr-6.x以下版本
python.executable: venv.zip/venv/bin/python
使用第三方Python包
下面將從以下兩個場景為您介紹如何使用第三方Python包:
使用可直接Import的第三方Python包
如果您的第三方Python包是Zip Safe的,即不需要安裝即可直接在Python作業中使用。操作步驟如下:
使用需要編譯的第三方Python包
如果您的第三方Python包是tar.gz格式的壓縮包,或從其他地方下載的源碼包,且壓縮包的根目錄下存在setup.py文件,則這種類型的第三方Python包通常需要先編譯才能被使用。您需要先在與Flink兼容的環境下編譯第三方Python包,然后才可在Python作業中調用第三方Python包。
推薦使用quay.io/pypa/manylinux2014_x86_64鏡像容器中的Python 3.7來編譯第三方Python包,使用該容器編譯生成的包兼容絕大多數Linux環境,關于該鏡像容器的更多信息請參見manylinux。
說明Python 3.7的安裝路徑為 /opt/python/cp37-cp37m/bin/python3。
下面以opencv-python-headless第三方Python包為例,介紹一下如何編譯和使用該第三方Python包。
編譯第三方Python包。
在本地準備requirements.txt文件,其內容如下。
opencv-python-headless
在本地準備build.sh腳本,其內容如下。
#!/bin/bash set -e -x sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* yum install -y zip PYBIN=/opt/python/cp37-cp37m/bin #PYBIN=/opt/python/cp38-cp38/bin #PYBIN=/opt/python/cp39-cp39/bin #PYBIN=/opt/python/cp310-cp310/bin "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd .. rm -rf __pypackages__
在CMD命令行,執行如下命令。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh
該命令執行完后,會生成一個名字為deps.zip的文件,該文件為編譯之后的第三方Python包。
您也可以修改requirements.txt,安裝其他所需的第三方Python包。此外,requirements.txt文件中可以指定多個Python依賴。
在Python作業中使用第三方Python包deps.zip。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊文件管理,上傳deps.zip。
在deps.zip。
頁面單擊目標作業名稱,在部署詳情頁簽基礎配置區域,單擊編輯后,在Python Libraries項,選擇
單擊保存。
使用JAR包
如果您的Flink Python作業中使用了Java類,例如作業中使用了Connector或者Java自定義函數時,可以通過如下方式來指定Connector或者Java自定義函數的JAR包。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊文件管理,上傳需要使用的JAR包。
在附加依賴文件項,選擇需要使用的JAR包。 頁面單擊目標作業名稱,在部署詳情頁簽基礎配置區域,單擊編輯后,在
在運行參數配置區域其他配置項,添加配置信息。
假如需要依賴多個JAR包,且名字分別為jar1.jar和jar2.jar,配置內容如下。
pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'
單擊保存。
使用數據文件
Flink暫不支持通過上傳數據文件的方式來進行python作業調試。
下面將從兩個場景為您介紹如何使用數據文件:
通過Python Archives選項方式
如果您的數據文件的數量比較多時,您可以將數據文件打包成一個ZIP包,然后通過如下方式在Python作業中使用。操作步驟如下:
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊文件管理,上傳目標數據文件ZIP包。
在在 頁面單擊目標作業名稱,在部署詳情頁簽基礎配置區域,單擊編輯后,Python Archives項,選擇需要使用的數據文件ZIP包。
在Python自定義函數中,可以通過如下方式訪問數據文件。假如數據文件所在壓縮包名稱為mydata.zip。
def map(): with open("mydata.zip/mydata/data.txt") as f: ...
通過附加依賴文件選項
如果您的數據文件數量比較少時,可以通過如下方式在Python作業中使用。操作步驟如下:
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊文件管理,上傳目標數據文件。
在,在 頁面單擊目標作業名稱,在部署詳情頁簽基礎配置區域,單擊編輯后,附加依賴文件項,選擇需要的數據文件。
在Python自定義函數中,可以通過如下方式訪問數據文件。以數據文件名稱為data.txt為例的代碼如下。
def map(): with open("/flink/usrlib/data.txt") as f: ...
相關文檔
Python API作業開發的方法請參見Python作業開發。
Flink Python作業的完整開發流程示例,請參見Flink Python作業快速入門。