美文网首页
Spark kyro Serialization

Spark kyro Serialization

作者: breeze_lsw | 来源:发表于2016-04-13 21:33 被阅读498次

    序列化在分布式系统中扮演着重要的角色,优化Spark程序时,首当其冲的就是对序列化方式的优化。Spark为使用者提供两种序列化方式:
    Java Serialization: 默认的序列化方式。
    Kryo Serialization: 相较于 Java Serialization 的方式,速度更快,空间占用更小,但并不支持所有的序列化格式,同时使用的时候需要注册class。spark-sql中默认使用的是kyro的序列化方式。
    下文将会讲解kryo的使用方式并对比性能。

    配置

    可以在spark-default.conf设置全局参数,也可以代码中初始化时对SparkConf设置 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") ,该参数会同时作用于机器之间数据的shuffle操作以及序列化rdd到磁盘,内存。

    Spark不将Kyro设置成默认的序列化方式是因为它需要对类进行注册,官方强烈建议在一些网络数据传输很大的应用中使用kyro序列化。

    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[MyClass1],classOf[MyClass2]))
    val sc = new SparkContext(conf)
    

    如果你要序列化的对象比较大,可以增加参数spark.kryoserializer.buffer所设置的值。

    如果你没有注册需要序列化的class,Kyro依然可以照常工作,但会存储每个对象的全类名(full class name),这样的使用方式往往比默认的 Java serialization 还要浪费更多的空间。

    可以设置 spark.kryo.registrationRequired 参数为 true,使用kyro时如果在应用中有类没有进行注册则会报错:

    这里写图片描述

    如上这个错误需要添加

    sparkConf.registerKryoClasses(
        Array(classOf[scala.collection.mutable.WrappedArray.ofRef[_]],
        classOf[MyClass]))
    

    下面的 demo 将会演示不同方式的序列化对空间占用的情况。

    DEMO

    case class Info(name: String ,age: Int,gender: String,addr: String)
    
    object KyroTest {
      def main(args: Array[String]) {
      
      val conf = new SparkConf().setMaster("local[2]").setAppName("KyroTest")
          conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          conf.registerKryoClasses(Array(classOf[Info]))
      val sc = new SparkContext(conf)
    
      val arr = new ArrayBuffer[Info]()
    
      val nameArr = Array[String]("lsw","yyy","lss")
      val genderArr = Array[String]("male","female")
      val addressArr = Array[String]("beijing","shanghai","shengzhen","wenzhou","hangzhou")
    
      for(i <- 1 to 1000000){
        val name = nameArr(Random.nextInt(3))
        val age = Random.nextInt(100)
        val gender = genderArr(Random.nextInt(2))
        val address = addressArr(Random.nextInt(5))
        arr.+=(Info(name,age,gender,address))
        }
    
      val rdd = sc.parallelize(arr)
    
      //序列化的方式将rdd存到内存
      rdd.persist(StorageLevel.MEMORY_ONLY_SER)
      rdd.count()
      }
    }
    

    结果

    可以在web ui中看到缓存的rdd大小:

    这里写图片描述
    序列化方式 是否注册 空间占用
    kyro 21.1 MB
    kyro 38.3 MB
    Java 25.1 MB

    相关文章

      网友评论

          本文标题:Spark kyro Serialization

          本文链接:https://www.haomeiwen.com/subject/mzcglttx.html