大数据专家详解,Flink事件时间水印和迟到数据处理,Flink Time

大数据专家详解,Flink事件时间水印和迟到数据处理,Flink Time

经验文章nimo972025-02-18 11:24:068A+A-

前言

之前的文章中已经屡次提到过Flink的事件时间(event time)、水印(watermark)、乱序(out-of-order)、迟到数据(late element)这些概念,虽然它们都非常基础,但笔者还没有对它们做过像样的介绍,感觉不太合适。正好今天脑子比较累,又是Friday night,不适合写复杂的东西,就来谈谈简单的吧。(来源:简书 作者:LittleMagic)

事件时间与水印

所谓事件时间,就是Flink DataStream中的数据元素自身带有的、在其实际发生时记录的时间戳,具有业务含义,并与系统时间独立。很显然,由于外部系统产生的数据往往不能及时、按序到达Flink系统,所以事件时间比处理时间有更强的不可预测性。为了能够准确地表达事件时间的处理进度,就必须用到水印。

Flink水印的本质是DataStream中的一种特殊元素,每个水印都携带有一个时间戳。当时间戳为T的水印出现时,表示事件时间t <= T的数据都已经到达,即水印后面应该只能流入事件时间t > T的数据。也就是说,水印是Flink判断迟到数据的标准,同时也是窗口触发的标记。

为了形象地说明水印的作用,参考一下下面的图,是一个乱序的基于事件时间的数据流示例。

图中的方框就是数据元素,其中的数字表示事件时间,W(x)就表示时间戳是x的水印,并有长度为4个时间单位的滚动窗口。假设时间单位为秒,可见事件时间为2、3、1s的元素都会进入区间为[1s, 4s]的窗口,而事件时间为7s的元素会进入区间为[5s, 8s]的窗口。当水印W(4)到达时,表示已经没有t <= 4s的元素了,[1s, 4s]窗口会被触发并计算。同理,水印W(9)到达时,[5s, 8s]窗口会被触发并计算,以此类推。

不过图中暂时没有示出迟到数据。如果事件时间为6的元素出现在W(9)后面,就算是迟到了。迟到数据的处理后面再说。

上面的示例只有一个并行度,那么在有多个并行度的情况下,就会有多个流产生水印,窗口触发时该采用哪个水印呢?答案是所有流入水印中时间戳最小的那个。来自官方文档的图能够说明问题。

https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html

容易理解,如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间,那么所有流的数据肯定已经全部收齐,就可以安全地触发窗口计算了。

提取事件时间、产生水印

上面说了这么多,那么事件时间是如何从数据中提取的,水印又是如何产生的呢?Flink提供了统一的
DataStream.assignTimestampsAndWatermarks()方法来提取事件时间并同时产生水印,毕竟它们在处理过程中是紧密联系的。


assignTimestampsAndWatermarks()方法接受的参数类型有
AssignerWithPeriodicWatermarks和
AssignerWithPunctuatedWatermarks两种,分别对应周期性水印和打点(即由事件本身的属性触发)水印,它们的类图如下所示。

周期性水印

顾名思义,使用
AssignerWithPeriodicWatermarks时,水印是周期性产生的。该周期默认为200ms,也能通过
ExecutionConfig.setAutoWatermarkInterval()方法来指定新的周期。

由类图容易看出,我们需要通过实现extractTimestamp()方法来提取事件时间,实现getCurrentWatermark()方法产生水印。但好在Flink已经提供了3种内置的实现类,所以我们直接用就可以了,省事。

  • AscendingTimestampExtractor
    总说话口干舌燥的(?),还是看代码吧。
    public abstract long extractAscendingTimestamp(T element);

    @Override
    public final long extractTimestamp(T element, long elementPrevTimestamp) {
        final long newTimestamp = extractAscendingTimestamp(element);
        if (newTimestamp >= this.currentTimestamp) {
            this.currentTimestamp = newTimestamp;
            return newTimestamp;
        } else {
            violationHandler.handleViolation(newTimestamp, this.currentTimestamp);
            return newTimestamp;
        }
    }

    @Override
    public final Watermark getCurrentWatermark() {
        return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
    }


AscendingTimestampExtractor产生的时间戳和水印必须是单调非递减的,用户通过覆写extractAscendingTimestamp()方法抽取时间戳。如果产生了递减的时间戳,就要使用名为MonotonyViolationHandler的组件处理异常,有两种方式:打印警告日志(默认)和抛出RuntimeException。

