日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

DataStream的Timer使用最佳實踐

本文為您介紹如何在DataStream上使用Timer,及Timer使用建議和注意事項。

什么是Timer?

Timer是Flink提供的定時器機制。

通常,F(xiàn)link作業(yè)是事件驅動計算的,但在一些場景下,F(xiàn)link作業(yè)需要基于處理時間(ProcessingTime)或者事件時間(EventTime)驅動計算和發(fā)送數(shù)據(jù),這時便需要使用Timer。算子可以注冊一個Timer,當時間達到指定的處理時間,或事件時間水印(Watermark)達到指定的事件時間時,便會觸發(fā)指定的計算邏輯。Flink中的窗口便是基于Timer實現(xiàn)的。

多數(shù)情況下,這類需求可以使用SQL中的窗口滿足。但有時,F(xiàn)link作業(yè)存在更加復雜且定制化的需求,這時可以考慮使用DataStream API,利用其中的Timer機制實現(xiàn)。

如何使用Timer?

Flink作業(yè)開發(fā)者可以在KeyedStream上使用KeyedProcessFunction,或者在ConnectedStream上使用KeyedCoProcessFunction,又或在BroadcastConnectedStream上使用KeyedBroadcastProcessFunction。通過這些Function中提供的TimerService來使用Timer。其中使用最多的是KeyedProcessFunction。我們以此為例來介紹下如何使用Timer。

KeyedProcessFunction與RichFlatMapFunction非常相近,同樣可以處理單條數(shù)據(jù),輸出0到任意多條數(shù)據(jù),但KeyedProcessFunction只能在KeyedStream上使用,并提供了額外的Timer支持。

重要

