一、Spark 的序列化
序列化
Spark 是一个高性能、分布式的、基于内存计算的计算引擎,Spark 集群中包含多个节点,各节点之间要进行通信
(比如数据传输,Spark 通过 RPC 进行节点间的通信),因而必定存在序列化
(对象转字节数组)和反序列化
(字节数组转对象)。
二、Java Serialization 和 Kryo Serialization
Spark 目前支持两种序列化机制:java native serialization
和 kryo serialization
,默认
使用的是Java native serialization
。两者的区别:
类别 | 优点 | 缺点 | 备注 |
---|---|---|---|
java native serialization | 兼容性好、和scala更好融合 | 序列化性能较低、占用内存空间大(一般是Kryo Serialization 的10倍) | 默认的serializer |
Kryo Serialization | 序列化速度快、占用空间小(即更紧凑) | 不支持所有的Serializable类型、且需要用户注册要进行序列化的类class | shuffle的数据量较大或者较为频繁时建议使用 |
三、Spark 中使用 Kryo Serialization
要在Spark 中使用 Kryo 完成序列化和反序列化,需要完成 3.1 和 3.2 两样配置:
3.1 将配置项spark.serializer
设置为
关于配置项的设置优先级可以参考博客:https://www.jianshu.com/p/15cd9844c5a1
org.apache.spark.serializer.KryoSerializer
可以在配置文件spark-default.conf中添加该配置项(全局生效),比如:
spark.serializer org.apache.spark.serializer.KryoSerializer
或者在业务代码中通过SparkConf进行配置(针对当前application生效),比如:
val spark = SparkSession.builder().master("local[*]").appName("test").getOrCreate()
val conf = new SparkConf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
又或者在spark-shell、spark-submit脚本中启动,可以在命令中加上:
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
3.2 注册自定义类(非必须,但是强烈建议做)
......
conf.registerKryoClasses(Array(classOf[Test1], classOf[Test2]))
// 其中Test1.java 和 Test2.java 是自定义的类
如果是scala类Test1(scala中的trait就相当于java中的接口):
class Test1 extends Serializable {
......
}
如果是java类Test2:
public class Test2 implements Serializable {
......
}
注意:虽说该步不是必须要做的(不做Kryo仍然能够工作),但是如果不注册的话,Kryo会存储自定义类中用到的所有对象的类名全路径,这将会导致耗费大量内存。
3.3 配置 spark.kryoserializer.buffer
如果要被序列化的对象很大,这个时候就最好将配置项spark.kryoserializer.buffer
的值(默认64k)设置的大些,使得其能够hold要序列化的最大的对象。
水平有限,如有错误,敬请指正!
网友评论