大数据之Spark性能优化、内存调优

引用网络插图
1.1.1. 数据序列化
Spark 提供了两个序列化库,Java serialization(by default)和Kryo serialization Kryo serialization相比java Serialization 序列化工具有着更高的效率和更大的压缩比(通来说是java Serialization 的10x倍),但是kryo Serlization 并不支持所有的数据类型,需要提前在程序中注册才可以达到其最佳性能。例如设置kryo Serialization 序列化工具:
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
此配置适用于节点间shuffle 数据和将RDD序列化到磁盘
Kryo Serialization 没有作为默认的序列化工具,是因为其需要自定义注册,但是官方推荐在网络密集型应用中尽量使用kryo Serialization 序列化工具。从spark 2.0 开始,在处理简单类型、或者简单类型和String 类型的数组是spark 内部已经默认使用kryo Serialization工具。向kryo 注册自定义类的方法:
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
最后,如果您不注册自定义类,Kryo仍然可以工作,但必须将完整的类名与每个对象一起存储,这很浪费。
1.1.2. spark内存调优
默认情况下,java 对象可以实现高效访问,但是相对对象中存储的“原始”数据来说,这很容易消耗其2-5倍的存储空间。
1.1.2.1. 内存管理模型
spark中的内存分为多个部分,UI页面上显示的只是缓存RDD用的storage memory,大约是(总内存 - 300M) * 60% * 50% 的量,所以会偏小。具体内存分配如下图:

引用网络插图
默认情况下spark只使用堆内内存(on-heap Memory),exector端的堆内内存主要分为4大块
--executor-memory = Execution memory + storage + Other
Execution memory 存储shuffles, joins, sorts and aggregations 等过程中的临时数据
storage 存储cache数据
Other 存储rdd 转换所需要的数据,转换关系信息
reserved-memory 存储spark的内部对象
Execution memory 和 storage 在spark1.5之前这个值是固定的,1.5开始这是动态的,即可以相互借用
官方说明:
Memory usage in Spark largely falls under one of two categories: execution and storage. Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations, while storage memory refers to that used for caching and propagating internal data across the cluster. In Spark, execution and storage share a unified region (M). When no execution memory is used, storage can acquire all the available memory and vice versa. Execution may evict storage if necessary, but only until total storage memory usage falls under a certain threshold (R). In other words, R describes a subregion within M where cached blocks are never evicted. Storage may not evict execution due to complexities in implementation.
This design ensures several desirable properties. First, applications that do not use caching can use the entire space for execution, obviating unnecessary disk spills. Second, applications that do use caching can reserve a minimum storage space (R) where their data blocks are immune to being evicted. Lastly, this approach provides reasonable out-of-the-box performance for a variety of workloads without requiring user expertise of how memory is divided internally.
Although there are two relevant configurations, the typical user should not need to adjust them as the default values are applicable to most workloads:
· spark.memory.fraction expresses the size of M as a fraction of the (JVM heap space - 300MB) (default 0.6). The rest of the space (40%) is reserved for user data structures, internal metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually large records.
· spark.memory.storageFraction expresses the size of R as a fraction of M (default 0.5). R is the storage space within M where cached blocks immune to being evicted by execution.
The value of spark.memory.fraction should be set in order to fit this amount of heap space comfortably within the JVM’s old or “tenured” generation. See the discussion of advanced GC tuning below for details.
1.1.2.2. 确定内存消耗
l 计算一个数据集占用的内存,创建一个RDD 并且放入到缓存,通过webUI 的”storage” 可查询到其耗用的内存
l 评估一个对象的大小,调用 SizeEstimator.estimate()方法可实现。
例如,查看一个广播变量在executor 中占用的内存
val l: Long = SizeEstimator.estimate(df)
1.1.2.3. 调整数据结构
l 减少内存消耗的第一个方法是避免使用需要增加开销的java 特性,比如基于指针的数据结构和包装对象。
l 用对象或基本类型的数据代替scala/java 的集合类,或者用更加高效的fastutils工具集合
l 少用嵌套结构的小对象和指针
l 使用数值Ids或者枚举对象代替字符串keys
l 如果内存小于32G,则设置jvm属性 -XX:+UseCompressedOops (64位系统中对象压缩,jdk 1.6后默认开启)
1.1.2.4. 序列化RDD 保存
如果经过数据结构优化后内存仍然不够,更简单的方式就是序列化的形式保存,缓存不同的存储级别,spark 将保存每一个分区为字节数组。缺点是读取对象时效率低,因为需要反序列化,如果通过kryo序列化工具,相比java Serializaion 效果会好很多。
//缓存
缓存cache()调用的是默认的persist()
12个存储级别
none
memory
disk
memory-ser
memory-disk
memory-disk-ser
OFF_HEAP
离线数据的cache()默认是memory 以反序列化对象的方式缓存
流数据的cache()默认是memory-ser 以序列化对象的方式存储为一个字节数组,节省空间,同时读取时会增加cpu的计算负担
1.1.2.5. 合理设置并行度
Spark 可以自动设置根据rdd 的分区数设置并行度,也可是通过 spark.default.parallelism 更改默认值,官方推荐并行度为集群总cup 的2-3倍。对于并行度的设置,rdd 和 dataset 有所不同

v 上面两个参数都是设置默认的并行度,但是适用的场景不同:
spark.sql.shuffle.partitions是对sparkSQL进行shuffle操作的时候生效,比如 join或者aggregation等操作的时候,之前有个同学设置了spark.default.parallelism 这个并行度为2000,结果还是产生200的stage,排查了很久才发现,是这个原因。
spark.default.parallelism这个参数只是针对rdd的shuffle操作才生效,比如join,reduceByKey
1.1.2.6. 减少任务的内存使用
当内存不足以存放RDDs数据,会抛出OOM 错误,比如groupBykey 操作中的一个reduce task 太大。Spark 的shuffle 操作(sortByKey、groupByKey、reduceBykey、join,etc)的task 会进行grouping, 产生的hashtable 往往比较大,因此容易OOM。最简单的处理方式就是增加并行度,因此每个task 的输入减小。Spark 可以复用一个executor中的jvm 执行多个task,并且其启动成本较低。因此,适当增加并行度也是一个不错的选择
1.1.2.7. 广播变量
广播变量可以有效降低序列化任务的大小和集群任务的启动开销。如果任务从driver 程序中使用的变量太大,则可以将其转换成broadcast 变量。一般来说,变量超过20k 则需要优化。
减少executor和driver之间的数据传输,减少节点间I/O耗时,节约内存开销
1.1.2.8. 数据本地性
数据本地性对spark job执行的性能有主要影响,如果数据与执行代码在一起,则计算较快,反之。
数据本地性是指数据与执行代码的‘远近’关系。官方文档说明如下:
PROCESS_LOCAL data is in the same JVM as the running code. This is the best locality possible
· NODE_LOCAL data is on the same node. Examples might be in HDFS on the same node, or in another executor on the same node. This is a little slower than PROCESS_LOCAL because the data has to travel between processes
· NO_PREF data is accessed equally quickly from anywhere and has no locality preference
· RACK_LOCAL data is on the same rack of servers. Data is on a different server on the same rack so needs to be sent over the network, typically through a single switch
· ANY data is elsewhere on the network and not in the same rack·