可恢復性寫入功能支持將數據以EXACTLY_ONCE語義寫入存儲介質。本文介紹Flink如何通過EMR集群的方式可恢復性寫入OSS-HDFS服務。
前提條件
已創建EMR-3.42.0及以上版本或EMR-5.8.0及以上版本的集群。具體步驟,請參見創建集群。
已開通并授權訪問OSS-HDFS服務。具體操作,請參見開通并授權訪問OSS-HDFS服務。
在Flink作業中的用法
- 通用配置
為了支持EXACTLY_ONCE語義寫入OSS-HDFS,您需要執行如下配置:
- 打開Flink的檢查點(Checkpoint)。示例如下。
- 通過如下方式建立的StreamExecutionEnvironment。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- 執行如下命令,啟動Checkpoint。
env.enableCheckpointing(<userDefinedCheckpointInterval>, CheckpointingMode.EXACTLY_ONCE);
- 通過如下方式建立的StreamExecutionEnvironment。
- 使用可以重發的數據源,例如Kafka。
- 打開Flink的檢查點(Checkpoint)。
- 便捷使用
您無需額外引入依賴,只需攜帶oss://前綴的路徑,并使用OSS-HDFS服務的Bucket及Endpoint,即可啟用Flink。
- 添加Sink。
以將DataStream<String>的對象OutputStream寫入OSS-HDFS為例。
String outputPath = "oss://<user-defined-oss-hdfs-bucket.oss-hdfs-endpoint>/<user-defined-dir>" StreamingFileSink<String> sink = StreamingFileSink.forRowFormat( new Path(outputPath), new SimpleStringEncoder<String>("UTF-8") ).build(); outputStream.addSink(sink);
重要 在OSS-HDFS服務的Bucket中帶有.<oss-hdfs-endpoint>
的字段為可選項。如果您希望省略該字段,請確保已在Flink或Hadoop組件中正確配置了OSS-HDFS服務的Endpoint。 - 使用
env.execute()
執行Flink作業。
- 添加Sink。
(可選)自定義配置
您在提交Flink作業時,可以自定義參數,以開啟或控制特定功能。
例如,通過-yD
配置以yarn-cluster模式提交Flink作業時,示例如下:
<flink_home>/bin/flink run -m yarn-cluster -yD key1=value1 -yD key2=value2 ...
您可以開啟熵注入(Entropy Injection)功能。熵注入可以匹配寫入路徑的一段特定字符串,用一段隨機的字符串進行替換,以削弱所謂片區效應,提高寫入效率。
當寫入場景為OSS-HDFS時,需要完成下列配置。
oss.entropy.key=<user-defined-key>
oss.entropy.length=<user-defined-length>
寫入新文件時,路徑中與<user-defined-key>
相同的字符串會被替換為一個隨機字符串,隨機串的長度為<user-defined-length>
,且<user-defined-length>
必須大于零。
文檔內容是否對您有幫助?