单调递增的事件时间并不太符合实际情况,所以
AscendingTimestampExtractor用得不多。

  • BoundedOutOfOrdernessTimestampExtractor
    它的出镜率就非常高了。还是看代码先。
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
        if (potentialWM >= lastEmittedWatermark) {
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) {
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }

如名字所述,
BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的,构造它时传入的参数maxOutOfOrderness就是乱序区间的长度,而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间,相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。

当然,乱序区间的长度要根据实际环境谨慎设定,设定得太短会丢较多的数据,设定得太长会导致窗口触发延迟,实时性减弱。

  • IngestionTimeExtractor
    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return now;
    }

    @Override
    public Watermark getCurrentWatermark() {
        final long now = Math.max(System.currentTimeMillis(), maxTimestamp);
        maxTimestamp = now;
        return new Watermark(now - 1);
    }

IngestionTimeExtractor基于当前系统时钟生成时间戳和水印,其实就是Flink三大时间特征里的摄入时间了。

打点水印

打点水印比周期性水印用的要少不少,并且Flink没有内置的实现,那么就写个最简单的栗子吧。

    sourceStream.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks() {
      @Nullable
      @Override
      public Watermark checkAndGetNextWatermark(UserActionRecord lastElement, long extractedTimestamp) {
        return lastElement.getUserId().endsWith("0") ? new Watermark(extractedTimestamp - 1) : null;
      }

      @Override
      public long extractTimestamp(UserActionRecord element, long previousElementTimestamp) {
        return element.getTimestamp();
      }
    });


AssignerWithPunctuatedWatermarks适用于需要依赖于事件本身的某些属性决定是否发射水印的情况。我们实现checkAndGetNextWatermark()方法来产生水印,产生的时机完全由用户控制。上面例子中是收取到用户ID末位为0的数据时才发射。

还有三点需要提醒:

  • 不管使用哪种方式产生水印,都不能过于频繁。因为Watermark对象是会全部流向下游的,也会实打实地占用内存,水印过多会造成系统性能下降。
  • 水印的生成要尽量早,一般是在接入Source之后就产生,或者在Source经过简单的变换(map、filter等)之后产生。
  • 如果需求方对事件时间carry的业务意义并不关心,可以直接使用处理时间,简单方便。

迟到数据处理

如上所述,水印的乱序区间能够保证一些迟到数据不被丢弃,但是乱序区间往往不很长,那些真正迟到了的数据该怎么办呢?有两种方法来兜底,可以说是Flink为迟到数据提供的第二重保障。

窗口允许延迟

Flink提供了
WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说,正常情况下窗口触发计算完成之后就会被销毁,但是设定了允许延迟之后,窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中,并触发新的计算。当然,窗口也是吃资源大户,所以allowedLateness的值要适当。给个完整的代码示例如下。

      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......

allowedLateness机制实际上就是DataFlow模型中的回填(backfill)策略的实现。对于滑动窗口和滚动窗口是累积(accumulating)策略,对于会话窗口则是累积与回撤(accumulating & retracting)策略。之前讲DataFlow模型时提到过,不废话了。

侧输出迟到数据

侧输出(side output)是Flink的分流机制。迟到数据本身可以当做特殊的流,我们通过调用
WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去,再进行下一步处理(比如存到外部存储或消息队列)。代码如下。

      // 侧输出的OutputTag
      OutputTag lateOutputTag = new OutputTag<>("late_data_output_tag");

      sourceStream.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(30)) {
          private static final long serialVersionUID = 1L;
          @Override
          public long extractTimestamp(UserActionRecord record) {
            return record.getTimestamp();
          }
        }
      )
      .keyBy("platform")
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .allowedLateness(Time.seconds(30))
      .sideOutputLateData(lateOutputTag)   // 侧输出
      .aggregate(new ViewAggregateFunc(), new ViewSumWindowFunc())
      // ......

      // 获取迟到数据并写入对应Sink
      stream.getSideOutput(lateOutputTag).addSink(lateDataSink);



Flink Time详解

Flink Time 详解(来源:简书 作者:鬼古神月)

概述:
    对于流式数据处理,最大的特点是数据上具有时间的属性特征,Flink根据时间产生的位置不同,将时间区分为三种时间语义
    分别为事件生成时间(Event Time)、事件接入时间(Ingestion Time)和事件处理时间(Processing Time)
1. Event Time: 事件产生的时间,它通常有事件中的时间戳描述
2. Ingestion Time: 事件进入Flink的时间
3. Processing Time : 事件被处理时当前系统的时间.

