Apache Flink中DataStream基本转换
很多刚刚接触Flink的人会被其中各种类型的stream绕晕,DataStream作为最基础的流处理类是Stream流转换的中心,通过一些方法可以和其它类型的流相互转换,今天来说一下它们是如何互相转换。

1、Stream类型介绍:
flink中的stream主要有以下几种类型:
- DataStream:表示无界的数据流,可以包含任何类型的数据元素,可以使用各种算子进行转换和处理。
- KeyedStream:表示按照指定的key或字段进行分区的DataStream,可以使用聚合、分组、窗口等算子进行操作。
- ConnectedStream:表示连接两个类型不同的DataStream的结果,可以使用coMap或coFlatMap等算子对两个源数据流进行不同的处理。
- WindowedStream:表示按照时间或者数量进行划分的KeyedStream,可以使用窗口函数对每个窗口内的数据进行计算。
- SplitStream:表示按照指定的条件将一个DataStream拆分为多个DataStream的结果,可以使用select算子选择其中一个或多个拆分流。
- JoinedStreams:表示两个KeyedStream按照相同的key进行连接的结果,可以使用where、equalTo和join等算子指定连接条件和函数。
- CoGroupedStreams:表示两个KeyedStream按照相同的key进行分组的结果,可以使用where、equalTo和coGroup等算子指定分组条件和函数。
2、转换方法:
keyBy转换:将一个datastream按照指定的key或字段进行分区,得到一个keyedstream。keyedstream中具有相同key的元素会被分配到同一个算子实例中。
聚合转换:将一个keyedstream按照指定的聚合函数(如sum,min,max等)进行滚动计算,得到一个datastream。datastream中的每个元素是一个聚合结果。
reduce转换:将一个keyedstream按照指定的reduce函数进行滚动组合,得到一个datastream。datastream中的每个元素是一个组合结果。
fold转换:将一个keyedstream按照指定的fold函数和初始值进行滚动折叠,得到一个datastream。datastream中的每个元素是一个折叠结果。
union转换:将两个或多个类型相同的datastream合并为一个datastream。datastream中的元素是来自不同源的数据流的元素。
connect转换:将两个类型不同的datastream连接为一个connectedstream。connectedstream中的元素是两个源数据流的元素的组合。
coMap或coFlatMap转换:将一个connectedstream按照指定的map或flatMap函数分别对两个源数据流进行处理,得到一个datastream。datastream中的元素是处理后的结果。
filter转换:将一个数据流中的每个元素应用一个布尔函数,只保留返回true的元素,得到一个新的数据流。
示例:
// keyBy转换
DataStream<SensorReading> sensorData = env.addSource(new SensorSource());
KeyedStream<SensorReading, String> keyedSensorData = sensorData.keyBy(r -> r.id);
// 聚合转换
DataStream<SensorReading> avgTemp = keyedSensorData.timeWindow(Time.seconds(5)).apply(new TemperatureAverager());
// reduce转换
DataStream<SensorReading> minTemp = keyedSensorData.reduce((r1, r2) -> {
if (r1.temperature < r2.temperature) {
return r1;
} else {
return r2;
}
});
// fold转换
DataStream<String> result = keyedSensorData.fold("start", (current, r) -> current + "-" + r.temperature);
// union转换
DataStream<SensorReading> high = sensorData.filter(r -> r.temperature > 25);
DataStream<SensorReading> low = sensorData.filter(r -> r.temperature <= 25);
DataStream<SensorReading> all = high.union(low);
// connect转换
DataStream<Tuple2<String, Double>> warning = high.map(r -> new Tuple2<>(r.id, r.temperature));
ConnectedStream<Tuple2<String, Double>, SensorReading> connected = warning.connect(low);
// coMap转换
DataStream<String> coMap = connected.map(
warningData -> warningData.f0 + " " + warningData.f1 + " warning",
lowData -> lowData.id + " healthy"
);
// map转换
DataStream<String> ds = env.fromElements("Good good study","Day day up");
DataStream<String> ds_map = ds.map(String::toLowerCase);
ds_map.print();
// flatMap转换
DataStream<String> ds = env.fromElements("Good good study","Day day up");
DataStream<String> ds_flatmap = ds.flatMap((value, out) -> {
for (String word: value.split("\\W+")) {
out.collect(word);
}
}).returns(Types.STRING);
ds_flatmap.print();
// filter转换
DataStream<String> ds = env.fromElements("Good good study","Day day up");
DataStream<String> ds_filter = ds.filter(s -> s.contains("study"));
ds_filter.print();
使用 window() 方法将 KeyedStream 转换成 WindowedStream。12
WindowedStream 是一个对键分组的流进行窗口化的数据流,它使用 WindowAssigner 将元素放入窗口中。窗口中的元素既按键也按窗口分组。你可以定义一个 Trigger 来指定何时评估窗口。
KeyedStream<Tuple2<String, Integer>, String> keyedStream = ...;
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = keyedStream
.window(TumblingEventTimeWindows.of(Time.seconds(5))); // 按事件时间滚动窗口
使用 apply() 或 process() 方法将 WindowedStream 转换成 DataStream。
apply() 方法接受一个 AllWindowFunction 或 WindowFunction 作为参数,它可以对每个窗口中的元素进行操作,并输出一个或多个结果。
process() 方法接受一个 ProcessAllWindowFunction 或 ProcessWindowFunction 作为参数,它可以访问窗口的元数据,并输出一个或多个结果。
WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowedStream = ...;
DataStream<Tuple2<String, Integer>> dataStream = windowedStream
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
// 对每个窗口中的元素进行操作,并输出结果
}
});