Data Algorithms: Hadoop/Spark Big Data Processing --- Chapter 11

Author: _Kantin | Published 2018-07-08 11:21

This chapter covers the Markov model.

Input data: [customer-id, transaction-id, date, amount]. Output data: [paired Markov states, count].
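For example, a few hypothetical input records (the IDs, dates, and amounts below are illustrative, not taken from the book) and the single output pair they would produce:

    C1000,T1,2013-01-01,110
    C1000,T2,2013-01-15,125
    C1000,T3,2013-03-20,60

    SL,LG	1

Here "SL" and "LG" are two-letter states encoding the elapsed time between consecutive purchases and the change in purchase amount, as defined in the implementations below.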

Implementations in this chapter

  1. Implementation in Spark (Java)
  2. Implementation in Spark (Scala)

The idea behind the implementations

[Figure: pipeline overview --- parse the transactions, group by customer, sort each customer's purchases by date, convert the sorted (date, amount) pairs into a state sequence, emit consecutive state pairs, and count each transition.]

++Implementation in Spark (Java)++

//Parse each input record into (customer-id, (date, amount))
     JavaPairRDD<String, Tuple2<Long, Integer>>  kv = records.mapToPair(new PairFunction<String, String, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<Long, Integer>> call(String s) throws Exception {
                String[] tokens = s.split(",");
                if (tokens.length != 4) {
                    // malformed record; these nulls should be filtered out before grouping
                    return null;
                }
                // tokens[0] = customer-id
                // tokens[1] = transaction-id
                // tokens[2] = purchase-date
                // tokens[3] = amount

                long date = DateUtil.getDateAsMilliSeconds(tokens[2]);
                int amount = Integer.parseInt(tokens[3]);
                Tuple2<Long, Integer> V = new Tuple2<>(date, amount);
                //emit a Tuple2 of (customer-id, (date, amount))
                return new Tuple2<>(tokens[0], V);
            }
        });
        
        //        K2: customerID
        //           V2: Iterable<Tuple2<purchaseDate, Amount>>
        //group each customer's purchases; the date-sorted series of amounts is then converted into a state sequence
    JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = kv.groupByKey();

        // For each customer, convert the Iterable<Tuple2<purchaseDate, Amount>>
        // into a date-sorted state sequence (see the helper sketch after this listing)
        JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues(new Function<Iterable<Tuple2<Long, Integer>>, List<String>>() {
            @Override
            public List<String> call(Iterable<Tuple2<Long, Integer>> dateAndAmount) throws Exception {
                List<Tuple2<Long, Integer>> list = toList(dateAndAmount);
                List<String> stateSequence = toStateSequence(list);
                return stateSequence;
            }
        });
        //emit the state sequence List<String> as consecutive (fromState, toState) pairs
    JavaPairRDD<Tuple2<String,String>, Integer> model = stateSequence.flatMapToPair(
                new PairFlatMapFunction<
                        Tuple2<String, List<String>>,  // T
                        Tuple2<String,String>,         // K
                        Integer                        // V
                        >() {
                    @Override
                    public Iterator<Tuple2<Tuple2<String,String>, Integer>> call(Tuple2<String, List<String>> s) {
                        List<String> states = s._2;
                        if ( (states == null) || (states.size() < 2) ) {
                            return Collections.emptyIterator();
                        }

                        List<Tuple2<Tuple2<String,String>, Integer>> mapperOutput =
                                new ArrayList<Tuple2<Tuple2<String,String>, Integer>>();
                        for (int i = 0; i < (states.size() -1); i++) {
                            String fromState = states.get(i);
                            String toState = states.get(i+1);
                            Tuple2<String,String> k = new Tuple2<String,String>(fromState, toState);
                            mapperOutput.add(new Tuple2<Tuple2<String,String>, Integer>(k, 1));
                        }
                        return mapperOutput.iterator();
                    }
                });
        //aggregate by key to get the total count for each state transition
    JavaPairRDD<Tuple2<String,String>, Integer> markovModel =  model.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) throws Exception {
                return i1+i2;
            }
        });

    //format the model for output
    JavaRDD<String> markovModelFormatted =
                markovModel.map(new Function<Tuple2<Tuple2<String,String>, Integer>, String>() {
                    @Override
                    public String call(Tuple2<Tuple2<String,String>, Integer> t) {
                        return t._1._1 + "," + t._1._2 + "\t" + t._2;
                    }
                });
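The listing above relies on two helper methods, toList() and toStateSequence(), that it does not show. Below is a minimal sketch of what they might look like, assuming the same state-encoding rules as the Scala version further down (S/M/L for gaps of under 30 days, under 60 days, and 60 days or more; L/E/G for amount ratios below 0.9, below 1.1, and above); the thresholds and ratio direction mirror that Scala code and may differ from the book's exact helpers.

    // Sketch: materialize the Iterable and sort by purchase date,
    // so that transitions follow chronological order.
    static List<Tuple2<Long, Integer>> toList(Iterable<Tuple2<Long, Integer>> iterable) {
        List<Tuple2<Long, Integer>> list = new ArrayList<>();
        for (Tuple2<Long, Integer> t : iterable) {
            list.add(t);
        }
        list.sort((a, b) -> a._1.compareTo(b._1));
        return list;
    }

    // Sketch: encode each consecutive pair of purchases as a two-letter state.
    static List<String> toStateSequence(List<Tuple2<Long, Integer>> sorted) {
        List<String> states = new ArrayList<>();
        for (int i = 0; i < sorted.size() - 1; i++) {
            long days = (sorted.get(i + 1)._1 - sorted.get(i)._1) / 86400000L;
            String elapsed = (days < 30) ? "S" : (days < 60) ? "M" : "L";
            double ratio = (double) sorted.get(i)._2 / sorted.get(i + 1)._2;
            String amount = (ratio < 0.9) ? "L" : (ratio < 1.1) ? "E" : "G";
            states.add(elapsed + amount);
        }
        return states;
    }

These would be declared as static methods of the driver class so that the anonymous Function closures referencing them stay serializable.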

++Implementation in Spark (Scala)++

  //Read the input file
  val transactions = sc.textFile(input)
    //Format for parsing the purchase date
    val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
    //Split each record into (customer-id, (purchase-date-in-millis, amount))
    val customers = transactions.map(line => {
      val tokens = line.split(",")
      (tokens(0), (dateFormat.parse(tokens(2)).getTime, tokens(3).toDouble))
    })
    //Group by customer to get data in the following format:
    /**
      * (OIROUTC2034,[(1241114412444,86),(232623343246,30)])
      */
    val customerGrouped = customers.groupByKey()

    //Sort each customer's purchase sequence by date
    val sortedByDate = customerGrouped.mapValues(_.toList.sortBy(_._1))

    //Convert each sorted (date, amount) list into a state sequence,
    //e.g. (OIROUTC2034,[SG,SL,SG,SL,SG,ML,SG,SL,SG,SL])
    val stateSequence = sortedByDate.mapValues(list =>{
      val sequence = for{
        i <- 0 until list.size -1
        (currentDate,currentPurchase)=list(i)
        (nextDate,nextPurchase)=list(i+1)
      }yield {
        val elapsedTime = (nextDate - currentDate) / 86400000 match {
          case diff if (diff < 30) => "S"
          case diff if (diff < 60) => "M"
          case _                   => "L"
        }
        val amountRange = (currentPurchase / nextPurchase) match {
          case ratio if (ratio < 0.9) => "L"
          case ratio if (ratio < 1.1) => "E"
          case _                      => "G"
        }
        elapsedTime + amountRange
      }
      sequence
    })
    //For each sequence with at least 2 states, emit consecutive (fromState, toState) pairs
    val model = stateSequence.filter(_._2.size >= 2).flatMap(f => {
      val states = f._2

      val statePair = for{
        i <- 0 until states.size-1
        fromState = states(i)
        toState = states(i+1)
      }yield {
        ((fromState,toState),1)
      }
      statePair
    })

    //Group the emitted ((fromState, toState), 1) pairs by key and sum the counts
    val markovModel = model.reduceByKey(_+_)
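The counts produced by either version form the raw transition matrix. To turn them into transition probabilities P(toState | fromState), each row must be normalized by the total outgoing count of its fromState. A minimal sketch in the Spark Java API follows (my addition, not part of the chapter's listing; it uses Java 8 lambdas and collects the row totals to the driver, which assumes the number of distinct states is small):

    // re-key by fromState: (fromState, (toState, count))
    JavaPairRDD<String, Tuple2<String, Integer>> byFromState = markovModel.mapToPair(
        t -> new Tuple2<>(t._1._1, new Tuple2<>(t._1._2, t._2)));

    // total outgoing count per fromState (small: at most #states^2 entries)
    final Map<String, Integer> rowTotals = new HashMap<>(
        byFromState.mapValues(v -> v._2).reduceByKey((a, b) -> a + b).collectAsMap());

    // fromState,toState -> count / rowTotal
    JavaRDD<String> transitionProbabilities = byFromState.map(
        t -> t._1 + "," + t._2._1 + "\t" + ((double) t._2._2 / rowTotals.get(t._1)));

For a large state space, the rowTotals map would be better shared via a broadcast variable instead of being captured directly in the closure.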
