Flink state 中的mapState demo

Flink state 中的mapState demo

经验文章nimo972025-03-16 15:46:3812A+A-

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Map;

public class MapStateExample {
    public static void main(String[] args) throws Exception {
        // 设置执行环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 定义一个数据源,这里使用集合作为示例数据源
        DataStream dataStream = env.fromElements("apple:5", "banana:3", "apple:2", "orange:4", "apple:22");

        // 定义MapState的描述符,指定键和值的类型
        MapStateDescriptor descriptor = new MapStateDescriptor<>(
                "zx", // 状态的名字
                String.class, // 键的类型
                Integer.class // 值的类型
        );

        // 使用RichFlatMapFunction来操作数据并使用MapState
        dataStream
                .map(new MapFunction<String, Tuple2>() {
                    @Override
                    public Tuple2 map(String value) throws Exception {
                        String[] vv = value.split(":");
                        return new Tuple2<>(vv[0], vv[1]);
                    }
                })
                .keyBy(0)
                .flatMap(new RichFlatMapFunction<Tuple2, String>() {
                    private transient MapState itemsState;

                    @Override
                    public void open(Configuration parameters) {
                        // 在open方法中初始化状态
                        itemsState = getRuntimeContext().getMapState(descriptor);
                    }

                    @Override
                    public void flatMap(Tuple2 value, Collector out) throws Exception {
                        // 解析键值对
                        String p1 = value.f1;
                        String useKey = value.f0;
                        int count = Integer.parseInt(p1);
                        // 更新或添加状态中的项
                        Integer currentCount = itemsState.get(useKey);
                        if (currentCount == null) {
                            itemsState.put(useKey, count);
                        } else {
                            itemsState.put(useKey, currentCount + count);
                        }

                        for (Map.Entry entry : itemsState.entries()) {
                            out.collect(entry.getKey() + ":" + entry.getValue());
                        }
                    }
                }).print();

        // 执行程序
        env.execute("MapState Example");
    }
}


输入:
apple:5
banana:3
apple:2
orange:4
apple:22
输出:
4> banana:3
4> orange:4
5> apple:2
5> apple:7 ----5+2
5> apple:29  -----5+2+22

使用 key value的mapstate的时候,必须要在keyBy之后,因为只有通过这个api后相同的key才会分配在同一个task中,而mapState是存在的内容是存储在task上的,

如果没有使用keyBy 一是程序运行会报错,而是通过context获取到的结果也会错误。

这也算是flink的一个尝试,记录中间状态并调整后续的结果,不过也联想到一旦key的个数过多,一个task manager的内存可能吃不消,可能出现问题

点击这里复制本文地址 以上内容由nimo97整理呈现,请务必在转载分享时注明本文地址!如对内容有疑问,请联系我们,谢谢!
qrcode

尼墨宝库 © All Rights Reserved.  蜀ICP备2024111239号-7