美文网首页
2020-02-27-ModelCode

2020-02-27-ModelCode

作者: inspiredhss | 来源:发表于2020-03-01 16:10 被阅读0次

    召回策略:热度、LBS、user tag、itemcf、频繁模式挖掘、二部图挖掘、embedding(word2vec、fasttext、bert)、deep match;排序策略:learning to rank 流程的三大模式(pointwise、pairwise、listwise);常见的特征挖掘(user、item、context,以及它们之间的相互交叉);ctr 预估(lr、gbdt、fm、ffm、dnn、widedeep、dcn、deepfm);探索与发现(bandit、Q-Learning、DQN)。

    LSH(局部敏感哈希)算法的优势是,可以在线性时间内获取相似的 topK 向量。在搜索引擎、NLP 等场景中,大多数情况下无法对全量数据两两计算相似度,因为这样的复杂度是 n 的平方,而海量数据下 n 的平方的复杂度是不可接受的。局部敏感哈希的基本思想类似于一种空间域转换:它基于一个假设——如果两个向量在原有的数据空间中是相似的,那么分别经过哈希函数转换以后,它们仍然具有很高的相似度;相反,如果它们本身是不相似的,那么经过转换后它们仍不具有相似性。

    import java.text.SimpleDateFormat
    import java.util.{Calendar, Date}
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext, mllib}
    import org.apache.spark.mllib.linalg.{Vector, Vectors}
    import org.apache.spark.mllib.recommendation.ALS
    import com.soundcloud.lsh.{Cosine, Lsh, NearestNeighbours}
    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.mllib.recommendation.Rating
    
    import scala.collection.mutable.ArrayBuffer
    object CosineLSHJoinSpark {
    
      /**
        * Job entry point.
        *
        * Loads the ORC data selected by `getLastNDaysPath(15)`, converts it to ALS ratings
        * with `datatransform_formkpixel`, trains an ALS model, runs a cosine LSH join over
        * the product factor vectors and writes one `"<name_i> <name_j> <similarity>"` line
        * per matrix entry returned by the join.
        *
        * @param args command line arguments (unused)
        */
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("CosineLSHJoinSpark")
        val sc = new SparkContext(conf)
        try {
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          val df = sqlContext.read.format("orc").load(getLastNDaysPath(15))
          df.show()

          val (ratings, productMap) = datatransform_formkpixel(df)
          ratings.take(20).foreach(println)

          // Build the recommendation model using ALS
          // NOTE(review): every rating produced by the transform is the constant 1, which
          // looks like implicit feedback; ALS.trainImplicit may be the better fit - confirm.
          val rank = 10
          val numIterations = 10
          val model = ALS.train(ratings, rank, numIterations, 0.02)

          // Index every factor vector by its ALS product id. The ids are the keys of
          // `productMap`, so `entry.i` / `entry.j` coming out of the join can be resolved
          // back to names. (Using the position inside a collected array instead would pair
          // vectors with unrelated names, because productFeatures is not ordered by id.)
          // Building the rows on the cluster also avoids pulling all factors to the driver.
          val idxrows = model.productFeatures
            .filter { case (_, features) => features != null }
            .map { case (productId, features) =>
              IndexedRow(productId.toLong, Vectors.dense(features))
            }
          val idxmat: IndexedRowMatrix = new IndexedRowMatrix(idxrows)
          /*
              Minimal hand-made input for experimenting with the join:

              val rows = Seq(
                IndexedRow(1, Vectors.dense(1.0, 1.0, 1.0)),
                IndexedRow(2, Vectors.dense(2.0, 2.0, 2.0)),
                IndexedRow(5, Vectors.dense(6.0, 3.0, 2.0))
              )
              val matrix = new IndexedRowMatrix(sparkSession.sparkContext.parallelize(rows))
              */
          val lsh = new Lsh(
            minCosineSimilarity = 0.1,
            dimensions = 20,
            numNeighbours = 10,
            numPermutations = 2
          )
          val similarityMatrix = lsh.join(idxmat)

          // Group the entries by row index and order each group by descending similarity.
          val orderTable = similarityMatrix.entries
            .groupBy(_.i)
            .flatMap { case (_, entries) => entries.toList.sortWith(_.value > _.value) }

          // Ship the id -> name lookup to the executors once instead of once per task.
          val productNames = sc.broadcast(productMap)
          val results = orderTable.map { entry =>
            val names = productNames.value
            "%s %s %.6f".format(names(entry.i), names(entry.j), entry.value)
          }

          results.take(20).foreach(println)
          results.saveAsTextFile("this is save path ")

          // for the hand-made sample matrix in the comment above this prints:
          // item:2 item:5 cosine:0.91
          // item:1 item:5 cosine:0.91
          // item:1 item:2 cosine:1,00
        } finally {
          // Release cluster resources even when the job fails half-way.
          sc.stop()
        }
      }
      /**
        * Turns a DataFrame into ALS ratings plus a lookup map for the product side.
        *
        * Column 0 is read as a string (called `ifa` here) and column 1 as a sequence of
        * strings (the bundles). Rows where either column is null, or whose bundle sequence
        * has fewer than two elements, contribute nothing. Every remaining
        * (ifa, trimmed bundle) pair becomes one rating with the constant value 1.
        *
        * Distinct ifa values and distinct bundles are sorted and numbered with
        * `zipWithIndex`; those numbers are used as the ALS user id and product id.
        *
        * @param df input data; column 0 must be a string and column 1 a string sequence
        * @return the ratings, and a map from product id back to the bundle string
        */
      def datatransform(df: DataFrame): (org.apache.spark.rdd.RDD[Rating], scala.collection.Map[Long, String]) = {
        val pairs = df.rdd.flatMap[(String, String)] { row =>
          if (row.isNullAt(0) || row.isNullAt(1)) Seq.empty
          else {
            val ifa = row.getString(0)
            val bundles = row.getSeq[String](1)
            // Null elements inside the array are skipped instead of failing on trim.
            if (bundles.length > 1) bundles.filter(_ != null).map(bundle => (ifa, bundle.trim))
            else Seq.empty
          }
        }

        // get distinct names and products and create index maps from them
        val userIndex = pairs.map(_._1).distinct.sortBy(x => x).zipWithIndex.collectAsMap
        val productIndex = pairs.map(_._2).distinct.sortBy(x => x).zipWithIndex.collectAsMap

        // Broadcast the lookup tables so they are shipped once per executor rather than
        // being serialised into every task closure.
        val sc = pairs.sparkContext
        val userIndexBc = sc.broadcast(userIndex)
        val productIndexBc = sc.broadcast(productIndex)

        val ratings = pairs.map { case (ifa, bundle) =>
          Rating(userIndexBc.value(ifa).toInt, productIndexBc.value(bundle).toInt, 1.0)
        }
        val productNames = productIndex.map { case (name, idx) => (idx, name) }
        (ratings, productNames)
      }
      /**
        * Turns a DataFrame into ALS ratings plus a lookup map for the product side.
        *
        * Column 8 is read as a string (called `ifa` here) and column 16 as a
        * comma-separated string of content ids. Rows where either column is null, or whose
        * content string splits into fewer than two pieces, contribute nothing. Every
        * remaining (trimmed content id, ifa) pair becomes one rating with the constant
        * value 1.
        *
        * Orientation: the content id fills the ALS "user" slot and the ifa fills the
        * "product" slot, so the returned map resolves product ids to ifa strings.
        * NOTE(review): this mirrors the original `(content id, ifa, 1)` tuple order;
        * confirm that the swap relative to `datatransform` is intended.
        *
        * @param df input data; columns 8 and 16 must be strings
        * @return the ratings, and a map from product id back to the ifa string
        */
      def datatransform_formkpixel(df: DataFrame): (org.apache.spark.rdd.RDD[Rating], scala.collection.Map[Long, String]) = {
        val pairs = df.rdd.flatMap[(String, String)] { row =>
          if (row.isNullAt(8) || row.isNullAt(16)) Seq.empty
          else {
            val ifa = row.getString(8)
            val contentIds = row.getString(16).split(",")
            if (contentIds.length > 1) contentIds.toSeq.map(id => (id.trim, ifa)) // (content id, ifa)
            else Seq.empty
          }
        }

        // get the distinct values of each side and number them
        val contentIndex = pairs.map(_._1).distinct.sortBy(x => x).zipWithIndex.collectAsMap
        val ifaIndex = pairs.map(_._2).distinct.sortBy(x => x).zipWithIndex.collectAsMap

        // Broadcast the lookup tables so they are shipped once per executor rather than
        // being serialised into every task closure.
        val sc = pairs.sparkContext
        val contentIndexBc = sc.broadcast(contentIndex)
        val ifaIndexBc = sc.broadcast(ifaIndex)

        val ratings = pairs.map { case (contentId, ifa) =>
          Rating(contentIndexBc.value(contentId).toInt, ifaIndexBc.value(ifa).toInt, 1.0)
        }
        val productNames = ifaIndex.map { case (name, idx) => (idx, name) }
        (ratings, productNames)
      }
    
      /**
        * Builds a glob path covering the `days` calendar days before today.
        *
        * Each day is rendered as `yyyyMMdd`, most recent first (yesterday, the day before,
        * ...); today itself is not included. The resulting path is printed and returned.
        *
        * @param days number of past days to include; values below 1 give an empty `{}` group
        * @return the path prefix followed by `{d1,d2,...}` and the trailing wildcards
        */
      def getLastNDaysPath(days: Int ): String = {
        val formatter = new SimpleDateFormat("yyyyMMdd")
        // Formatting and re-parsing drops the time-of-day part of "now".
        val today: Date = formatter.parse(formatter.format(new Date()))

        val dayStamps = (1 to days).map { offset =>
          val calendar = Calendar.getInstance()
          calendar.setTime(today)
          calendar.add(Calendar.DATE, -offset)
          formatter.format(calendar.getTime())
        }

        val path = "this is path prefix/{" + dayStamps.mkString(",") + "}/*/*"
        println(path)
        path
      }
    }
    

    相关文章

      网友评论

          本文标题:2020-02-27-ModelCode

          本文链接:https://www.haomeiwen.com/subject/ljxqhhtx.html