日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

使用Flink將Kafka數(shù)據(jù)流式寫入阿里云OSS

更新時(shí)間:

將Kafka數(shù)據(jù)實(shí)時(shí)導(dǎo)入到OSS等湖存儲(chǔ)中來(lái)降低存儲(chǔ)成本或者進(jìn)行查詢分析是常見的使用場(chǎng)景。在EMR-3.37.1及之后的版本中,DataFlow集群內(nèi)置了JindoFS相關(guān)的依賴,使得您可以在DataFlow集群中運(yùn)行Flink作業(yè),將Kafka數(shù)據(jù)以Exactly-Once語(yǔ)義流式寫入阿里云OSS。本文通過(guò)示例為您介紹如何在DataFlow集群中編寫并運(yùn)行Flink作業(yè)來(lái)滿足上述場(chǎng)景。

背景信息

關(guān)于JindoFS的部分高級(jí)配置(例如,熵注入),請(qǐng)參見支持Flink可恢復(fù)性寫入JindoFS或OSS

前提條件

  • 已開通E-MapReduce服務(wù)和OSS服務(wù)。
  • 已完成云賬號(hào)的授權(quán),詳情請(qǐng)參見角色授權(quán)

操作流程

  1. 步驟一:準(zhǔn)備環(huán)境

  2. 步驟二:準(zhǔn)備JAR包

  3. 步驟三:創(chuàng)建Kafka Topic并生成數(shù)據(jù)

  4. 步驟四:運(yùn)行Flink作業(yè)

  5. 步驟五:查看輸出的結(jié)果

步驟一:準(zhǔn)備環(huán)境

  1. 創(chuàng)建包含F(xiàn)link和Kafka組件的DataFlow集群,詳情請(qǐng)參見創(chuàng)建集群

    說(shuō)明

    本文以EMR-3.43.1版本為例。

  2. 在OSS上創(chuàng)建與DataFlow集群相同地域的Bucket,詳情請(qǐng)參見控制臺(tái)創(chuàng)建存儲(chǔ)空間