1. 时间语义Time

概述:
    数据从终端产生,或者从系统中产生的过程中生成的时间为事件生成时间,当时间经过消息中间件传入到Flink系统中,在DataSource中接入
    的时候会生成事件接入时间,当数据在Flink系统中通过各个算子实例执行转换操作的过程中,算子实例所在系统的时间为数据处理时间,
1. 设置时间语义:
概述:
     在Flink中默认情况下使用的是Process Time时间语义,如果用户选择使用Event Time或者Ingestion Time 语义, 则需要在创建的
     StreamExecutionEnvironment 中调用setStreamTimeCharacteristic()方法设定系统的时间概念,
     如下代码使用TimeCharacteristic.EventTime作为系统的时间语义:
code:
    //设置使用EventTime
    env.setStreamTImeCharacteristic(TimeCharacteristic.EventTime)
    // 使用IngestionTime
    env.setStreamTImeCharacteristic(TimeCharacteristic.IngestionTime)

2. WaterMark水位线

概述:
    在使用EventTIme处理Stream数据的时候会遇到数据乱序的问题,流处理从Event(事件)产生,流经Source,再到Operator,这中间需要一定
    的时间,虽然大部分情况下,传输到Operator的数据都是按照事件产生的时间顺序来的,但是也不能排除由于网络延迟等原因而导致乱序的
    产生,特别是使用Kafka的时候,多个分区之间的数据无法保证有序,因此,在进行Window计算的时候,不能无限期地等下去,必须要有个机制
    来保证在特定的时间后,必须要触发Window进行计算,WaterMark是用于处理乱序事件的.
1. Watermark原理
概述:
    在Filnk的窗口处理过程中,如果确定全部数据到达,就可以对Window的所有数据做窗口计算操作(汇总,分组),如果数据没有全部到达,则
    继续等待该窗口中的数据全部到达才开始处理,这种情况下就需要用到水位线(WaterMarks)机制,它能够衡量数据处理进度(表达数据达到
    的完整性),保证事件数据(全部)达到Flink系统,或者在乱序以及延迟到达时,也能够想预期一样计算出正确并且连续的结果,
    当任何Event进入到FLink系统时,会根据当前最大事件时间产生Watermarks时间戳(t)
Flink是怎么计算Watermark的值呢?
概述:
    Watermark = 进入Flink的最大的事件时间(MaxEventTime)-指定的延迟时间(t)
那么有Watermark的Window是怎么处罚窗口函数的呢?
概述:
    如果有窗口的停止时间等于或者小于maxEventTIme -t(当时的Watermark),那么这个窗口被触发执行.
注意:
    Watermark本质可以理解成一个延迟触发机制.

Watermark的使用存在三种情况:

1. 本来有序的Stream中的Watermark
概述:
    如果数据元素的事件时间是有序的,Watermark时间戳会随着数据元素的事件时间按顺序生成,此时水位线的变化和
事件时间保持一致(因为既然是有序的时间,就需要设置延迟了,那么t就是0,所以watermark= maxTime-0 = maxTime),也
就是理想状态下的水位线,
当Watermark大于Windows结束时间就会触发对Windows的数据计算,以此类推
下一个Window也是一样
2. 乱序事件中的Watermark
概述:
    现实情况下数据元素往往并不是按照其产生顺序接入到Flink系统中进行处理,而频繁出现乱序或迟到的情况,这种
情况就需要使用Watermark来应对.
3. 并行数据流中的Watermark
概述:
    在多并行度的情况下,Watermark会有一个对齐机制,这个对齐机制会取所有Channel中最小的Watermark.

引入Wateramark和EventTime

1. 有序数据流中引入Watermark和EventTime
    对于有序的数据,代码比较简洁,主要需要从源Event中抽取EventTime.
    code:
    // 读取Socket数据
    //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val data = env.socketTextStream("localhost", 9999)
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
        // 根据EventTime有序的数据流
        data.assignAscendingTimestamps(_.callTime)
        // StationLog对象中抽取EventTime就是callTime属性