由于Timer會使用KeyedState進行保存和恢復,因此只能在KeyedProcessFunction中使用Timer,無法在ProcessFunction中使用。

    public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    
        // 處理輸入數(shù)據(jù)。
    	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
    
        // 當達到Timer指定時間時的回調。
    	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
    
        // 處理數(shù)據(jù)中使用的Context,也是Timer回調中使用的Context的基類。
        public abstract class Context {
    
            // 當前處理的數(shù)據(jù)或Timer的時間戳。
            public abstract Long timestamp();
    
            // 獲取TimerService以進行Timer注冊或刪除操作。
            public abstract TimerService timerService();
    
            // 將數(shù)據(jù)作為Side Output輸出。
            public abstract <X> void output(OutputTag<X> outputTag, X value);
    
            // 獲取當前處理的數(shù)據(jù)的Key。
            public abstract K getCurrentKey();
        }
    
        // Timer回調中使用的Context。
        public abstract class OnTimerContext extends Context {
            // 獲取當前Timer的TimeDomain,即使用處理時間還是事件時間。
            public abstract TimeDomain timeDomain();
    
            // 獲取當前Timer的Key。
            public abstract K getCurrentKey();
        }
    }

    KeyedProcessFunction.Context提供了訪問TimerService的途徑,可以在處理數(shù)據(jù)或Timer時使用TimerService注冊新的Timer或刪除已有的Timer。注冊所使用的時間單位均為毫秒。

    public interface TimerService {
    
        // 獲取當前的處理時間。
        long currentProcessingTime();
    
        // 獲取當前的事件時間水印。
        long currentWatermark();
    
        // 注冊指定處理時間的Timer。
        void registerProcessingTimeTimer(long time);
    
        // 注冊指定事件時間的Timer。
        void registerEventTimeTimer(long time);
    
        // 刪除指定處理時間的Timer。
        void deleteProcessingTimeTimer(long time);
    
        // 刪除指定事件時間的Timer。
        void deleteEventTimeTimer(long time);
    }

    在processElement中注冊Timer時,會使用當前處理的數(shù)據(jù)的Key,而在onTimer中注冊Timer時會繼承當前處理的Timer的Key。同一個Key在同一個時間點只會有一個Timer,因此也只會觸發(fā)一次計算。不同的Key則會分別觸發(fā)計算。注冊的單個Timer均為一次性觸發(fā),如果需要實現(xiàn)周期性觸發(fā)的邏輯,則需要在onTimer中注冊下一個觸發(fā)時間點的Timer。

    Timer使用示例

    如前面所說,F(xiàn)link的窗口就是使用Timer實現(xiàn)的。首先我們看一下基于事件時間窗口,每分鐘對輸入數(shù)值求和并輸出的例子。在DataStream API中使用窗口的代碼示例如下。

    DataStream<Tuple2<String, Long>> sum = inputs
            .keyBy(input->input.f0)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .reduce(new SumReduceFunction());

    我們可以嘗試直接使用KeyedProcessFunction和Timer來實現(xiàn)類似的邏輯:

    DataStream<Tuple2<String, Long>> sum = inputs
        .keyBy(input -> input.f0)
        .process(new KeyedProcessFunction<String, Tuple2<String, Long>, Tuple2<String, Long>>() {
            // 記錄窗口內總和的State。
            private ValueState<Long> sumState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
            }
    
            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                if (sumState.value() == null) {
                    // 當某個Key的數(shù)據(jù)第一次處理,或在Timer觸發(fā)后第一次處理時,根據(jù)當前數(shù)據(jù)的事件時間,計算所屬的時間窗口,注冊窗口結束時刻的Timer。
                    ctx.timerService().registerEventTimeTimer(getWindowStartWithOffset(ctx.timestamp(), 0, 60 * 1000) + 60 * 1000);
                    sumState.update(value.f1);
                } else {
                    // 否則進行累加。
                    sumState.update(sumState.value() + value.f1);
                }
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
                // 輸出此期間的總和,并清除累積值。
                out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
                sumState.clear();
            }
    
            // 該方法自TimeWindow.java中復制而來,用于計算給定時間戳所從屬的窗口的起點。
            private long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
                final long remainder = (timestamp - offset) % windowSize;
                // handle both positive and negative cases
                if (remainder < 0) {
                    return timestamp - (remainder + windowSize);
                } else {
                    return timestamp - remainder;
                }
            }
        });

    當一個Key首次有數(shù)據(jù)輸入時,F(xiàn)unction會計算當前數(shù)據(jù)的事件時間屬于哪一個時間窗口,注冊這個時間窗口結束時刻觸發(fā)的Timer,并開始累加數(shù)據(jù)。事件時間水印達到指定時刻之后,F(xiàn)link會調用onTimer,將累加值輸出出去,并清除累加狀態(tài)。此后這個Key再有新的數(shù)據(jù)輸入時,會重復這個過程。

    以上這兩個實現(xiàn)的邏輯基本是相同的。可以發(fā)現(xiàn)如果Timer處理后,這個Key不再有數(shù)據(jù)輸入,后續(xù)也不會再輸出這個Key的數(shù)據(jù)。有時作業(yè)的邏輯已知輸入Key是有限個,希望有一個Key輸入一次后,無論后續(xù)是否還有數(shù)據(jù),都以相同的事件時間周期輸出周期內的累加值,可以將OnTimer的實現(xiàn)修改為:

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // 輸出此期間的總和。
        out.collect(new Tuple2<>(ctx.getCurrentKey(), sumState.value()));
        // 重置但不清除累積值。
        sumState.update(0L);
        // 注冊下一次輸出累積值的Timer。該timestamp就是窗口結束時刻,下一個窗口可以直接加60s。
        ctx.timerService().registerEventTimeTimer(timestamp + 60 * 1000);
    }

    如此便可以使得sumState.value()在賦值一次后永遠不為null,從而實現(xiàn)無論是否有數(shù)據(jù),都會繼續(xù)定期輸出這個Key的累加值,無數(shù)據(jù)時會輸出0。

    說明

    這里的輸出周期是基于事件時間水印的事件時間周期。

    如果想要基于處理時間而非事件時間進行聚合,則可以替換processElement中注冊Timer和獲取時間的邏輯,改為:

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        if (sumState.value() == null) {
            // 根據(jù)當前的處理時間,計算所屬的時間窗口,注冊窗口結束時間的Timer。
            ctx.timerService().registerProcessingTimeTimer(getWindowStartWithOffset(ctx.timerService().currentProcessingTime(), 0, 60 * 1000) + 60 * 1000);
            sumState.update(value.f1);
        } else {
            sumState.update(sumState.value() + value.f1);
        }
    }

    當處理時間達到指定時間之后,便會調用對應的onTimer邏輯。

    基于以上類似的邏輯,修改State計算邏輯和輸出數(shù)據(jù)的邏輯,可以實現(xiàn)其他類似的計算需求。

    另一個單純使用窗口不易實現(xiàn)而需要使用Timer實現(xiàn)的業(yè)務邏輯是心跳警告。當一個Key的輸入一次后,如果一分鐘內沒有再輸入新的數(shù)據(jù),就發(fā)出一個告警消息。方便起見這里只使用Key作為輸入,實現(xiàn)的代碼如下。

    DataStream<String> sum = inputs
        .keyBy(input->input)
        .process(new KeyedProcessFunction<String, String, String>() {
            // 記錄此前的超時時間的State。
            private ValueState<Long> lastTimerState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                lastTimerState = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
            }
    
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
                if (lastTimerState.value() != null) {
                    // 清除此前注冊的超時Timer。
                    ctx.timerService().deleteProcessingTimeTimer(lastTimerState.value());
                }
                // 注冊新的超時Timer,并記錄在State中,用于后續(xù)清除。
                long timeout = ctx.timerService().currentProcessingTime() + 60 * 1000;
                ctx.timerService().registerProcessingTimeTimer(timeout);
                lastTimerState.update(timeout);
                // 輸出正常數(shù)據(jù)。
                out.collect(value);
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // 進入此方法說明已超時,發(fā)送一個心跳超時警告的消息。也可以考慮使用SideOutput而非默認輸出流進行輸出。
                out.collect("Heartbeat timeout:" + ctx.getCurrentKey());
            });

    使用建議

    • 大多數(shù)情況下窗口能夠滿足需求,建議優(yōu)先使用窗口。

    • KeyedProcessFunction的processElement和onTimer方法不會被同時調用,因此不需要擔心同步問題。但這也意味著處理onTimer邏輯是會阻塞處理數(shù)據(jù)的。

    • Flink沒有提供查詢Timer注冊狀態(tài)的API,因此如果預計需要進行Timer刪除操作,F(xiàn)unction需要自行記錄已注冊Timer的時間。

    • Timer會保存在Checkpoint中,當作業(yè)從Failover中恢復,或從Savepoint重新啟動時,Timer也會被恢復。此時:

      • 已經到時間的處理時間Timer,會直接觸發(fā)處理。因此作業(yè)啟動后短時間內可能會觸發(fā)大量的Timer進行數(shù)據(jù)處理和發(fā)送。

      • 事件時間Timer則會在收到對應時間的Watermark后觸發(fā)處理。因此作業(yè)也有可能在啟動后一段時間后,即事件時間水印更新后觸發(fā)大量的Timer進行數(shù)據(jù)的處理和發(fā)送。

    • Timer與Key相關,在Checkpoint里會保存在KeyedState中,因此只能在KeyedStream,或者有Key的ConnectedStream或BroadcastConnectedStream上使用。無Key的流作業(yè)在需要使用Timer時,如果符合以下兩種情況可以按相應的方法使用:

      • 如果Timer的邏輯與特定字段值無關,每條數(shù)據(jù)獨立使用一個Timer,可以使用數(shù)據(jù)內的一個唯一ID(UUID)作為Key進行keyby。

        重要

        該字段需要存在于上游數(shù)據(jù)中,不可以是keyby方法中生成隨機值。

      • 如果全局共享一個Timer,即全局進行聚合計算的情況,則可以使用一個常量作為Key進行keyby,并將并發(fā)設為1。

    注意事項

    • 請盡量避免大量Timer同時觸發(fā)的情況,例如數(shù)百萬個Key的Timer都在整點觸發(fā)。這種情況建議把觸發(fā)時間打散到前后數(shù)分鐘或更長的范圍內。

    • 請避免在processElement和onTimer中重復注冊Timer,因為這會導致Timer數(shù)量急劇膨脹。

    • 通常情況下Timer的開銷是很小的,大量的Key注冊Timer也沒有問題。但仍然建議關注Checkpoint時間和內存狀態(tài)。如果使用Timer后,Checkpoint時間或者內存使用量增加很多,超過可容忍范圍,可能需要考慮優(yōu)化邏輯,或使用其他方式實現(xiàn)。

    • 如果在有限流上使用處理時間Timer需要注意,當數(shù)據(jù)處理結束時,未到時間的處理時間Timer將被忽略,這意味著數(shù)據(jù)可能會丟失。