import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Map;
public class MapStateExample {
public static void main(String[] args) throws Exception {
// 设置执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义一个数据源,这里使用集合作为示例数据源
DataStream dataStream = env.fromElements("apple:5", "banana:3", "apple:2", "orange:4", "apple:22");
// 定义MapState的描述符,指定键和值的类型
MapStateDescriptor descriptor = new MapStateDescriptor<>(
"zx", // 状态的名字
String.class, // 键的类型
Integer.class // 值的类型
);
// 使用RichFlatMapFunction来操作数据并使用MapState
dataStream
.map(new MapFunction<String, Tuple2>() {
@Override
public Tuple2 map(String value) throws Exception {
String[] vv = value.split(":");
return new Tuple2<>(vv[0], vv[1]);
}
})
.keyBy(0)
.flatMap(new RichFlatMapFunction<Tuple2, String>() {
private transient MapState itemsState;
@Override
public void open(Configuration parameters) {
// 在open方法中初始化状态
itemsState = getRuntimeContext().getMapState(descriptor);
}
@Override
public void flatMap(Tuple2 value, Collector out) throws Exception {
// 解析键值对
String p1 = value.f1;
String useKey = value.f0;
int count = Integer.parseInt(p1);
// 更新或添加状态中的项
Integer currentCount = itemsState.get(useKey);
if (currentCount == null) {
itemsState.put(useKey, count);
} else {
itemsState.put(useKey, currentCount + count);
}
for (Map.Entry entry : itemsState.entries()) {
out.collect(entry.getKey() + ":" + entry.getValue());
}
}
}).print();
// 执行程序
env.execute("MapState Example");
}
}
输入:
apple:5
banana:3
apple:2
orange:4
apple:22
输出:
4> banana:3
4> orange:4
5> apple:2
5> apple:7 ----5+2
5> apple:29 -----5+2+22
使用 key value的mapstate的时候,必须要在keyBy之后,因为只有通过这个api后相同的key才会分配在同一个task中,而mapState是存在的内容是存储在task上的,
如果没有使用keyBy 一是程序运行会报错,而是通过context获取到的结果也会错误。
这也算是flink的一个尝试,记录中间状态并调整后续的结果,不过也联想到一旦key的个数过多,一个task manager的内存可能吃不消,可能出现问题