2. 乱序数据流中引入Watermark和EventTime
    对于乱序数据流,有两种常见的引入方法:周期性和间断性.
    2.1 With Periodic(周期性的)Watermark
        周期性地生成Watermark的生成,默认是100ms,每隔N毫秒自动向流里注入一个Watermark,时间间隔由streamEnv.getConfig.
        setAutoWatermarkInterval()决定.最简单写入
        // 如果EventTime是乱序的,需要考虑一个延迟时间t
        // 当前代码设置的延迟时间为3s
        data.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(3)) {
          override def extractTimestamp(t: StationLog): Long = {
            t.callTime// 设置EventTIme
          }
        })
    2.2 With Punctuated(间断性的)Watermark
        间断性的生成Watermark一般都是基于某些事件触发Watermark的生成和发送,比如:在我们的基站数据中,有一个基站的CallTime
        总是没有按照顺序传入,其他基站的时间都是正常的,那我们需要对这个基站来专门生成Watermark
        // 1. 只有satation_1的Event是无序的,所以只需要针对Station_1做处理
        // 当前代码设置station_1基站的延迟处理时间为3s
        data.assignTimestampsAndWatermarks(new MyCustomerPunctuatedWatermarks(3000L)) //自定义延迟
        class MyCustomerPunctuatedWatermarks(delary: Long) extends
        AssignerWithPunctuatedWatermarks[StationLog] {
        var maxTime: long = 0
    
        override def checkAndGetNextWatermark(t: StationLog, l: Long): Watermark = {
          if (t.sid.equals("station_1")) {
            ///当基站ID为station_1才生成水位线
            maxTime = math.max(maxTime, l)
            new Watermark(maxTime)
          } else {
            return null //
          }
        }
    
        override def extractTimestamp(t: StationLog, l: Long): Long = {
          // 抽取EventTime的值
          t.callTime
        }
        }

Watermark 案例

概述:
    每隔5s中统计一下最近10s内每个基站中通话时间最长的一次通话发生的
    呼叫时间、主叫号码、被叫号码,通话时长. 并且还得告诉我是那个时间范围(10s)
code:
    package FlinkDemo.time
    import java.text.SimpleDateFormat
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
       /**
        * 每隔5s中统计一下最近10s内每个基站中通话时间最长的一次通话发生的
        * 呼叫时间、主叫号码、被叫号码,通话时长
        * 并且还得告诉我是那个时间范围(10s)
        */
    object Watermark_demo {
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // 设置并行度1
        env.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._
        val data = env.socketTextStream("localhost", 9999)
        // 分配事件时间提取器, 
          .assignTimestampsAndWatermarks(new TimestampExtractor1())
          .map { line =>
            var arr = line.split(",")
            StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
          }
          // 引入Watermark
          .keyBy(_.sid)
          // 设置滑动窗口大小为10s,滚动时间为5s
          .timeWindow(Time.seconds(10), Time.seconds(5))
          // reduce 函数做增量聚合,MaxTimeAggregate能做到来一条数据处理一条,
          // ReturnMaxTime 在窗口触发的时候调用.
          // reduce( preAggregator: (T, T) => T,
          //      function: ProcessWindowFunction[T, R, K, W])
          .reduce(new MaxTimeReduce, new ReturnMaxTime)
          .print()
        env.execute()
      }
    
      class MaxTimeReduce extends ReduceFunction[StationLog] {
        // 每个基站中传入的数据
        override def reduce(value1: StationLog, value2: StationLog): StationLog = {
          if (value1.duration > value2.duration) value1 else value2
        }
      }
    
      class ReturnMaxTime extends WindowFunction[StationLog, String, String, TimeWindow] {
        // 获取时间范围
        override def apply(key: String, window: TimeWindow, input: Iterable[StationLog], out: Collector[String]): Unit = {
          val sb = new StringBuilder
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          sb.append(s"时间范围: Start: ${format.format(window.getStart)},${format.format(window.getEnd)}")
          sb.append("\n")
          sb.append(s"接入时间: ${format.format(input.iterator.next.callTime)}")
          sb.append("\n")
          sb.append("通话日志: ").append(input.iterator.next())
          out.collect(sb.toString())
        }
      }
    }
提取器: 
    package FlinkDemo.time
    
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.watermark.Watermark
    
    import java.text.SimpleDateFormat
    
    class TimestampExtractor1 extends AssignerWithPeriodicWatermarks[String] with
      Serializable {
      var currentTimestamp: Long = 0L
      //设置最大允许的乱序事件是5s
      val maxDelayTime = 5000L
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      var watermark: Watermark = null
    
      override def getCurrentWatermark: Watermark = {
        watermark =
          new Watermark(currentTimestamp - maxDelayTime)
        //println("new watermark: " + format.format(watermark.getTimestamp) + " ")
        watermark
      }
    
      override def extractTimestamp(t: String, l: Long): Long = {
        val timeStamp =  t.split(",")(4).toLong
        currentTimestamp = Math.max(timeStamp, currentTimestamp)
        println("timestamp: " + format.format(timeStamp) + "|" + format.format(currentTimestamp) + "|" + format.format(getCurrentWatermark.getTimestamp))
        timeStamp
      }
    }

