美文网首页
在hadoop的map-reduce框架下实现经典的pagera

在hadoop的map-reduce框架下实现经典的pagera

作者: Mr_Relu | 来源:发表于2019-04-28 21:10 被阅读0次

编程环境:

Ubuntu16.4 uklin
Hadoop3.2.0
openjdk version "1.8.0_191"
spark 2.3.0 ( 集群环境 )
完整代码已经更新至GitHub,欢迎fork~GitHub链接


声明:创作不易,未经授权不得复制转载
statement:No reprinting without authorization


一、算法思想介绍:

PageRank的核心思想其实十分简单,概括如下:
1、如果一个网页被很多其它网页链接到,说明这个网页很重要,它的PageRank值也会相应较高;
2、如果一个PageRank值很高的网页链接到另外某个网页,那么那个网页的PageRank值也会相应地提高。
使用随机浏览模型的PageRank公式:


image.png

也可将(1-d)换为(1-d/N)N为总网页数

二、hadoop上利用mapreduce框架实现pagerank算法:

1、设计思路:

源数据格式为起始网页+\t+用逗号,分割的起始网页所连接指向的其它网页列表,eg:A B,D,F......
所以考虑首先读入一遍全体数据,先初始化每个网页的权重为1.0,并在往后的每一次迭代中更新这个记录每个网页权重pr值,每次将上一轮更新后的pr值表传给下一次迭代计算新的pr值。
具体在map时将每一行源数据拆分,首先读取到A的pr值,而后统计A后一共的出链数目num,将A指向的每个网页名作为一个key,value设为pr/num,表示A对该网页的贡献量,如上例中的一行数据经 过mapper后将得到的是:<B,prA/num>、<D,prA/num>、<F,prA/num>......
而后在reducer端接受到的将是同一个key为网页,value是其它所有网页对它pr值贡献量的值列表,eg:<B, <0.1 , 0.2 , 0.6 ......>>,所以reducer要做的便是利用公式将列表中值相加得到sum值,而后将sum乘以权重d再加上(1-d)便得到更新后的网页B的pr值:


image.png

2、难点

如何设计一个共享的变量来保存所有网页的pr值:
查询后发现hadoop对数据传递和共享变量没有什么特别好的接口,主要是因为hadoop是基于分布式的节点计算,要想实现数据共享传递主要是依赖对文件的操作,以及自带的Configuration中设置属性值,考虑到频繁的文件io操作带来的低效性,我采用了后一种方法:

//初始化configuration后,使用
conf.set("propertyName“,properyValue);
//在mapper或reducer中,
Configuration conf = context.getConfiguration();
String g = conf.get("propertyName");

补充其它方式:

image.png

而后在每一次迭代前,先读取上一次迭代的输出文件(part-r-00000), 将网页的pr值封装进一个String的字符串中,用“,”进行分割,用lastWeights保存,用在mapper中读取到后进行计算即可,不需要考虑到在每个reducer节点进行遍历更新(会很慢),转换为一次的文件io操作即可,极大增加了运行效率。

三、具体迭代实现的mapper和reducer的代码:

Mapper:

public static class pageMapper extends Mapper<LongWritable, Text, Text, DoubleWritable>{
        
        private static Text keyInfo = new Text();
        private static DoubleWritable valueInfo = new DoubleWritable();     
        private static String[] lastWeightsInfo;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            try {
                //从全局配置获取配置参数
                Configuration conf = context.getConfiguration();                                
                lastWeightsInfo = conf.get("lastWeights").split(","); //这样就拿到了                          
            } catch (Exception e) {                
                e.printStackTrace();
            }           
        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            
            //分离邻接链表
            String[] line = value.toString().split("\t");
            if(line.length==2){
                String curPageName = line[0];   
                String[] outPages = line[1].split(",");
                
                double outNum = outPages.length;
                double ww = 0.0;
                for(int i = 0;i<lastWeightsInfo.length;i++){
                    if(curPageName.equals(lastWeightsInfo[i])){
                        ww = Double.valueOf(lastWeightsInfo[i+1].toString())/outNum;
                        break;
                    }
                }                       
                
                for (int i=0;i<outPages.length;i++){            
                    keyInfo.set(outPages[i]);
                    valueInfo.set(ww);
                    context.write(keyInfo,valueInfo);
                }
            }
        }
    }

Reducer:

public static class pageReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
        
        private static DoubleWritable info = new DoubleWritable();
        //private static int totalNum;
        
        /* @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            try {
                //从全局配置获取配置参数
                Configuration conf = context.getConfiguration();
                //totalNum = conf.getInt("totalNum", 0);
            } catch (Exception e) {                
                e.printStackTrace();
            }           
        }  */   
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

            double sum = 0.0;// 统计PR 
            for (DoubleWritable value : values) {  
                sum += value.get();  
            }  
            sum = 0.85*sum + 0.15;
            info.set(sum);
            context.write(key, info);     
        }
    }

四、实现标准输出:

标准输出如下:
(recovered_factory,1.3220451459)
(AbboT,1.3123762223)
(Abbess,1.2992692277)
(Martin_Scorsese,1.2820462602)
(Asteropaeus,1.2775443371)
(Best_Adapted_Screenplay,1.2755708599)
(For_Your_Consideration,1.2743701080)
(Whittaker_Chambers,1.2633760889)
(capitalism,1.2620055989)

方案:
额外增加设置一次jop,单独设计mapper和reducer,将每个网页的pr值设定为key,按照降序传递到reducer端,并保留十位小数进行输出,主要利用的是重写比较器WritableComparator:

public static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
          return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

新的Mapper和Reduder:

    public static class pageOutMapper extends Mapper<LongWritable, Text, DoubleWritable, Text>{
        
        private static DoubleWritable keyInfo = new DoubleWritable();
        private static Text valueInfo = new Text();     

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //super.map(key, value, context);
            
            //若A能连接到B,则说明B是A的一条出链
            if(!value.toString().equals("")){
                String[] line = value.toString().split("\t");
                String curPageName = line[0]; 
                
                double pr = Double.valueOf(line[1]);
                
                DecimalFormat df = new DecimalFormat("0.0000000000");
                String Prvalue = df.format(pr);
                String vout = "("+curPageName+","+Prvalue+")";
                
                valueInfo.set(vout);
                keyInfo.set(pr);
                
                context.write(keyInfo, valueInfo);
            }
            
        }
    }
    
    public static class pageOutReducer extends Reducer<DoubleWritable,Text,NullWritable,Text>{
        
        private static Text info = new Text();
        
        @Override
        protected void reduce(DoubleWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            
            for(Text value:values){
                info.set(value.toString());
                context.write(NullWritable.get(),info); 
            }                 
        }
    }

五、在spark上完成pagerank

spark下的scala编程简单很多,直接在集群上的spark-shell上几行代码就可以搞定:


image.png

六、实验代码设计漏洞:

1、:在邻接链表右边未出现过的,左边出现,会出现异常,输出少一个page
2、:没有进行检查链接到自己的网页筛选
还可以进一步优化。

相关文章

网友评论

      本文标题:在hadoop的map-reduce框架下实现经典的pagera

      本文链接:https://www.haomeiwen.com/subject/ztxsnqtx.html