编程环境:
Ubuntu16.4 uklin
Hadoop3.2.0
openjdk version "1.8.0_191"
spark 2.3.0 ( 集群环境 )
完整代码已经更新至GitHub,欢迎fork~GitHub链接
声明:创作不易,未经授权不得复制转载
statement:No reprinting without authorization
一、算法思想介绍:
PageRank的核心思想其实十分简单,概括如下:
1、如果一个网页被很多其它网页链接到,说明这个网页很重要,它的PageRank值也会相应较高;
2、如果一个PageRank值很高的网页链接到另外某个网页,那么那个网页的PageRank值也会相应地提高。
使用随机浏览模型的PageRank公式:
image.png
也可将(1-d)换为(1-d/N)N为总网页数
二、hadoop上利用mapreduce框架实现pagerank算法:
1、设计思路:
源数据格式为起始网页+\t+用逗号,分割的起始网页所连接指向的其它网页列表,eg:A B,D,F......
所以考虑首先读入一遍全体数据,先初始化每个网页的权重为1.0,并在往后的每一次迭代中更新这个记录每个网页权重pr值,每次将上一轮更新后的pr值表传给下一次迭代计算新的pr值。
具体在map时将每一行源数据拆分,首先读取到A的pr值,而后统计A后一共的出链数目num,将A指向的每个网页名作为一个key,value设为pr/num,表示A对该网页的贡献量,如上例中的一行数据经 过mapper后将得到的是:<B,prA/num>、<D,prA/num>、<F,prA/num>......
而后在reducer端接受到的将是同一个key为网页,value是其它所有网页对它pr值贡献量的值列表,eg:<B, <0.1 , 0.2 , 0.6 ......>>,所以reducer要做的便是利用公式将列表中值相加得到sum值,而后将sum乘以权重d再加上(1-d)便得到更新后的网页B的pr值:
image.png
2、难点
如何设计一个共享的变量来保存所有网页的pr值:
查询后发现hadoop对数据传递和共享变量没有什么特别好的接口,主要是因为hadoop是基于分布式的节点计算,要想实现数据共享传递主要是依赖对文件的操作,以及自带的Configuration中设置属性值,考虑到频繁的文件io操作带来的低效性,我采用了后一种方法:
//初始化configuration后,使用
conf.set("propertyName“,properyValue);
//在mapper或reducer中,
Configuration conf = context.getConfiguration();
String g = conf.get("propertyName");
补充其它方式:
image.png而后在每一次迭代前,先读取上一次迭代的输出文件(part-r-00000), 将网页的pr值封装进一个String的字符串中,用“,”进行分割,用lastWeights保存,用在mapper中读取到后进行计算即可,不需要考虑到在每个reducer节点进行遍历更新(会很慢),转换为一次的文件io操作即可,极大增加了运行效率。
三、具体迭代实现的mapper和reducer的代码:
Mapper:
public static class pageMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>{
private static Text keyInfo = new Text();
private static DoubleWritable valueInfo = new DoubleWritable();
private static String[] lastWeightsInfo;
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
try {
//从全局配置获取配置参数
Configuration conf = context.getConfiguration();
lastWeightsInfo = conf.get("lastWeights").split(","); //这样就拿到了
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//分离邻接链表
String[] line = value.toString().split("\t");
if(line.length==2){
String curPageName = line[0];
String[] outPages = line[1].split(",");
double outNum = outPages.length;
double ww = 0.0;
for(int i = 0;i<lastWeightsInfo.length;i++){
if(curPageName.equals(lastWeightsInfo[i])){
ww = Double.valueOf(lastWeightsInfo[i+1].toString())/outNum;
break;
}
}
for (int i=0;i<outPages.length;i++){
keyInfo.set(outPages[i]);
valueInfo.set(ww);
context.write(keyInfo,valueInfo);
}
}
}
}
Reducer:
public static class pageReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
private static DoubleWritable info = new DoubleWritable();
//private static int totalNum;
/* @Override
protected void setup(Context context)
throws IOException, InterruptedException {
try {
//从全局配置获取配置参数
Configuration conf = context.getConfiguration();
//totalNum = conf.getInt("totalNum", 0);
} catch (Exception e) {
e.printStackTrace();
}
} */
@Override
protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
double sum = 0.0;// 统计PR
for (DoubleWritable value : values) {
sum += value.get();
}
sum = 0.85*sum + 0.15;
info.set(sum);
context.write(key, info);
}
}
四、实现标准输出:
标准输出如下:
(recovered_factory,1.3220451459)
(AbboT,1.3123762223)
(Abbess,1.2992692277)
(Martin_Scorsese,1.2820462602)
(Asteropaeus,1.2775443371)
(Best_Adapted_Screenplay,1.2755708599)
(For_Your_Consideration,1.2743701080)
(Whittaker_Chambers,1.2633760889)
(capitalism,1.2620055989)
方案:
额外增加设置一次jop,单独设计mapper和reducer,将每个网页的pr值设定为key,按照降序传递到reducer端,并保留十位小数进行输出,主要利用的是重写比较器WritableComparator:
public static class IntWritableDecreasingComparator extends IntWritable.Comparator {
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
新的Mapper和Reduder:
public static class pageOutMapper extends Mapper<LongWritable, Text, DoubleWritable, Text>{
private static DoubleWritable keyInfo = new DoubleWritable();
private static Text valueInfo = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//super.map(key, value, context);
//若A能连接到B,则说明B是A的一条出链
if(!value.toString().equals("")){
String[] line = value.toString().split("\t");
String curPageName = line[0];
double pr = Double.valueOf(line[1]);
DecimalFormat df = new DecimalFormat("0.0000000000");
String Prvalue = df.format(pr);
String vout = "("+curPageName+","+Prvalue+")";
valueInfo.set(vout);
keyInfo.set(pr);
context.write(keyInfo, valueInfo);
}
}
}
public static class pageOutReducer extends Reducer<DoubleWritable,Text,NullWritable,Text>{
private static Text info = new Text();
@Override
protected void reduce(DoubleWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for(Text value:values){
info.set(value.toString());
context.write(NullWritable.get(),info);
}
}
}
五、在spark上完成pagerank
spark下的scala编程简单很多,直接在集群上的spark-shell上几行代码就可以搞定:
image.png
六、实验代码设计漏洞:
1、:在邻接链表右边未出现过的,左边出现,会出现异常,输出少一个page
2、:没有进行检查链接到自己的网页筛选
还可以进一步优化。
网友评论