Greedy Step Averaging: A parameter-free stochastic optimization method

Author: kiminh | Published 2016-12-26 11:38

    Fregata

    https://github.com/TalkingData/Fregata

    Combining Features Logistic Regression: heterogeneous attribute groups are combined into one feature vector (a sketch of such an encoding follows the list)
    • data including age, gender, marital status ...
    • data including internet behavior ...
    • data including phone brand ...
    • data including installed apps ...
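    A minimal sketch of one such encoding in plain Scala; the field names and the hashing scheme are illustrative assumptions, not Fregata API:

    import scala.collection.mutable

    //hash each categorical (field, value) pair to an index and one-hot encode it
    def encode(fields: Map[String, String], dim: Int = 1 << 20): Map[Int, Double] = {
      val features = mutable.Map.empty[Int, Double]
      fields.foreach { case (name, value) =>
        //mask the sign bit so the index stays non-negative
        val idx = ((name + "=" + value).hashCode & Int.MaxValue) % dim
        features(idx) = 1.0
      }
      features.toMap
    }

    val x = encode(Map("age" -> "25-34", "gender" -> "F", "phone_brand" -> "xiaomi"))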

    The modules that do the actual computation

    Logistic regression derived from approximation formula
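    Reconstructed from LogisticGradient.calculate below (the full derivation is in the GSA paper): with $p_1 = \sigma(w \cdot x + b)$, $p_0 = 1 - p_1$, $b_1 = e^{p_1}$, $b_0 = e^{p_0}$ and target probability $\tau$ = thres = 0.95, the greedy step is

    $$\eta = \frac{p_1 - \tau}{\big(\tau(1 - p_0 b_0 - p_1 b_1) + p_1(1 - b_0)\big)\,\lVert x \rVert^2} \quad (label = 1)$$

    $$\eta = \frac{p_0 - \tau}{\big(\tau(1 - p_0 b_0 - p_1 b_1) + p_0(1 - b_1)\big)\,\lVert x \rVert^2} \quad (label = 0)$$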
    Averaging Scheme
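    The averaging scheme is visible directly in the code below: with lambda = i/(i+1), the update stepSize = lambda * stepSize + (1 - lambda) * greedyStep keeps a running arithmetic mean of every greedy step seen so far. A standalone check in plain Scala (not Fregata code):

    //the recurrence with lambda = i/(i+1) reproduces the plain average
    val greedySteps = Seq(0.4, 0.1, 0.3)
    var stepSize = 0.0
    greedySteps.zipWithIndex.foreach { case (g, i) =>
      val lambda = i.toDouble / (i + 1)
      stepSize = lambda * stepSize + (1 - lambda) * g
    }
    assert(math.abs(stepSize - greedySteps.sum / greedySteps.size) < 1e-12)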
    The public run interface; the parameters are passed through to the trainer that does the real work:
    //fregata.spark.model.classification
    class LogisticRegressionModel(val model:LLogisticRegressionModel) extends ClassificationModel
    
    object LogisticRegression {
    
      /**
        *
        * @param data training data as (feature vector, label) pairs
        * @param localEpochNum the local model epoch num of every partition
        * @param epochNum number of global rounds of train-then-average
        * @return the trained LogisticRegressionModel
        */
      def run(data:RDD[(Vector,Num)],
              localEpochNum:Int = 1 ,
              epochNum:Int = 1) = {
        //class LogisticRegression extends ModelTrainer      
        val trainer = new LLogisticRegression
        new SparkTrainer(trainer)
          .run(data,epochNum,localEpochNum)
        
        //def buildModel(ps:ParameterServer) = new LogisticRegressionModel(ps.get(0))
        new LogisticRegressionModel(trainer.buildModel(trainer.ps))
      }
    }
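    A minimal usage sketch of this interface. LibSvmReader and its read signature follow the Fregata README and may differ across versions, so treat them as assumptions:

    import fregata.spark.data.LibSvmReader
    import fregata.spark.model.classification.LogisticRegression

    //sc is an existing SparkContext; -1 lets the reader infer the feature dimension
    val (_, trainData) = LibSvmReader.read(sc, "data/a9a.train", -1)
    //two global rounds, one local pass per partition per round
    val model = LogisticRegression.run(trainData, localEpochNum = 1, epochNum = 2)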
    
    
    //fregata.spark.model.SparkTrainer
    class SparkTrainer(trainer:ModelTrainer) {
    
      def run(data:RDD[(Vector,Num)],epochNum:Int,localEpochNum:Int) {
        (0 until epochNum).foreach{
          i =>
            run(data,localEpochNum)
        }
      }
    
      def run(data:RDD[(Vector,Num)],localEpochNum:Int) {
        val _trainer = this.trainer
        val br_opt = data.sparkContext.broadcast(_trainer)
        val pn = data.partitions.length
        
        
        //  def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope {
        //  val cleanedF = sc.clean(f)
        //  new MapPartitionsRDD(
        //  this,
        //  (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
        //  preservesPartitioning)
        //  }
        
        //each partition's contents are passed to the input function f as an Iterator[T]; f returns an Iterator[U], and the final RDD is the union of all partitions' results
        
        val ws = data.mapPartitions{
          it =>
            val local_opt = br_opt.value
            
            //the actual local training run
            local_opt.run(it.toIterable,localEpochNum)
            Iterator( local_opt.ps.get )
        }.treeReduce{
          (a,b) =>
            a.zip(b).map{
              case (w1,w2) => w1 + w2
            }
        }
        ws.foreach{
          w =>
          val values = w match {
            case w : DenseVector => w.data
            case w : SparseVector => w.data
          }
          var i = 0
          while( i < values.length ) {
            values(i) /= pn
            i += 1
          }
        }
        trainer.ps.set(ws)
      }
    }
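    SparkTrainer is plain model averaging: every partition trains a complete local model on its shard, treeReduce sums the weight vectors, and the sum is divided by the partition count before being written back into the parameter server for the next epoch. A self-contained sketch of that reduce-then-average pattern (plain Spark, no Fregata types):

    import org.apache.spark.{SparkConf, SparkContext}

    object AveragingSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("avg").setMaster("local[4]"))
        //pretend each of the 4 partitions produced a locally trained weight vector
        val localWeights = sc.parallelize(Seq(
          Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0), Array(7.0, 8.0)
        ), 4)
        val pn = localWeights.partitions.length
        //sum the weight vectors element-wise, then divide by the partition count
        val summed = localWeights.treeReduce { (a, b) =>
          a.zip(b).map { case (x, y) => x + y }
        }
        println(summed.map(_ / pn).mkString(","))   //4.0,5.0
        sc.stop()
      }
    }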
    
    
    Parameters are fetched and updated through the parameter server:
    class LogisticGradient(ps:ParameterServer) extends Gradient {
      val thres = 0.95
      val update = Array(0.0)
      var stepSize = 0.0
      var i = 0.0
      
      //implement calculate: compute the gradient update value
      def calculate(x:Vector,label:Num) : Array[Num] = {
        var weight = ps.get
        if( weight == null ) {
          //lazily initialize the weights: 1 x (n+1), the extra column is the bias
          ps.init(1,x.length + 1)
          weight = ps.get
        }
        val lambda = i / ( i + 1 )
        i += 1
        val margin = VectorUtil.wxpb(weight(0),x,1.0)
        val p1 = 1.0 / ( 1.0 + math.exp( - margin ) )
        val p0 = 1 - p1
        val b1 = math.exp(p1)
        val b0 = math.exp(p0)
        val x2 = math.pow(norm(x),2.0)
        // compute greedy step size
        val greedyStep = if( label == 1 ) {
          (p1 - thres) / ( thres * (1 - p0 * b0 - p1 * b1) + p1 * (1 - b0) ) / x2
        }else{
          (p0 - thres) / ( thres * (1 - p0 * b0 - p1 * b1 ) + p0 * (1 - b1)) / x2
        }
        // compute averaged step size
        stepSize = lambda * stepSize + (1 - lambda) * greedyStep
        update(0) = 2 * ( p1 - label ) * stepSize
        update
      }
    }
    
    
    //LogisticRegression.scala
    class LogisticRegressionModel(val weights:Vector) extends ClassificationModel{
    
      var threshold = 0.5
      def setThreshold(t:Double) : this.type = {
        this.threshold = t
        this
      }
      def classPredict(x: Vector): (Num, Num) = {
        val margin = VectorUtil.wxpb(weights,x,1.0)
        val p = 1.0 / ( 1.0 + math.exp( - margin ) )
        val c = if( p > threshold ) 1.0 else 0.0
        (asNum(p),asNum(c))
      }
    }
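    A hypothetical use of the model class above, assuming an already trained weight vector w; the 0.7 cut-off is arbitrary:

    val model = new LogisticRegressionModel(w).setThreshold(0.7)
    val (p, c) = model.classPredict(x)   //p = sigmoid(w.x + b), c = 1.0 iff p > 0.7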
    
    class LogisticRegression extends ModelTrainer {
      override type M = LogisticRegressionModel
      val ps = newPs    //this trainer's parameter server
      val gradient = new LogisticGradient(ps)   //gradient computation bound to that ps
      def buildModel(ps:ParameterServer) = new LogisticRegressionModel(ps.get(0))
      def run( data:Iterable[(Vector,Num)] ) = {
      
        //note: the gradient and the ps are handed to the optimizer via the Target case class
        val target = Target(gradient,ps)
        new AdaptiveSGD()
          .minimize(target)
          .run(data)
        new LogisticRegressionModel(ps.get(0))
      }
    }
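    Because the core trainer only needs an Iterable, it can also be driven without Spark. A sketch, assuming Fregata's core types (Vector, Num, asNum, DenseVector) are imported via the fregata package object:

    import fregata._

    val data: Iterable[(Vector, Num)] = Seq(
      (new DenseVector(Array[Num](1.0, 0.0)), asNum(1.0)),
      (new DenseVector(Array[Num](0.0, 1.0)), asNum(0.0))
    )
    val trainer = new LogisticRegression
    val model = trainer.run(data, epochNum = 3)   //three local passes over the data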
    
    //ModelTrainer.scala  core
    trait ModelTrainer extends Serializable{
    
      type M
    
      def newPs = ParameterServer.create   //create a parameter server
      def ps : ParameterServer
      def buildModel(ps:ParameterServer) : M
    
    
        //http://stackoverflow.com/questions/38289353/using-scala-trait-as-a-callback-interface
        //callback as syntactic sugar (pattern from the link above)
      def run(data:Iterable[(Vector,Num)],epochNum:Int ,callback : (M,Int) => Unit = null) : M = {
        var model : Any = null
        (0 until epochNum).foreach{
          i =>
            model = run(data)
            if( callback != null ) callback(model.asInstanceOf[M],i)
        }
        model.asInstanceOf[M]
      }
    
      def run(data:Iterable[(Vector,Num)]) : M
    
    }
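    The callback fires once per epoch, which gives a simple hook for logging or snapshotting. Reusing the trainer and data from the sketch above:

    trainer.run(data, epochNum = 5, (model, epoch) => println(s"epoch $epoch finished"))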
    
    
    
    A case class bundling the Gradient and the ParameterServer:
    case class Target(gradient : fregata.optimize.Gradient, ps : fregata.param.ParameterServer)
    
    
    trait Gradient extends Serializable{
      def calculate(x:Vector,label:Num) : Array[Num]
    }
    
    
    The SGD optimization method
    //AdaptiveSGD.scala
    
    class AdaptiveSGD extends StochasticGradientDescent {
    
      //the global step size is fixed at 1: LogisticGradient already folds the
      //averaged greedy step into the gradient value it returns
      override def stepSize(i:Int,x:Vector) = asNum(1d)
    
    }
    
    //StochasticGradientDescent.scala
    class StochasticGradientDescent extends Minimizer {
    
      private var eta = asNum(.1)
      def setStepSize(eta:Num) : this.type = {
        this.eta = eta
        this
      }
      protected def stepSize(itN:Int,x:Vector) = eta
    
      def run(data:TraversableOnce[(Vector,Num)]) = {
        var i = 0
        data.foreach{
          case (x,label) =>
            val gradients = target.gradient.calculate(x,label)
            val step = stepSize(i,x)
            val delta = gradients.map( v => asNum( v * step ) )
            target.ps.adjust(delta,x)
            i += 1
        }
      }
    }
    
    //Minimizer.scala
    trait Minimizer extends Optimizer {
      private[this] var _target : Target = _
      def minimize(target: Target) : this.type = {
        this._target = target
        this
      }
      def target = _target
    }
    
    //Optimizer.scala
    trait Optimizer extends Serializable {
      def run(data:TraversableOnce[(Vector,Num)])
    }
    
    object ParameterServer {
      def create : ParameterServer = new LocalParameterServer
    }
    
    trait ParameterServer extends Serializable {
      def init(rows:Int,cols:Int)
      def adjust(delta:Array[Num],x:Vector)
      def get : Array[Vector]
      def set(ps:Array[Vector])
    }
    
    class LocalParameterServer extends ParameterServer {
    
      private[this] var ps : Array[DenseVector] = null
      private[this] var values : Array[Array[Num]] = null
    
      def init(rows:Int,cols:Int) = {
        values = Array.fill(rows)( Array.ofDim[Num](cols) )
        ps = values.map( new DenseVector(_) )
      }
      
      //e.g. after init(1,10):
      //values: Array(Array(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
      //ps:     Array(DenseVector(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
    
      def set(ps:Array[Vector]) = this.ps = ps.map( _.toDenseVector )
      def adjust(delta:Array[Num],x:Vector) = {
        var k = 0
        while( k < delta.length ) {
          val d = delta(k)
          VectorUtil.forV(x,(i,xi) =>{
            values(k)(i) -= d * xi
          })
          k += 1
        }
      }
      def get : Array[Vector] = ps.asInstanceOf[Array[Vector]]
    }
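    A sketch of the parameter-server round trip, again assuming Fregata's core types are on the classpath. adjust subtracts delta(k) * x(i) from row k over the non-zero entries of x:

    import fregata._

    val ps = ParameterServer.create
    ps.init(1, 3)                                    //one weight row: 2 features + bias
    val x: Vector = new DenseVector(Array[Num](1.0, 2.0, 1.0))
    ps.adjust(Array(asNum(0.1)), x)                  //row 0 becomes -0.1 * x
    println(ps.get(0))                               //DenseVector(-0.1, -0.2, -0.1)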
    
    
