本章介绍马尔可夫模型(基于Spark的实现)
输入数据为:[用户ID,交易号,购买日期,金额];输出数据为:[配对的马尔可夫状态(fromState,toState),出现次数]
本章实现方式
- 基于Spark Java API(RDD)来实现
- 基于Spark Scala API(RDD)来实现
本章实现方式的思路
(此处原为实现思路示意图)

### 基于Spark Java API来实现
// Parse each CSV record into (customerId, (purchaseDateMillis, amount)).
// Expected record layout: customer-id,transaction-id,purchase-date,amount
// Records that do not have exactly 4 fields are dropped BEFORE mapToPair: returning
// null from mapToPair would put a null element into the RDD, and the groupByKey that
// follows would then fail with a NullPointerException.
JavaPairRDD<String, Tuple2<Long, Integer>> kv = records
        .filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.split(",").length == 4;
            }
        })
        .mapToPair(new PairFunction<String, String, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<String, Tuple2<Long, Integer>> call(String s) throws Exception {
                String[] tokens = s.split(",");
                // tokens[0] = customer-id
                // tokens[1] = transaction-id (not used by the model)
                // tokens[2] = purchase-date
                // tokens[3] = amount
                long date = DateUtil.getDateAsMilliSeconds(tokens[2]);
                int amount = Integer.parseInt(tokens[3]);
                Tuple2<Long, Integer> value = new Tuple2<>(date, amount);
                // Emit (customer-id, (date, amount))
                return new Tuple2<>(tokens[0], value);
            }
        });
// Collect all purchases of each customer:
//   K2: customerID
//   V2: Iterable<Tuple2<purchaseDate, amount>>
JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = kv.groupByKey();

// Replace each customer's purchases with a state sequence. Only the values change, so this
// uses mapValues with a plain Function. The conversion itself is delegated to the helpers
// toList(...) and toStateSequence(...), which are defined elsewhere.
// NOTE(review): date ordering is presumably done inside those helpers; confirm.
JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues(
        new Function<Iterable<Tuple2<Long, Integer>>, List<String>>() {
            @Override
            public List<String> call(Iterable<Tuple2<Long, Integer>> purchases) throws Exception {
                return toStateSequence(toList(purchases));
            }
        });
// Emit every adjacent pair of states (fromState, toState) with a count of 1.
// A customer whose sequence is null or has fewer than two states yields no transitions.
JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(
        new PairFlatMapFunction<
                Tuple2<String, List<String>>,   // T: (customerId, state sequence)
                Tuple2<String, String>,         // K: (fromState, toState)
                Integer                         // V: count, always 1
                >() {
            @Override
            public Iterator<Tuple2<Tuple2<String, String>, Integer>> call(Tuple2<String, List<String>> s) {
                List<String> states = s._2;
                if ((states == null) || (states.size() < 2)) {
                    // Typed empty iterator; the raw Collections.EMPTY_LIST only compiled
                    // through an unchecked conversion.
                    return Collections.emptyIterator();
                }
                List<Tuple2<Tuple2<String, String>, Integer>> transitions =
                        new ArrayList<>(states.size() - 1);
                for (int i = 0; i + 1 < states.size(); i++) {
                    Tuple2<String, String> transition = new Tuple2<>(states.get(i), states.get(i + 1));
                    transitions.add(new Tuple2<>(transition, 1));
                }
                return transitions.iterator();
            }
        });
// Sum the 1-counts per (fromState, toState) key, giving the number of times each
// state transition occurs across all customers.
JavaPairRDD<Tuple2<String, String>, Integer> markovModel =
        model.reduceByKey((left, right) -> left + right);
// Render each model entry as a text line: "fromState,toState<TAB>count".
JavaRDD<String> markovModelFormatted = markovModel.map(entry -> {
    Tuple2<String, String> transition = entry._1;
    return transition._1 + "," + transition._2 + "\t" + entry._2;
});
### 基于Spark Scala API来实现
// Raw transaction lines: customer-id,transaction-id,purchase-date,amount
val transactions = sc.textFile(input)
// Parser for the purchase-date field.
val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
// Parse each line into (customerId, (purchaseTimeMillis, amount)).
// Lines without exactly four fields are skipped instead of failing the job with an
// ArrayIndexOutOfBoundsException; this matches the 4-field check in the Java implementation.
// The amount is kept as a Double so the later purchase ratio is a floating-point division.
val customers = transactions
  .map(_.split(","))
  .collect {
    case Array(customerId, _, purchaseDate, amount) =>
      (customerId, (dateFormat.parse(purchaseDate).getTime, amount.toDouble))
  }
// Gather each customer's purchases, then order them chronologically:
//   (customerId, List((purchaseTimeMillis, amount), ...)) sorted by purchaseTimeMillis
val coustomerGrouped = customers.groupByKey()
val sortedByDate = coustomerGrouped.mapValues { purchases =>
  purchases.toList.sortBy { case (purchaseTime, _) => purchaseTime }
}
// Convert each customer's chronologically sorted purchases into two-letter states,
// one per pair of consecutive purchases, e.g.
//   OIROUTC2034 -> [SG, SL, SG, SL, SG, ML, SG, SL, SG, SL]
// First letter: whole days elapsed between the two purchases
//   S (< 30), M (30 to 59), L (otherwise)
// Second letter: ratio = current amount / next amount
//   L (ratio < 0.9), E (0.9 <= ratio < 1.1), G (otherwise)
// A customer with n purchases yields n - 1 states (none when n < 2).
val stateSequence = sortedByDate.mapValues { purchases =>
  // Pair every purchase with its successor. Zipping with drop(1) avoids the O(n^2)
  // cost of positional indexing into a List.
  purchases.zip(purchases.drop(1)).map {
    case ((currentDate, currentPurchase), (nextDate, nextPurchase)) =>
      val elapsedTime = (nextDate - currentDate) / 86400000 match {
        case days if days < 30 => "S"
        case days if days < 60 => "M"
        case _ => "L"
      }
      val amountRange = (currentPurchase / nextPurchase) match {
        case ratio if ratio < 0.9 => "L"
        case ratio if ratio < 1.1 => "E"
        case _ => "G"
      }
      elapsedTime + amountRange
  }.toIndexedSeq // keep the IndexedSeq result type so later indexed access stays O(1)
}
// Emit ((fromState, toState), 1) for every pair of consecutive states of each customer;
// customers with fewer than two states contribute nothing.
val model = stateSequence
  .filter { case (_, states) => states.size >= 2 }
  .flatMap { case (_, states) =>
    states.zip(states.tail).map(transition => (transition, 1))
  }
// Aggregate by (fromState, toState): the total number of times each transition occurs.
val markovModel = model.reduceByKey((left, right) => left + right)
网友评论