Watermark demo2

code:
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    /*
     *想使用WaterMark,需要3个步骤
     *  1. 对数据进行timestamp提取,即调用assignTimestampsAndWaterMarks函数
     *     实例化BoundedOutOfOrdernessTimestampExtractor,重写extractTimestamp方法
     * 2. 是指使用事件时间,因为WaterMark是基于事件时间
     * 3. 定义时间窗口: 翻滚窗口(TumblingEventWindows)、滑动窗口(timeWindow)
     *  任意要给没有实现都会报异常
     */
    object EtDemo {
      /**/
    
    
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.streaming.api.scala._
        // 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
        senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // 导入隐式转换
    
        //便于测试,并行度设置为1
        senv.setParallelism(1)
    
        // 2. 创建数据源
    
        // 3. 绑定数据源
        val text = senv.socketTextStream("localhost", 9999)
          //senv.fromCollection(data)
          // 添加事件时间提取器
          .assignTimestampsAndWatermarks(new TimestampExtractor())
        // 对自定的数据集进行窗口计数
        val counts = text
          .map { (m: String) => new Character(m.split(",")(1), 1) }
          .keyBy(_.character)
          //划分规则时按1分钟,内秒数划分的1-10,5-15,..30-40,35-45,40-50,45-55,50-60,55-60
          .timeWindow(Time.seconds(10), Time.seconds(5))
          .reduce(new ReduceDemo, new MyWFunction)
          .print()
        senv.execute("EventTime processing examkple")
      }
    
      case class Character(character: String, num: Int)
    
      class ReduceDemo extends ReduceFunction[Character] {
        // 计算传入的数据累加
        override def reduce(value1: Character, value2: Character): Character = {
          new Character(value1.character, value2.num + value1.num)
        }
      }
    
      class MyWFunction extends WindowFunction[Character, String, String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Character], out: Collector[String]): Unit = {
          val sb = new StringBuilder
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          sb.append(s"时间范围: Start: ${format.format(window.getStart)},${format.format(window.getEnd)}  ")
          sb.append(input.iterator.next().toString())
          out.collect(sb.toString())
        }
      }
    }
input:
    1522827251000,a
    1522827252000,a
    1522827251000,b
    1522827252000,b
    1522827256000,a
    1522827254000,a
    1522827261000,a
    1522827270000,a
result:
    timestamp: 2018-04-04 15:34:11.000|2018-04-04 15:34:11.000|2018-04-04 15:34:06.000
    timestamp: 2018-04-04 15:34:12.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
    timestamp: 2018-04-04 15:34:11.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
    timestamp: 2018-04-04 15:34:12.000|2018-04-04 15:34:12.000|2018-04-04 15:34:07.000
    timestamp: 2018-04-04 15:34:16.000|2018-04-04 15:34:16.000|2018-04-04 15:34:11.000
    timestamp: 2018-04-04 15:34:14.000|2018-04-04 15:34:16.000|2018-04-04 15:34:11.000
    timestamp: 2018-04-04 15:34:21.000|2018-04-04 15:34:21.000|2018-04-04 15:34:16.000
    时间范围: Start: 2018-04-04 15:34:05,2018-04-04 15:34:15  Character(a,3)
    时间范围: Start: 2018-04-04 15:34:05,2018-04-04 15:34:15  Character(b,2)
    timestamp: 2018-04-04 15:34:30.000|2018-04-04 15:34:30.000|2018-04-04 15:34:25.000
    时间范围: Start: 2018-04-04 15:34:10,2018-04-04 15:34:20  Character(a,4)
    时间范围: Start: 2018-04-04 15:34:10,2018-04-04 15:34:20  Character(b,2)
    时间范围: Start: 2018-04-04 15:34:15,2018-04-04 15:34:25  Character(a,2)        

Window的 allowedLateness

