
Data Algorithms - Hadoop/Spark Big Data Processing, Chapter 3

Author: _Kantin | Published 2018-01-06 11:52

    This chapter solves the Top N problem. The approaches used are:

    • Assume the input keys are unique, i.e. in the given input set {(K,V)} every K appears exactly once; solve it with MapReduce/Hadoop.
    • Assume the input keys are unique, i.e. in the given input set {(K,V)} every K appears exactly once; solve it with Spark.
    • Assume the input keys are not unique, i.e. in the given input set {(K,V)} keys may repeat; use Spark's powerful sorting helpers such as top() and takeOrdered().

    The core Top N function

    The most common way to implement Top N in Java is with a SortedMap<K,V> backed by a TreeMap<K,V>: add every element of the list L to topN, and whenever topN.size() > N, evict the first key (for Top N) or the last key (for Bottom N).

    // the core of the Top N algorithm
    static <T> SortedMap<Integer,T> topN(List<Tuple2<T,Integer>> L, int N) {
        if ((L == null) || (L.isEmpty())) {
            return null;
        }

        SortedMap<Integer,T> topN = new TreeMap<Integer,T>();
        for (Tuple2<T,Integer> element : L) {
            // the frequency (element._2) is the map key, so entries stay sorted by frequency
            topN.put(element._2, element._1);
            if (topN.size() > N) {
                // evict the smallest frequency, keeping only the N largest
                topN.remove(topN.firstKey());
            }
        }
        return topN;
    }
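
    A minimal usage sketch of topN() (the sample categories and counts are made up for illustration; scala.Tuple2 and the java.util imports are assumed):

    List<Tuple2<String,Integer>> L = Arrays.asList(
        new Tuple2<String,Integer>("cat1", 12),
        new Tuple2<String,Integer>("cat2", 3),
        new Tuple2<String,Integer>("cat3", 27));
    SortedMap<Integer,String> top2 = topN(L, 2);
    // a TreeMap iterates its keys in ascending order, so the largest frequency prints last
    for (Map.Entry<Integer,String> e : top2.entrySet()) {
        System.out.println(e.getKey() + " -> " + e.getValue()); // 12 -> cat1, then 27 -> cat3
    }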
    
    

    Unique-key approach with MapReduce

    Class name   Description
    TopN_Driver  Driver that submits the job
    TopN_Mapper  Defines map()
    TopN_Reduce  Defines reduce()

    • Override the setup() and cleanup() functions; each runs exactly once per mapper task. setup() reads the value of N, and cleanup() emits the mapper's local Top N to the reduce side.
      // read N from the job configuration
      @Override
       protected void setup(Context context) throws IOException,
             InterruptedException {
          this.N = context.getConfiguration().getInt("N", 10); // default is top 10
       }
       
        // emit the local Top N; NullWritable.get() always yields the same key, so every mapper's output is routed to the same reducer
       @Override
       protected void cleanup(Context context) throws IOException,
             InterruptedException {
          for (String str : top.values()) {
             context.write(NullWritable.get(), new Text(str));
          }
       }
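
    The mapper above relies on the same two fields that the reducer declares further down; a sketch of the assumed declarations (not shown in the original excerpt):

    private int N = 10; // default Top N size, overwritten in setup()
    private SortedMap<Integer, String> top = new TreeMap<Integer, String>();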
    

    - The map() function computes the local Top N for its input split

     @Override
       public void map(Text key, IntWritable value, Context context)
             throws IOException, InterruptedException {
    
          String keyAsString = key.toString();
          int frequency =  value.get();
          String compositeValue = keyAsString + "," + frequency;
          top.put(frequency, compositeValue);
          // keep only top N
          if (top.size() > N) {
             top.remove(top.firstKey());
          }
       }
    
    

    - The reduce() function computes the final Top N over all local Top N lists

     private int N = 10; // default
       private SortedMap<Integer, String> top = new TreeMap<Integer, String>();
    
       // the same SortedMap<Integer, String> bookkeeping as in the mapper
       @Override
       public void reduce(NullWritable key, Iterable<Text> values, Context context) 
          throws IOException, InterruptedException {
          for (Text value : values) {
             String valueAsString = value.toString().trim();
             String[] tokens = valueAsString.split(",");
             String url = tokens[0];
             int frequency =  Integer.parseInt(tokens[1]);
             top.put(frequency, url);
             // keep only top N
             if (top.size() > N) {
                top.remove(top.firstKey());
             }
          }
          
          // emit the final Top N
            List<Integer> keys = new ArrayList<Integer>(top.keySet());
            for(int i=keys.size()-1; i>=0; i--){
             context.write(new IntWritable(keys.get(i)), new Text(top.get(keys.get(i))));
          }
       }
       
       // setup() also runs first here and reads the value of N
       @Override
       protected void setup(Context context) 
          throws IOException, InterruptedException {
          this.N = context.getConfiguration().getInt("N", 10); // default is top 10
       }
    
    

    - The driver class, TopNDriver.java

     Job job = new Job(getConf());
          HadoopUtil.addJarsToDistributedCache(job, "/lib/");
          int N = Integer.parseInt(args[0]); // top N
          job.getConfiguration().setInt("N", N);
          job.setJobName("TopNDriver");
    
          job.setInputFormatClass(SequenceFileInputFormat.class);
          job.setOutputFormatClass(SequenceFileOutputFormat.class);
    
          job.setMapperClass(TopNMapper.class);
          job.setReducerClass(TopNReducer.class);
          // use a single reduce task so that every mapper's local Top N arrives at the same reducer
          job.setNumReduceTasks(1);
    
          // map()'s output (K,V)
          job.setMapOutputKeyClass(NullWritable.class);   
          job.setMapOutputValueClass(Text.class);   
          
          // reduce()'s output (K,V)
          job.setOutputKeyClass(IntWritable.class);
          job.setOutputValueClass(Text.class);
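
      The excerpt stops before the job is submitted. A minimal completion sketch, assuming the snippet lives in a Tool-style run() method (as the getConf() call suggests) and that the input and output paths arrive as args[1] and args[2] (an assumption, not part of the original listing):

      // hypothetical argument layout: args[1] = input path, args[2] = output path
      FileInputFormat.addInputPath(job, new Path(args[1]));
      FileOutputFormat.setOutputPath(job, new Path(args[2]));

      // submit the job, wait for completion, and report success or failure
      boolean ok = job.waitForCompletion(true);
      return ok ? 0 : 1;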
    
    

    - Finding the Top 10 versus the Bottom 10

    // Top 10: evict the smallest key
    if (top10Cats.size() > 10) {
        top10Cats.remove(top10Cats.firstKey());
    }

    // Bottom 10: evict the largest key
    if (top10Cats.size() > 10) {
        top10Cats.remove(top10Cats.lastKey());
    }
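
    A self-contained sketch showing both variants side by side over an in-memory map (the category names and counts are made up for illustration):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class TopBottomDemo {
        public static void main(String[] args) {
            int[] counts = {120, 45, 300, 7, 88, 9, 410, 56, 23, 15, 260, 31};
            SortedMap<Integer, String> top10Cats = new TreeMap<Integer, String>();
            SortedMap<Integer, String> bottom10Cats = new TreeMap<Integer, String>();
            for (int i = 0; i < counts.length; i++) {
                String cat = "cat" + i;
                top10Cats.put(counts[i], cat);
                if (top10Cats.size() > 10) {
                    top10Cats.remove(top10Cats.firstKey()); // keep only the 10 largest counts
                }
                bottom10Cats.put(counts[i], cat);
                if (bottom10Cats.size() > 10) {
                    bottom10Cats.remove(bottom10Cats.lastKey()); // keep only the 10 smallest counts
                }
            }
            System.out.println("Top 10:    " + top10Cats);
            System.out.println("Bottom 10: " + bottom10Cats);
        }
    }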
    
    

    Unique-key approach with Spark

    Spark function classes used from the Java API
    Spark Java class              Function type
    Function<T,R>                 T => R
    DoubleFunction<T>             T => Double
    PairFunction<T,K,V>           T => Tuple2<K,V>
    FlatMapFunction<T,R>          T => Iterable<R>
    DoubleFlatMapFunction<T>      T => Iterable<Double>
    PairFlatMapFunction<T,K,V>    T => Iterable<Tuple2<K,V>>
    Function2<T1,T2,R>            T1, T2 => R
    Simulating setup() and cleanup() in Spark
     JavaRDD<SortedMap<Integer, String>> partitions =
     // mapPartitions lets per-partition "setup"/"cleanup" logic run once around the element loop
     pairs.mapPartitions(
      new FlatMapFunction<Iterator<Tuple2<K,V>>, SortedMap<K1, K2>>() {
          @Override
          public Iterator<SortedMap<K1, K2>> call(Iterator<Tuple2<K,V>> iter) {
              setup();
              while (iter.hasNext()) {
                  // per-element work, i.e. the map() logic
              }
              cleanup();
              return <the-result>;
          }
      })
    

    - A Top N implementation in Spark

     public static void main(String[] args) throws Exception {
      
      // handle input parameters
          if (args.length < 1) {
             System.err.println("Usage: Top10 <input-file>");
             System.exit(1);
          }
          String inputPath = args[0];
          System.out.println("args[0]: <input-path>="+inputPath);
    
      // connect to the Spark master
          JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
    
      // read the input file from HDFS and create the first RDD
      //  <string-key><,><integer-value>
          JavaRDD<String> lines = ctx.textFile(inputPath, 1);
    
        
      // create a pair RDD, JavaPairRDD<String,Integer>, from the existing JavaRDD<String>
      // Spark Java class: PairFunction<T, K, V>
      // function type: T => Tuple2<K, V>
      // each element of the JavaPairRDD<String,Integer> is a Tuple2<String,Integer>
          JavaPairRDD<String,Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
             @Override
             public Tuple2<String,Integer> call(String s) {
                String[] tokens = s.split(","); // cat7,234
                return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
             }
          });
    
          List<Tuple2<String,Integer>> debug1 = pairs.collect();
          for (Tuple2<String,Integer> t2 : debug1) {
             System.out.println("key="+t2._1 + "\t value= " + t2._2);
          }
    
        
      // build a local Top 10 list for each input partition
          JavaRDD<SortedMap<Integer, String>> partitions = pairs.mapPartitions(
             new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
             @Override
             public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
                 SortedMap<Integer, String> top10 = new TreeMap<Integer, String>();
                 while (iter.hasNext()) {
                    Tuple2<String,Integer> tuple = iter.next();
                    top10.put(tuple._2, tuple._1);
                    // keep only top N 
                    if (top10.size() > 10) {
                       top10.remove(top10.firstKey());
                    }  
                 }
             // wrap the single local Top 10 map in a one-element list so it can be returned as an Iterator
                 return Collections.singletonList(top10).iterator();
             }
          });
    
        
          SortedMap<Integer, String> finaltop10 = new TreeMap<Integer, String>();
      // collect all of the local Top 10 lists
          List<SortedMap<Integer, String>> alltop10 = partitions.collect();
      // merge them into the final Top 10
          for (SortedMap<Integer, String> localtop10 : alltop10) {
              // count = entry.getKey()
              // catname/URL = entry.getValue()
              for (Map.Entry<Integer, String> entry : localtop10.entrySet()) {
                  //   System.out.println(entry.getKey() + "--" + entry.getValue());
                  finaltop10.put(entry.getKey(), entry.getValue());
                  // keep only top 10 
                  if (finaltop10.size() > 10) {
                     finaltop10.remove(finaltop10.firstKey());
                  }
              }
          }
        
      // print the final Top 10 list
          for (Map.Entry<Integer, String> entry : finaltop10.entrySet()) {
             System.out.println(entry.getKey() + "--" + entry.getValue());
          }
    
          System.exit(0);
       }
    
    
    Making the Top N parameter available globally
    • Define the broadcast variable: final Broadcast<Integer> broadcastTopN = context.broadcast(topN);
    • Read N on the workers: final int topN = broadcastTopN.value();
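
    Put together, a minimal sketch (context is the JavaSparkContext; the read happens inside a worker-side closure such as mapPartitions, as the full listings below show):

    final Broadcast<Integer> broadcastTopN = context.broadcast(10); // ship N to every executor
    // ... later, inside a map/mapPartitions closure running on the workers:
    // final int topN = broadcastTopN.value();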

    Non-unique-key approach with Spark

    Algorithm outline
    1. Make the keys unique: map the input into JavaPairRDD<K,V> pairs and feed them to reduceByKey().
    2. Partition all of the now-unique (K,V) pairs into M partitions.
    3. Find the Top N of each partition (the local Top N).
    4. Find the final Top N among all of the local Top N lists.

    The Spark implementation for non-unique keys

    public static void main(String[] args) throws Exception {
      // handle input parameters
          if (args.length < 2) {
             System.err.println("Usage: Top10 <input-path> <topN>");
             System.exit(1);
          }
          System.out.println("args[0]: <input-path>="+args[0]);
          System.out.println("args[1]: <topN>="+args[1]);
          final int N = Integer.parseInt(args[1]);
    
      // create a JavaSparkContext object
          JavaSparkContext ctx = SparkUtil.createJavaSparkContext();
    
      // broadcast the Top N parameter to all cluster nodes
          final Broadcast<Integer> topN = ctx.broadcast(N);
          // now topN is available to be read from all cluster nodes
    
      // create the first RDD; each input line looks like A,2 or B,2 or C,3
      // <string-key><,><integer-value-count>
          JavaRDD<String> lines = ctx.textFile(args[0], 1);
          lines.saveAsTextFile("/output/1");
        
      // coalesce() returns a new RDD reduced to numPartitions partitions
      // rule of thumb: use roughly (2 * num_executors * cores_per_executor) partitions in total
          JavaRDD<String> rdd = lines.coalesce(9);
           
      // map the input (T) into (K,V) pairs
          // PairFunction<T, K, V>   
          // T => Tuple2<K, V>
          JavaPairRDD<String,Integer> kv = rdd.mapToPair(new PairFunction<String,String,Integer>() {
             @Override
             public Tuple2<String,Integer> call(String s) {
                String[] tokens = s.split(","); // url,789
                return new Tuple2<String,Integer>(tokens[0], Integer.parseInt(tokens[1]));
             }
          });
          kv.saveAsTextFile("/output/2");
    
      // reduce duplicate keys with a Function2 that sums their counts
          JavaPairRDD<String, Integer> uniqueKeys = kv.reduceByKey(new Function2<Integer, Integer, Integer>() {
             @Override
             public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
             }
          });
          uniqueKeys.saveAsTextFile("/output/3");
        
      // build a local Top N for each partition
          JavaRDD<SortedMap<Integer, String>> partitions = uniqueKeys.mapPartitions(
              new FlatMapFunction<Iterator<Tuple2<String,Integer>>, SortedMap<Integer, String>>() {
              @Override
              public Iterator<SortedMap<Integer, String>> call(Iterator<Tuple2<String,Integer>> iter) {
                 final int N = topN.value();
                 SortedMap<Integer, String> localTopN = new TreeMap<Integer, String>();
                 while (iter.hasNext()) {
                    Tuple2<String,Integer> tuple = iter.next();
                    localTopN.put(tuple._2, tuple._1);
                    // keep only top N 
                    if (localTopN.size() > N) {
                       localTopN.remove(localTopN.firstKey());
                    } 
                 }
                 return Collections.singletonList(localTopN).iterator();
              }
          });
          partitions.saveAsTextFile("/output/4");
    
      // compute the final Top N
          SortedMap<Integer, String> finalTopN = new TreeMap<Integer, String>();
      // collect every partition's local Top N
          List<SortedMap<Integer, String>> allTopN = partitions.collect();
          for (SortedMap<Integer, String> localTopN : allTopN) {
             for (Map.Entry<Integer, String> entry : localTopN.entrySet()) {
                 // count = entry.getKey()
                 // url = entry.getValue()
                 finalTopN.put(entry.getKey(), entry.getValue());
                 // keep only top N 
                 if (finalTopN.size() > N) {
                    finalTopN.remove(finalTopN.firstKey());
                 }
             }
          }
        
      // print the final Top N
          for (Map.Entry<Integer, String> entry : finalTopN.entrySet()) {
             System.out.println(entry.getKey() + "--" + entry.getValue());
          }
    
          System.exit(0);
       }
    
    

    Non-unique-key approach with takeOrdered()

    // step 8: compute the global Top N with takeOrdered()
     List<Tuple2<String, Integer>> topNResult = uniqueKeys.takeOrdered(N, MyTupleComparator.INSTANCE);
     
    // a comparator for the tuples must be supplied
    static class MyTupleComparator implements Comparator<Tuple2<String, Integer>> ,Serializable {
           final static MyTupleComparator INSTANCE = new MyTupleComparator();
           @Override
           public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
              return -t1._2.compareTo(t2._2);     // sorts RDD elements descending (use for Top-N)
              // return t1._2.compareTo(t2._2);   // sorts RDD elements ascending (use for Bottom-N)
           }
       }
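
    The chapter overview also mentions Spark's top() helper. A sketch of the equivalent call (the class name AscendingCountComparator is hypothetical; note that top() returns the largest elements under its comparator, so an ascending ordering on the count is what yields the Top N):

    static class AscendingCountComparator implements Comparator<Tuple2<String, Integer>>, Serializable {
        final static AscendingCountComparator INSTANCE = new AscendingCountComparator();
        @Override
        public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
            return t1._2.compareTo(t2._2); // ascending by count
        }
    }

    // equivalent of the takeOrdered() call above
    List<Tuple2<String, Integer>> topNWithTop = uniqueKeys.top(N, AscendingCountComparator.INSTANCE);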
    

    Non-unique keys with MapReduce

    • First run a word-count-style job to produce unique <key,value> pairs (see the reducer sketch after this list).
    • Then apply the unique-key Top N job from the first section to its output.
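
    A minimal sketch of the aggregation pass, following the standard word-count pattern (the class name AggregateByKeyReducer is illustrative, not from the book):

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // sums the counts of each key so that the follow-up Top N job sees unique keys
    public class AggregateByKeyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }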

    Scala implementations for unique and non-unique keys

    The unique-key implementation
     def main(args: Array[String]): Unit = {
        if (args.size < 1) {
          println("Usage: TopN <input>")
          sys.exit(1)
        }
    // create the SparkConf object
        val sparkConf = new SparkConf().setAppName("TopN")
        val sc = new SparkContext(sparkConf)
    // broadcast the N variable
        val N = sc.broadcast(10)
        val path = args(0)
    
        val input = sc.textFile(path)
    // note: key and value are swapped so the numeric count becomes the key
        val pair = input.map(line => {
          val tokens = line.split(",")
          (tokens(2).toInt, tokens)
        })
    
        import Ordering.Implicits._
        val partitions = pair.mapPartitions(itr => {
    // the SortedMap orders entries by its key, i.e. by the numeric value swapped into the key position
          var sortedMap = SortedMap.empty[Int, Array[String]]
          itr.foreach { tuple =>
            {
              sortedMap += tuple
              if (sortedMap.size > N.value) {
                sortedMap = sortedMap.takeRight(N.value)
              }
            }
          }
      // keep the N largest entries of this partition
          sortedMap.takeRight(N.value).toIterator
        })
    // collect the local Top N of every partition
        val alltop10 = partitions.collect()
    // merge all of the local Top N lists into one SortedMap, so everything is sorted again
        val finaltop10 = SortedMap.empty[Int, Array[String]].++:(alltop10)
        val resultUsingMapPartition = finaltop10.takeRight(N.value)
        
        //Prints result (top 10) on the console
        resultUsingMapPartition.foreach {
          case (k, v) => println(s"$k \t ${v.asInstanceOf[Array[String]].mkString(",")}")
        }
    
    // alternative: sortByKey(false) sorts by key in descending order
        val moreConciseApproach = pair.groupByKey().sortByKey(false).take(N.value)
    
        //Prints result (top 10) on the console
        moreConciseApproach.foreach {
          case (k, v) => println(s"$k \t ${v.flatten.mkString(",")}")
        }
        
        // done
        sc.stop()
      }
    
    The non-unique-key implementation
    def main(args: Array[String]): Unit = {
        if (args.size < 1) {
          println("Usage: TopNNonUnique <input>")
          sys.exit(1)
        }
    
        val sparkConf = new SparkConf().setAppName("TopNNonUnique")
        val sc = new SparkContext(sparkConf)
    
        val N = sc.broadcast(2)
        val path = args(0)
    
        val input = sc.textFile(path)
        val kv = input.map(line => {
          val tokens = line.split(",")
          (tokens(0), tokens(1).toInt)
        })
    
        val uniqueKeys = kv.reduceByKey(_ + _)
        import Ordering.Implicits._
        val partitions = uniqueKeys.mapPartitions(itr => {
      // the SortedMap keeps its entries ordered by key (the count)
          var sortedMap = SortedMap.empty[Int, String]
          itr.foreach { tuple =>
            {
          // swap the tuple so the count becomes the key before inserting
              sortedMap += tuple.swap
              if (sortedMap.size > N.value) {
                sortedMap = sortedMap.takeRight(N.value)
              }
            }
          }
          sortedMap.takeRight(N.value).toIterator
        })
    
        val alltop10 = partitions.collect()
        val finaltop10 = SortedMap.empty[Int, String].++:(alltop10)
        val resultUsingMapPartition = finaltop10.takeRight(N.value)
    
        //Prints result (top 10) on the console
        resultUsingMapPartition.foreach { 
          case (k, v) => println(s"$k \t ${v.mkString(",")}") 
        }
    
        // Below is additional approach which is more concise
        val createCombiner = (v: Int) => v
        val mergeValue = (a: Int, b: Int) => (a + b)
        val moreConciseApproach = kv.combineByKey(createCombiner, mergeValue, mergeValue)
                                    .map(_.swap)
                                    .groupByKey()
                                    .sortByKey(false)
                                    .take(N.value)
    
        //Prints result (top 10) on the console
        moreConciseApproach.foreach {
          case (k, v) => println(s"$k \t ${v.mkString(",")}")
        }
    
        // done
        sc.stop()
      }
    
    
