
Data Algorithms: Hadoop/Spark Big Data Processing --- Chapter 6

Author: _Kantin | Published 2018-01-09 19:46, read 206 times

    The problem this chapter addresses

    Moving average: the average of the values inside a sliding window rather than over the whole series, similar to the 5-minute, 10-minute, hourly, and weekly lines you can switch between in a stock app such as 同花顺. The running example uses triples (k, t, v), where k is the key (the stock symbol), t is the time (hours/minutes/seconds), and v is the associated value (the price).


    Input and output of this chapter

    Input: stock prices as (k, t, v). Output: for each point in time, (k, t, average over the window), as in the example below.
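
    For example, given the hypothetical input below and a window size of 3, the output pairs each date with the mean of the last three prices (note that some of the implementations below also emit partial-window averages for the first window-1 points):

    input:
    GOOG,2013-07-01,10
    GOOG,2013-07-02,20
    GOOG,2013-07-03,30
    GOOG,2013-07-04,40

    output:
    GOOG  2013-07-03,20.0    // (10+20+30)/3
    GOOG  2013-07-04,30.0    // (20+30+40)/3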


    This chapter presents four implementations:

    1. MapReduce with in-memory sorting
    2. MapReduce with secondary sort (partitioning and grouping in the shuffle)
    3. Traditional in-memory Scala implementation
    4. Scala implementation with partitioning and grouping (secondary sort)

    First, two ways to implement a moving average in Java.

    // Queue-based implementation
    import java.util.LinkedList;
    import java.util.Queue;

    public class SimpleMovingAverage {
        private double sum = 0;
        private int period;
        private Queue<Double> window = new LinkedList<>();

        public SimpleMovingAverage(int period) {
            this.period = period;
        }

        public void addNumber(double number) {
            sum += number;
            window.add(number);
            // once the window is full, evict the oldest value
            if (window.size() > period) {
                sum -= window.remove();
            }
        }

        public double getMovingAverage() {
            return sum / window.size();
        }
    }
    
    

    // Array-based implementation (circular buffer)
    public class SimpleMovingAverageUsingArray {
        private double sum = 0;
        private int period;
        private double[] window = null;
        private int pointer = 0;
        private int size = 0;

        public SimpleMovingAverageUsingArray(int period) {
            this.period = period;
            window = new double[period];
        }

        public void addNewNumber(double number) {
            sum += number;
            if (size < period) {
                // window not yet full: just append
                window[pointer++] = number;
                size++;
            } else {
                // window full: overwrite the oldest slot
                pointer = pointer % period;
                sum -= window[pointer];
                window[pointer++] = number;
            }
        }

        public double getMovingAverage() {
            return sum / size;
        }

        public static void main(String[] args) {
            double[] testData = {10, 18, 20, 30, 24, 33, 27};
            int[] allWindowSizes = {3, 4};
            for (int windowSize : allWindowSizes) {
                SimpleMovingAverageUsingArray sau = new SimpleMovingAverageUsingArray(windowSize);
                for (double x : testData) {
                    sau.addNewNumber(x);
                    System.out.println("Next Number= " + x + " sau: " + sau.getMovingAverage());
                }
                System.out.println("");
            }
        }
    }
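
    For window size 3, the harness above should print averages 10.0, 14.0, 16.0, then 68/3 ≈ 22.67, 74/3 ≈ 24.67, 29.0, and 28.0; for window size 4 the last four become 19.5, 23.0, 26.75, and 28.5 (the exact printed digits depend on Java's double formatting).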
    

    MapReduce implementation with in-memory sorting

    The input format is <name-as-string><,><timestamp><,><value-as-double>. The mapper emits <name-as-string> as the key and (<timestamp><,><value-as-double>) as the value.


    Class name                          Description
    SortInMemory_MovingAverageDriver    Driver program that submits the Hadoop job
    SortInMemory_MovingAverageMapper    Defines the map() function
    SortInMemory_MovingAverageReducer   Defines the reduce() function
    // map() function
    public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
          String record = value.toString();
          if ((record == null) || (record.length() == 0)) {
             return;
          }
          String[] tokens = StringUtils.split(record.trim(), ",");
          if (tokens.length == 3) {
             // tokens[0] = name of timeseries as string
             // tokens[1] = timestamp
             // tokens[2] = value of timeseries as double
             Date date = DateUtil.getDate(tokens[1]);
             if (date == null) {
                return;
             }
             // emit the name as the key and (timestamp, value) as the value;
             // both are carried by the TimeSeriesData bean (sketched below)
             reducerKey.set(tokens[0]); // set the name as key
             reducerValue.set(date.getTime(), Double.parseDouble(tokens[2]));
             context.write(reducerKey, reducerValue);
          }
          else {
             // log as error, not enough tokens
          }
      }
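
    The TimeSeriesData class itself is not shown in this excerpt. A minimal sketch follows, assuming only the methods the mapper and reducer actually call (set, copy, getTimestamp, getValue, and a timestamp-based compareTo); the book's version may differ in detail:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    public class TimeSeriesData implements Writable, Comparable<TimeSeriesData> {
        private long timestamp;
        private double value;

        public void set(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        public long getTimestamp() { return timestamp; }
        public double getValue() { return value; }

        // Hadoop reuses the same value object while iterating in the reducer,
        // so each element must be copied before it is buffered in a list.
        public static TimeSeriesData copy(TimeSeriesData other) {
            TimeSeriesData c = new TimeSeriesData();
            c.set(other.timestamp, other.value);
            return c;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(timestamp);
            out.writeDouble(value);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            timestamp = in.readLong();
            value = in.readDouble();
        }

        // Order by timestamp so Collections.sort() sorts the series in time.
        @Override
        public int compareTo(TimeSeriesData other) {
            return Long.compare(this.timestamp, other.timestamp);
        }

        @Override
        public String toString() {
            return "(" + timestamp + "," + value + ")";
        }
    }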
    
    

    // reduce() function
    public void setup(Context context)
        throws IOException, InterruptedException {
        this.windowSize = context.getConfiguration().getInt("moving.average.window.size", 5);
        System.out.println("setup(): windowSize=" + windowSize);
    }

    public void reduce(Text key, Iterable<TimeSeriesData> values, Context context)
        throws IOException, InterruptedException {

        System.out.println("reduce(): key=" + key.toString());

        // build the unsorted list of timeseries
        List<TimeSeriesData> timeseries = new ArrayList<TimeSeriesData>();
        for (TimeSeriesData tsData : values) {
            TimeSeriesData copy = TimeSeriesData.copy(tsData);
            timeseries.add(copy);
        }

        // TimeSeriesData implements Comparable (compareTo on the timestamp),
        // so this sorts the whole series in memory by time
        Collections.sort(timeseries);
        System.out.println("reduce(): timeseries=" + timeseries.toString());

        // accumulate the first windowSize-1 values before emitting anything
        // (assumes each series has at least windowSize points)
        double sum = 0.0;
        for (int i = 0; i < windowSize - 1; i++) {
            sum += timeseries.get(i).getValue();
        }

        // from then on, emit one moving average per incoming point
        Text outputValue = new Text();
        // iterate over the sorted series
        for (int i = windowSize - 1; i < timeseries.size(); i++) {
            System.out.println("reduce(): key=" + key.toString() + "  i=" + i);
            sum += timeseries.get(i).getValue();
            double movingAverage = sum / windowSize;
            long timestamp = timeseries.get(i).getTimestamp();
            outputValue.set(DateUtil.getDateAsString(timestamp) + "," + movingAverage);
            // send output to HDFS
            context.write(key, outputValue);

            // subtract the oldest value in the window for the next iteration
            sum -= timeseries.get(i - windowSize + 1).getValue();
        }
    } // reduce
    
    

    MapReduce with partitioning and grouping in the shuffle

    Instead of sorting in memory, this version relies on a secondary sort: the shuffle phase delivers each stock's values to the reducer already ordered by timestamp, so the reducer never has to buffer and sort a whole series (avoiding the memory risk of the previous approach).

    Class name                        Description
    CompositeKey                      Defines a custom composite key {name, timestamp}
    CompositeKeyComparator            Sorts composite keys (by name, then timestamp) so values reach each reducer in time order
    MovingAverage                     A simple moving average algorithm
    NaturalKeyGroupingComparator      Groups composite keys by the natural key (the name) during the shuffle
    NaturalKeyPartitioner             Partitions map output by the natural key before the shuffle
    SortByMRF_MovingAverageDriver     Driver for the Hadoop job
    SortByMRF_MovingAverageMapper     Defines the map() function
    SortByMRF_MovingAverageReducer    Defines the reduce() function
    // driver for the whole job
    // partition by the natural key, using its hash (see the sketches below)
    jobconf.setPartitionerClass(NaturalKeyPartitioner.class);
    // sort composite keys within each partition (name, then timestamp)
    jobconf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
    // group values by the natural key for the reduce phase
    jobconf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);
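
    The three shuffle plug-ins are not shown in this excerpt. Minimal sketches follow (each would live in its own file), written against the old org.apache.hadoop.mapred API the driver uses; the accessors getName() and getTimestamp() on CompositeKey are assumptions based on the reducer code below:

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Partitioner;

    // Route all records with the same stock name to the same reducer.
    public class NaturalKeyPartitioner implements Partitioner<CompositeKey, TimeSeriesData> {
        @Override
        public int getPartition(CompositeKey key, TimeSeriesData value, int numPartitions) {
            return Math.abs(key.getName().hashCode() % numPartitions);
        }
        @Override
        public void configure(JobConf job) { }
    }

    // Order composite keys by name, then timestamp, so each reducer
    // sees a stock's values in time order.
    public class CompositeKeyComparator extends WritableComparator {
        protected CompositeKeyComparator() { super(CompositeKey.class, true); }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            CompositeKey k1 = (CompositeKey) a, k2 = (CompositeKey) b;
            int cmp = k1.getName().compareTo(k2.getName());
            return (cmp != 0) ? cmp : Long.compare(k1.getTimestamp(), k2.getTimestamp());
        }
    }

    // Group by the natural key only, so one reduce() call receives
    // every timestamped value for a given stock name.
    public class NaturalKeyGroupingComparator extends WritableComparator {
        protected NaturalKeyGroupingComparator() { super(CompositeKey.class, true); }
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return ((CompositeKey) a).getName().compareTo(((CompositeKey) b).getName());
        }
    }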
    
    

    // MovingAverage computes the moving average over an array (circular buffer);
    // fields and constructor as in SimpleMovingAverageUsingArray above
    public class MovingAverage {
        private double sum = 0;
        private int period;
        private double[] window;
        private int pointer = 0;
        private int size = 0;

        public MovingAverage(int period) {
            this.period = period;
            window = new double[period];
        }

        public void addNewNumber(double number) {
            sum += number;
            if (size < period) {
                window[pointer++] = number;
                size++;
            }
            else {
                // size = period (size cannot be > period)
                pointer = pointer % period;
                sum -= window[pointer];
                window[pointer++] = number;
            }
        }

        public double getMovingAverage() {
            if (size == 0) {
                throw new IllegalArgumentException("average is undefined");
            }
            return sum / size;
        }
    }
    
    

    // map() function
    @Override
    public void map(LongWritable inkey, Text value,
            OutputCollector<CompositeKey, TimeSeriesData> output,
            Reporter reporter) throws IOException {
        String record = value.toString();
        if ((record == null) || (record.length() == 0)) {
            return;
        }
        String[] tokens = StringUtils.split(record, ",");
        if (tokens.length == 3) {
            // tokens[0] = name of timeseries as string
            // tokens[1] = timestamp
            // tokens[2] = value of timeseries as double
            Date date = DateUtil.getDate(tokens[1]);
            if (date == null) {
                return;
            }
            long timestamp = date.getTime();
            // reducerKey is (name, timestamp)
            // reducerValue is (timestamp, value)
            reducerKey.set(tokens[0], timestamp);
            reducerValue.set(timestamp, Double.parseDouble(tokens[2]));
            // emit key-value pair
            output.collect(reducerKey, reducerValue);
        }
        else {
            // log as error, not enough tokens
        }
    }
    
    // reduce() function
    @Override
    public void reduce(CompositeKey key,
            Iterator<TimeSeriesData> values,
            OutputCollector<Text, Text> output,
            Reporter reporter)
            throws IOException {

        // note that values are already sorted by the shuffle;
        // apply the moving average algorithm to the sorted timeseries
        Text outputKey = new Text();
        Text outputValue = new Text();
        // this.windowSize is initialized elsewhere in the reducer (not shown)
        MovingAverage ma = new MovingAverage(this.windowSize);
        while (values.hasNext()) {
            TimeSeriesData data = values.next();
            ma.addNewNumber(data.getValue());
            // getMovingAverage() returns the average of the current window
            double movingAverage = ma.getMovingAverage();
            long timestamp = data.getTimestamp();
            String dateAsString = DateUtil.getDateAsString(timestamp);
            outputValue.set(dateAsString + "," + movingAverage);
            outputKey.set(key.getName());
            output.collect(outputKey, outputValue);
        }
    }
    
    

    Traditional in-memory Scala implementation

    // traditional Scala version with in-memory sorting
    def main(args: Array[String]): Unit = {
      if (args.size < 3) {
        println("Usage: MovingAverageInMemory <window> <input-dir> <output-dir>")
        sys.exit(1)
      }
      // define a SparkConf object
      val sparkConf = new SparkConf().setAppName("MovingAverageInMemory")
      val sc = new SparkContext(sparkConf)

      val window = args(0).toInt
      val input = args(1)
      val output = args(2)
      // make the window size available to all executors as a broadcast variable
      val broadcastWindow = sc.broadcast(window)

      val rawData = sc.textFile(input)
      val keyValue = rawData.map(line => {
        val tokens = line.split(",")
        (tokens(0), (tokens(1), tokens(2).toDouble))
      })

      // group by key; keys are stock symbols such as IBM, GOOG, AAPL, etc.
      val groupByStockSymbol = keyValue.groupByKey()

      val result = groupByStockSymbol.mapValues(values => {
        val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
        // sort in memory; mapValues only touches the values,
        // where s._1 is the date and s._2 the price
        val sortedValues = values.map(s => (dateFormat.parse(s._1).getTime.toLong, s._2)).toSeq.sortBy(_._1)
        val queue = new scala.collection.mutable.Queue[Double]()
        // push each value onto the queue, evicting the oldest
        // once the queue exceeds the window size
        for (tup <- sortedValues) yield {
          queue.enqueue(tup._2)
          if (queue.size > broadcastWindow.value)
            queue.dequeue

          (dateFormat.format(new java.util.Date(tup._1)), (queue.sum / queue.size))
        }
      })

      // format the final result as
      // <stock_symbol><,><date><,><moving_average>
      val formattedResult = result.flatMap(kv => {
        kv._2.map(v => (kv._1 + "," + v._1 + "," + v._2.toString()))
      })
      formattedResult.saveAsTextFile(output)

      // done
      sc.stop()
    }
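
    Because groupByKey() pulls every value for a symbol onto one executor before mapValues sorts it, each symbol's full history must fit in memory; that is the sense in which this version is "in-memory". A typical cluster invocation might look like the following (the jar name and the window size of 5 are assumptions for illustration):

    spark-submit --class MovingAverageInMemory --master yarn \
      moving-average.jar 5 /input/stocks /output/moving-avg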
    
    

    Scala implementation with partitioning and grouping (secondary sort)

    // Scala version based on secondary sort
    def main(args: Array[String]): Unit = {
      if (args.size < 4) {
        println("Usage: MovingAverage <window> <number-of-partitions> <input-dir> <output-dir>")
        sys.exit(1)
      }

      val sparkConf = new SparkConf().setAppName("MovingAverage")
      val sc = new SparkContext(sparkConf)

      val window = args(0).toInt
      val numPartitions = args(1).toInt // number of partitions in secondary sorting, choose a high value
      val input = args(2)
      val output = args(3)

      val broadcastWindow = sc.broadcast(window)

      val rawData = sc.textFile(input)

      // the key contains part of the value (the closing date in this case)
      val valueToKey = rawData.map(line => {
        val tokens = line.split(",")
        val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
        val timestamp = dateFormat.parse(tokens(1)).getTime
        (CompositeKey(tokens(0), timestamp), TimeSeriesData(timestamp, tokens(2).toDouble))
      })

      // secondary sort via partitioning then grouping: CompositeKeyPartitioner
      // assigns partitions, and the implicit Ordering on CompositeKey
      // sorts within each partition
      val sortedData = valueToKey.repartitionAndSortWithinPartitions(new CompositeKeyPartitioner(numPartitions))
      // reduce the key to the stock symbol, keeping TimeSeriesData(timestamp, price) as the value
      val keyValue = sortedData.map(k => (k._1.stockSymbol, k._2))
      val groupByStockSymbol = keyValue.groupByKey()

      // operate on the values; unlike the in-memory version,
      // no explicit sort is needed here
      val movingAverage = groupByStockSymbol.mapValues(values => {
        val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
        val queue = new scala.collection.mutable.Queue[Double]()
        for (ts <- values) yield {
          queue.enqueue(ts.closingStockPrice)
          if (queue.size > broadcastWindow.value)
            queue.dequeue

          (dateFormat.format(new java.util.Date(ts.timeStamp)), (queue.sum / queue.size))
        }
      })

      // output will be in CSV format
      // <stock_symbol><,><date><,><moving_average>
      val formattedResult = movingAverage.flatMap(kv => {
        kv._2.map(v => (kv._1 + "," + v._1 + "," + v._2.toString()))
      })
      formattedResult.saveAsTextFile(output)

      // done
      sc.stop()
    }
    
    }
    
    // two case classes acting as simple value beans
    case class CompositeKey(stockSymbol: String, timeStamp: Long)
    case class TimeSeriesData(timeStamp: Long, closingStockPrice: Double)

    // ordering for CompositeKey: sort by symbol, then by timestamp
    // (this is the within-partition sort used by repartitionAndSortWithinPartitions)
    object CompositeKey {
      implicit def ordering[A <: CompositeKey]: Ordering[A] = {
        Ordering.by(fk => (fk.stockSymbol, fk.timeStamp))
      }
    }
    
    
    //---------------------------------------------------------
    // the following class defines a custom partitioner by
    // extending abstract class org.apache.spark.Partitioner
    //---------------------------------------------------------
    import org.apache.spark.Partitioner
    
    class CompositeKeyPartitioner(partitions: Int) extends Partitioner {
      require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")
    
      def numPartitions: Int = partitions
      // getPartition maps a key to the partition it belongs to
      def getPartition(key: Any): Int = key match {
        case k: CompositeKey => math.abs(k.stockSymbol.hashCode % numPartitions)
        case null            => 0
        case _               => math.abs(key.hashCode % numPartitions)
      }
    
      override def equals(other: Any): Boolean = other match {
        case h: CompositeKeyPartitioner => h.numPartitions == numPartitions
        case _                          => false
      }
    
      override def hashCode: Int = numPartitions
    }
    
    
    