概述:
    基于Event-Time的窗口处理流式数据,虽然提供了Watermark机制,却只能在一定程度上解决数据乱序的问题,
    但在某些情况下数据可能延时会非常严重,即使通过Watermark机制也无法等到数据全部进入窗口再进行处理.
    Flink中默认会将这些迟到的数据做丢弃处理,但是有些时候用户希望即使数据延迟达到的情况下,也能够正常
    按照流程处理并输出结果,此时就需要使用Allowed Lateness机制来对迟到的数据进行额外的处理
情况:
    通常情况下用户虽然希望对迟到的数据进行窗口计算,但并不想将结果混入正常的计算流程中,
    例如: 用户大屏数据展示系统,即使正常的窗口中没有将迟到的数据进行统计,但为了保证页面数据显示的连续型,
    后来接入到系统中迟到数据所统计出来的结果不希望显示在屏幕上,而是将延时数据和结果存储到数据库中,
    便于后期对延时数据进行分析.
解决:
    对于这种情况需要借助SideOutput来处理,通过使用sideOutputLateDate(OutputTag)来标记迟到数据计算的结果,
    然后使用getSideOutput(lateOutputTag)从窗口结果中获取lateOutputTag标签对应的数据,之后转成独立的DataStream
    数据集进行处理,创建late-date的OutputTag,再通过该标签从窗口结果中将迟到的数据筛选出来.
注意:
    如果有Watermark同时也有Allowed Lateness,name窗口函数再次触发的条件是:
    watermark < end-of-window +allowdLateness
案例:
    import java.text.SimpleDateFormat
    
    import FlinkDemo.functions.FunctionClassTransformation.StationLog
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    object LateDataOnWindow {
      /**
        * 案例: 每隔5s统计最近10s,每个基站的呼叫数量
        * 1. 每隔基站的数据会存在乱序
        * 2. 大多数数据延迟2s到,但是有些数据迟到时间比较长
        * 3. 迟到时间超过2s的数据不能丢弃,放入测流
        */
    
      def main(args: Array[String]): Unit = {
        //获取flink实时流处理的环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 设置数据流时间特征,默认为TimeCharacteristic.ProcessingTime,默认水位线更新200ms
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
        // 导入隐式转换
        import org.apache.flink.streaming.api.scala._
        val data = env.socketTextStream("localhost", 9999)
          // .assignTimestampsAndWatermarks(new TimestampExtractor1())
          .map { line =>
          var arr = line.split(",")
          StationLog(arr(0).trim, arr(1).trim, arr(2).trim, arr(3).trim, arr(4).trim.toLong, arr(5).trim.toLong)
        }
          // 引入Watermark
          .assignTimestampsAndWatermarks(
          new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)) {
            //延迟2s
            override def extractTimestamp(t: StationLog): Long = {
              t.callTime
            }
          }
        )
        //分组,开窗处理
        // 定义一个侧输出流的 标签
        val lateTage = new OutputTag[StationLog]("late")
        val mainStream = data.keyBy(_.sid)
          .timeWindow(Time.seconds(10), Time.seconds(5))
          // 注意: 只要符合 watermark < end-of-window + allowedLateness 之内达到
          // 的数据,都会被再次出发窗口的计算
          // 迟到之外的迟到数据会被放入侧输出流
          .allowedLateness(Time.seconds(5)) // 允许数据迟到5s
          // 迟到的数据, 输出另一个位置保存
          .sideOutputLateData(lateTage)
          .aggregate(new AggregateCount, new OutputResult)
        // 迟到很久的数据可以另外再处理
        mainStream.getSideOutput(lateTage).print("late")// 迟到很久的数据可以另外再处理
        mainStream.print("main")
        env.execute()
    
      }
    
      /**
        * 累加聚合操作
        */
      class AggregateCount extends AggregateFunction[StationLog, Long, Long] {
        // 创建累加器
        override def createAccumulator(): Long = 0
        // 累加器加值
        override def add(value: StationLog, accumulator: Long): Long = accumulator + 1
        // 获取累加器结果
        override def getResult(accumulator: Long): Long = accumulator
        //合并Value
        override def merge(a: Long, b: Long): Long = a + b
      }
    
      class OutputResult extends WindowFunction[Long, String, String, TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
          var sb = new StringBuilder
          val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
          sb.append(s"时间范围: Start: ${format.format(window.getStart)},----${format.format(window.getEnd)}")
          sb.append("\n")
          sb.append("\n")
          sb.append("当前基站是:").append(key)
            .append(" 呼叫数量是: ").append(input.iterator.next())
          out.collect(sb.toString())
        }
      }
    }


多多转发关注不迷路,感谢大家支持!!!

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7