步驟二:準(zhǔn)備JAR包

  1. 下載Demo代碼

    基于JindoFS,您可以在Flink作業(yè)中,如同HDFS一樣將數(shù)據(jù)以流式的方式寫入OSS中(路徑需要以oss://為前綴)。本示例中使用了Flink的StreamingFileSink方法來(lái)演示開啟了檢查點(diǎn)(Checkpoint)之后,F(xiàn)link如何以Exactly-Once語(yǔ)義寫入OSS。

    下述代碼片段演示了如何構(gòu)建Kafka Source與OSS Sink,完整代碼您可以從GitHub鏈接中下載獲得。

    重要

    JindoFS支持免密讀寫相同阿里云賬號(hào)下的OSS存儲(chǔ),因此作業(yè)中無(wú)需聲明相關(guān)AccessKey信息。

    public class OssDemoJob {
    
        public static void main(String[] args) throws Exception {
            ...
    
            // Check output oss dir
            Preconditions.checkArgument(
                    params.get(OUTPUT_OSS_DIR).startsWith("oss://"),
                    "outputOssDir should start with 'oss://'.");
    
            // Set up the streaming execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint is required
            env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
    
            String outputPath = params.get(OUTPUT_OSS_DIR);
    
            // Build Kafka source with new Source API based on FLIP-27
            KafkaSource<Event> kafkaSource =
                    KafkaSource.<Event>builder()
                            .setBootstrapServers(params.get(KAFKA_BROKERS_ARG))
                            .setTopics(params.get(INPUT_TOPIC_ARG))
                            .setStartingOffsets(OffsetsInitializer.latest())
                            .setGroupId(params.get(INPUT_TOPIC_GROUP_ARG))
                            .setDeserializer(new EventDeSerializationSchema())
                            .build();
            // DataStream Source
            DataStreamSource<Event> source =
                    env.fromSource(
                            kafkaSource,
                            WatermarkStrategy.<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner((event, ts) -> event.getEventTime()),
                            "Kafka Source");
    
            StreamingFileSink<Event> sink =
                    StreamingFileSink.forRowFormat(
                                    new Path(outputPath), new SimpleStringEncoder<Event>("UTF-8"))
                            .withRollingPolicy(OnCheckpointRollingPolicy.build())
                            .build();
            source.addSink(sink);
    
            // Compile and submit the job
            env.execute();
        }
    }
    說(shuō)明

    本示例代碼片段給出了主要的示例程序,您可以根據(jù)自身環(huán)境進(jìn)行修改(例如,添加包名以及修改代碼中的Checkpoint間隔)后,進(jìn)行編譯。關(guān)于如何構(gòu)建Flink作業(yè)的JAR包,可以參見Flink官方文檔。如果無(wú)需任何修改,您可以直接使用 dataflow-oss-demo-1.0-SNAPSHOT.jar 包進(jìn)行操作。

  2. 在命令行中,進(jìn)入到下載的項(xiàng)目文件的根目錄下,執(zhí)行以下命令打包文件。

    mvn clean package

    根據(jù)您pom.xml文件中artifactId的信息,項(xiàng)目對(duì)應(yīng)目錄dataflow-demo/dataflow-oss-demo/target下會(huì)出現(xiàn)dataflow-oss-demo-1.0-SNAPSHOT.jar包。

步驟三:創(chuàng)建Kafka Topic并生成數(shù)據(jù)

  1. 通過(guò)SSH方式連接DataFlow集群,詳情請(qǐng)參見登錄集群

  2. 執(zhí)行以下命令,創(chuàng)建測(cè)試所需的Topic。

    kafka-topics.sh --create  --bootstrap-server core-1-1:9092 \
        --replication-factor 2  \
        --partitions 3  \
        --topic kafka-test-topic

    創(chuàng)建成功后,命令行會(huì)打印如下信息。

    Created topic kafka-test-topic.
  3. 寫入數(shù)據(jù)至Kafka Topic。

    1. 在命令行中執(zhí)行以下命令,進(jìn)入Kafka Producer Console。

      kafka-console-producer.sh --broker-list core-1-1:9092 --topic  kafka-test-topic
    2. 輸入五條測(cè)試數(shù)據(jù)。

      1,Ken,0,1,1662022777000
      1,Ken,0,2,1662022777000
      1,Ken,0,3,1662022777000
      1,Ken,0,4,1662022777000
      1,Ken,0,5,1662022777000
    3. 按下Ctrl+C退出Kafka Producer Console。

步驟四:運(yùn)行Flink作業(yè)

  1. 通過(guò)SSH方式連接DataFlow集群,詳情請(qǐng)參見登錄集群

  2. 上傳打包好的dataflow-oss-demo-1.0-SNAPSHOT.jar至DataFlow集群的根目錄下。

    說(shuō)明

    本文示例中dataflow-oss-demo-1.0-SNAPSHOT.jar是上傳至root根目錄下,您也可以自定義上傳路徑。

  3. 執(zhí)行以下命令,提交作業(yè)。

    本示例通過(guò)Per-Job Cluster模式提交作業(yè),其他方式請(qǐng)參見基礎(chǔ)使用

    flink run -t yarn-per-job -d -c com.alibaba.ververica.dataflow.demo.oss.OssDemoJob \
        /dataflow-oss-demo-1.0-SNAPSHOT.jar  \
        --outputOssDir oss://xung****-flink-dlf-test/oss_kafka_test \
        --kafkaBrokers core-1-1:9092 \
        --inputTopic kafka-test-topic \
        --inputTopicGroup my-group

    參數(shù)說(shuō)明:

    • outputOssDir:指定您計(jì)劃寫入的OSS目錄。

    • kafkaBrokers:指定Kafka集群的broker,使用core-1-1:9092即可。

    • inputTopic:指定計(jì)劃讀取的Kafka Topic,使用在步驟三中創(chuàng)建的kafka-test-topic

    • inputTopicGroup:指定計(jì)劃使用的Kafka Consumer Group,使用my-group用于測(cè)試即可。

    result

    您可以執(zhí)行以下命令,查看作業(yè)狀態(tài)。

    flink list -t yarn-per-job -Dyarn.application.id=<appId>
    說(shuō)明

    <appId>為作業(yè)運(yùn)行后返回的Application ID。例如,本示例截圖中的application_1670236019397_0003。

步驟五:查看輸出的結(jié)果

  • 作業(yè)正常運(yùn)行后,您可以在OSS控制臺(tái)查看輸出結(jié)果。

    1. 登錄OSS管理控制臺(tái)

    2. 單擊創(chuàng)建的存儲(chǔ)空間。

    3. 在文件管理頁(yè)面指定的輸出目錄下查看輸出結(jié)果,輸出結(jié)果如下圖所示。OSS results

      重要

      由于該作業(yè)為流式作業(yè)會(huì)持續(xù)運(yùn)行,產(chǎn)生較多輸出文件,應(yīng)在完成驗(yàn)證后,及時(shí)在命令行中通過(guò)yarn application -kill <appId>命令終止該作業(yè)。

  • 您也可以在DataFlow集群中,通過(guò)命令行運(yùn)行hdfs dfs -cat oss://<YOUR_TARGET_BUCKET>/oss_kafka_test/<DATE_DIR>/part-0-0來(lái)展示實(shí)際存儲(chǔ)到OSS中的數(shù)據(jù),如下圖所示。OSS示例

    重要
    • 為了保證Exactly-Once語(yǔ)義,在Flink作業(yè)每完成一次Checkpoint(本示例中Checkpoint間隔為30s),數(shù)據(jù)文件才會(huì)落盤到OSS中。

    • 此外,由于該作業(yè)為流式作業(yè)會(huì)持續(xù)運(yùn)行,會(huì)產(chǎn)生較多輸出文件,應(yīng)在完成驗(yàn)證后,及時(shí)在命令行中通過(guò)yarn application -kill <appId>命令終止該作業(yè)。