Apache Flink中DataStream基本转换

Apache Flink中DataStream基本转换

经验文章nimo972024-12-24 10:56:5311A+A-

很多刚刚接触Flink的人会被其中各种类型的stream绕晕,DataStream作为最基础的流处理类是Stream流转换的中心,通过一些方法可以和其它类型的流相互转换,今天来说一下它们是如何互相转换。

1、Stream类型介绍:

flink中的stream主要有以下几种类型:

  • DataStream:表示无界的数据流,可以包含任何类型的数据元素,可以使用各种算子进行转换和处理。
  • KeyedStream:表示按照指定的key或字段进行分区的DataStream,可以使用聚合、分组、窗口等算子进行操作。
  • ConnectedStream:表示连接两个类型不同的DataStream的结果,可以使用coMap或coFlatMap等算子对两个源数据流进行不同的处理。
  • WindowedStream:表示按照时间或者数量进行划分的KeyedStream,可以使用窗口函数对每个窗口内的数据进行计算。
  • SplitStream:表示按照指定的条件将一个DataStream拆分为多个DataStream的结果,可以使用select算子选择其中一个或多个拆分流。
  • JoinedStreams:表示两个KeyedStream按照相同的key进行连接的结果,可以使用where、equalTo和join等算子指定连接条件和函数。
  • CoGroupedStreams:表示两个KeyedStream按照相同的key进行分组的结果,可以使用where、equalTo和coGroup等算子指定分组条件和函数。


2、转换方法:

keyBy转换:将一个datastream按照指定的key或字段进行分区,得到一个keyedstream。keyedstream中具有相同key的元素会被分配到同一个算子实例中。

聚合转换:将一个keyedstream按照指定的聚合函数(如sum,min,max等)进行滚动计算,得到一个datastream。datastream中的每个元素是一个聚合结果。

reduce转换:将一个keyedstream按照指定的reduce函数进行滚动组合,得到一个datastream。datastream中的每个元素是一个组合结果。

fold转换:将一个keyedstream按照指定的fold函数和初始值进行滚动折叠,得到一个datastream。datastream中的每个元素是一个折叠结果。

union转换:将两个或多个类型相同的datastream合并为一个datastream。datastream中的元素是来自不同源的数据流的元素。

connect转换:将两个类型不同的datastream连接为一个connectedstream。connectedstream中的元素是两个源数据流的元素的组合。

coMap或coFlatMap转换:将一个connectedstream按照指定的map或flatMap函数分别对两个源数据流进行处理,得到一个datastream。datastream中的元素是处理后的结果。

filter转换:将一个数据流中的每个元素应用一个布尔函数,只保留返回true的元素,得到一个新的数据流。

示例:

// keyBy转换
DataStream<SensorReading> sensorData = env.addSource(new SensorSource());
KeyedStream<SensorReading, String> keyedSensorData = sensorData.keyBy(r -> r.id);


// 聚合转换
DataStream<SensorReading> avgTemp = keyedSensorData.timeWindow(Time.seconds(5)).apply(new TemperatureAverager());


// reduce转换
DataStream<SensorReading> minTemp = keyedSensorData.reduce((r1, r2) -> {
  if (r1.temperature < r2.temperature) {
    return r1;
  } else {
    return r2;
  }
});


// fold转换
DataStream<String> result = keyedSensorData.fold("start", (current, r) -> current + "-" + r.temperature);


// union转换
DataStream<SensorReading> high = sensorData.filter(r -> r.temperature > 25);
DataStream<SensorReading> low = sensorData.filter(r -> r.temperature <= 25);
DataStream<SensorReading> all = high.union(low);


// connect转换
DataStream<Tuple2<String, Double>> warning = high.map(r -> new Tuple2<>(r.id, r.temperature));
ConnectedStream<Tuple2<String, Double>, SensorReading> connected = warning.connect(low);


// coMap转换
DataStream<String> coMap = connected.map(
  warningData -> warningData.f0 + " " + warningData.f1 + " warning",
  lowData -> lowData.id + " healthy"
);
// map转换
DataStream<String> ds = env.fromElements("Good good study","Day day up");
DataStream<String> ds_map = ds.map(String::toLowerCase);
ds_map.print();


// flatMap转换
DataStream<String> ds = env.fromElements("Good good study","Day day up");
DataStream<String> ds_flatmap = ds.flatMap((value, out) -> {
  for (String word: value.split("\\W+")) {
    out.collect(word);
  }
}).returns(Types.STRING);
ds_flatmap.print();


// filter转换
DataStream<String> ds = env.fromElements("Good good study","Day day up");
DataStream<String> ds_filter = ds.filter(s -> s.contains("study"));
ds_filter.print();


使用 window() 方法将 KeyedStream 转换成 WindowedStream。12

WindowedStream 是一个对键分组的流进行窗口化的数据流,它使用 WindowAssigner 将元素放入窗口中。窗口中的元素既按键也按窗口分组。你可以定义一个 Trigger 来指定何时评估窗口。

KeyedStream<Tuple2<String, Integer>, String> keyedStream = ...;
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream
    .window(TumblingEventTimeWindows.of(Time.seconds(5))); // 按事件时间滚动窗口


使用 apply() 或 process() 方法将 WindowedStream 转换成 DataStream。

apply() 方法接受一个 AllWindowFunction 或 WindowFunction 作为参数,它可以对每个窗口中的元素进行操作,并输出一个或多个结果。

process() 方法接受一个 ProcessAllWindowFunction 或 ProcessWindowFunction 作为参数,它可以访问窗口的元数据,并输出一个或多个结果。

WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = ...;
DataStream<Tuple2<String, Integer>> dataStream = windowedStream
    .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
        @Override
        public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
            // 对每个窗口中的元素进行操作,并输出结果
        }
    });

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7