阿里云實時計算Flink支持通過連接器讀寫OSS以及OSS-HDFS數據。通過配置OSS或者OSS-HDFS連接器的輸入屬性,實時計算Flink會自動從指定的路徑讀取數據,并將其作為實時計算Flink的輸入流,然后將計算結果按照指定格式寫入到OSS或者OSS-HDFS的指定路徑。
前提條件
已開通Flink全托管。具體操作,請參見開通實時計算Flink版。
開通Flink全托管后,Flink全托管頁簽將在5~10分鐘內顯示已創建完成的工作空間。
已創建SQL作業。
創建SQL作業時,Flink計算引擎需選擇VVR 8.0.1及以上版本。具體操作,請參見新建作業。
使用限制
僅支持讀寫相同賬號下的OSS或者OSS-HDFS服務的數據。
對于寫入OSS的場景,暫不支持寫Avro、CSV、JSON和Raw此類行存的格式,具體原因請參見FLINK-30635。
操作步驟
進入SQL作業創建頁面。
登錄實時計算控制臺。
單擊目標工作空間操作列下的控制臺。
在左側導航欄,單擊SQL開發。
在SQL作業編輯區域,編寫DDL和DML代碼。
將srcbucket中的dir路徑下的源表數據寫入destbucket的test路徑下的結果表。
說明如果您希望通過以下代碼讀取OSS-HDFS的數據,請確保srcbucket以及destbucket已開通OSS-HDFS服務。
CREATE TEMPORARY TABLE source_table ( `file.name` STRING NOT NULL, `file.path` STRING NOT NULL METADATA ) WITH ( 'connector'='filesystem', 'path'='oss://srcbucket/dir/', 'format'='parquet' ); CREATE TEMPORARY TABLE target_table( `name` STRING, `path` STRING ) with ( 'connector'='filesystem', 'path'='oss://destbucket/test/', 'format'='parquet' ); INSERT INTO target_table SELECT * FROM source_table ;
關于源表支持的元數據列(例如file.path、file.name等)以及WITH參數的具體用法,請參見對象存儲OSS連接器。
單擊保存。
單擊深度檢查。
深度檢查能夠檢查作業的SQL語義、網絡連通性以及作業使用的表的元數據信息。同時,您可以單擊結果區域的SQL優化,展開查看SQL風險問題提示以及對應的SQL優化建議。
單擊部署。
完成作業開發和深度檢查后,即可部署作業,將數據發布至生產環境。
(可選)僅當您需要讀取OSS-HDFS服務的數據時,執行此步驟。
單擊作業,在部署詳情頁簽下的運行參數配置區域,按以下說明配置OSS-HDFS服務訪問密鑰以及Endpoint等信息,然后單擊保存。
fs.oss.jindo.buckets: srcbucket;destbucket fs.oss.jindo.accessKeyId: LTAI******** fs.oss.jindo.accessKeySecret: KZo1******** fs.oss.jindo.endpoint: cn-hangzhou.oss-dls.aliyuncs.com
各配置項說明如下:
配置項
說明
fs.oss.jindo.buckets
填寫待讀取源表數據所在的Bucket名稱以及待寫入結果表數據所在的Bucket名稱。Bucket名稱之間以分號分隔,例如
srcbucket;destbucket
。fs.oss.jindo.accessKeyId
阿里云賬號或者RAM用戶的AccessKey ID。獲取方法請參見查看RAM用戶的AccessKey信息。
fs.oss.jindo.accessKeySecret
阿里云賬號或者RAM用戶的AccessKey Secret。獲取方法請參見查看RAM用戶的AccessKey信息。
fs.oss.jindo.endpoint
OSS-HDFS服務的Endpoint,例如cn-hangzhou.oss-dls.aliyuncs.com。
在作業運維頁面,單擊啟動,等待作業進入運行中狀態。
通過指定的OSS或者OSS-HDFS結果表存儲路徑path查看寫入的數據。
寫入OSS時,您可以通過OSS控制臺文件列表下的OSS頁簽查看寫入的數據。寫入OSS-HDFS時,您可以通過OSS控制臺文件列表下的HDFS頁簽查看寫入的數據。