spark-sortBy算子

作者: 小草莓子桑 | 来源:发表于2021-04-22 01:30 被阅读0次

    上一篇已经简单说过了map等几个算子。今天,我们来了解一下sortBy算子,他们也是Transformation算子

    模拟一个场景,来讲各个Transformation的API

    在这里我们模拟一个统计网站页面浏览情况的需求。

    业务场景

    我们这里使用网站浏览日志作为业务场景,就用简单的三个字段页面埋点id、用户id、停留时长


    业务场景
    日志格式

    假定网站浏览日志以HDFS形式通过我们的系统埋点到我们数据仓库中了,埋点日志的数据格式如下:


    日志格式

    假定我们代码中已经使用这样的格式,把日志记录到服务器上,并通过flume上传到了HDFS服务器上

    页面id|用户id|停留时长
    index|2|6
    表示:用户id为2的用户访问了index页面,并停留了6秒钟
    

    这次,我们包装一个实体类,BrowserLogInfo,字段分别为url(用户访问页面),userId(用户id),time(停留时长),来映射我们日志中的三个字段,并通过map算子把日志文档读取写入到一个BrowserLogInfo对象的rdd中,直接上代码

    • 实体类BrowserLogInfo代码
    • 这里注意java定义的实体类上需要实现scala.Serializable接口,否则会报object not serializable异常
    - object not serializable (class: com.edu.spark.entity.BrowserLogInfo, value: BrowserLogInfo{url='index', userId=1, time=3})
        - element of array (index: 0)
        - array (class [Lcom.edu.spark.entity.BrowserLogInfo;, size 18)
    

    如图所示:


    object not serializable异常
    • 实体类BrowserLogInfo代码
    package com.edu.spark.entity;
    
    import scala.Serializable;
    
    /**
     * 用户浏览日志实体
     * @author xiaocaomei
     * @date 2021/4/22
     * @description
     */
    public class BrowserLogInfo implements Serializable {
    
        /**
         * 访问url
         */
        private String url;
        /**
         * 用户id
         */
        private long userId;
        /**
         * 浏览时长
         */
        private long time;
    
        public String getUrl() {
            return url;
        }
    
        public void setUrl(String url) {
            this.url = url;
        }
    
        public long getUserId() {
            return userId;
        }
    
        public void setUserId(long userId) {
            this.userId = userId;
        }
    
        public long getTime() {
            return time;
        }
    
        public void setTime(long time) {
            this.time = time;
        }
    
        @Override
        public String toString() {
            return "BrowserLogInfo{" +
                    "url='" + url + '\'' +
                    ", userId=" + userId +
                    ", time=" + time +
                    '}';
        }
    }
    
    • map算子转化代码代码
    object LogProcess {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
    
        /**
          * 如果这个参数不设置,默认认为你运行的是集群模式
          * 如果设置成local代表运行的是local模式
          */
        conf.setMaster("local")
    
        //设置任务名
        conf.setAppName("EduSpark")
        //创建SparkCore的程序入口
        val sc = new SparkContext(conf)
        //读取文件 生成RDD
        val file : RDD[String] = sc.textFile("F:\\hdfs\\hello.log")
        println(file.collect().toBuffer)
    
        //把每一行数据按照 | 分割
        val traceRdd : RDD[BrowserLogInfo] = file.map(s => {
          //把每一行数据按照 | 分割
          //注意这里 | 需要使用转义字符
          var sArray: Array[String] = s.split("\\|")
          //split函数的作用是 通过|分隔字符串返回数组
          val info = new BrowserLogInfo
          //  sArray 数组为 url | userId | time
          info.setUrl(sArray(0))
          // string.toLong 是用来把字符串转化成long
          info.setUserId(sArray(1).toLong)
          info.setTime(sArray(2).toLong)
          println(info)
          info
        })
    
        println(traceRdd.collect().toBuffer)
        sc.stop()
      }
    }
    
    • 结果如下:
    • 每一项遍历的对象


      每一项对象
    • 整个list结果


      整个list结果

    下面接入正题,来看sortBy算子

    • sortBy就是用来对数据进行排序的算子
    • 先直接上代码,来看使用方式吧
    • 代码还是接上段,把日志读取出来后,转成了一个BrowserLogInfo对象的rdd,现在使用sortBy算子,对已经读取转成BrowserLogInfo对象的数据,按照time字段,用户浏览时长进行升序排序
    直接上代码举栗子
    object LogProcess {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
    
        /**
          * 如果这个参数不设置,默认认为你运行的是集群模式
          * 如果设置成local代表运行的是local模式
          */
        conf.setMaster("local")
    
        //设置任务名
        conf.setAppName("EduSpark")
        //创建SparkCore的程序入口
        val sc = new SparkContext(conf)
        //读取文件 生成RDD
        val file : RDD[String] = sc.textFile("F:\\hdfs\\hello.log")
        println(file.collect().toBuffer)
    
        //把每一行数据按照 | 分割
        val traceRdd : RDD[BrowserLogInfo] = file.map(s => {
          //把每一行数据按照 | 分割
          //注意这里 | 需要使用转义字符
          var sArray: Array[String] = s.split("\\|")
          //split函数的作用是 通过|分隔字符串返回数组
          val info = new BrowserLogInfo
          //  sArray 数组为 url | userId | time
          info.setUrl(sArray(0))
          // string.toLong 是用来把字符串转化成long
          info.setUserId(sArray(1).toLong)
          info.setTime(sArray(2).toLong)
          println(info)
          info
        })
    
        println(traceRdd.collect().toBuffer)
    
        //按照对象BrowserLogInfo对象中的time字段进行升序排序
        //按照用户浏览时长 来升序 排序 浏览数据
        //第一个参数:函数,这里使用getTime获取time字段
        //第二个参数:ascending = true 升序,false降序
        val sortRdd = traceRdd.sortBy(x => x.getTime,ascending = true)
    
        println(sortRdd.collect().toBuffer)
        sc.stop()
      }
    }
    
    • 结果如下:


      运行结果
    • 整个数据按照用户浏览量time字段进行了升序排序
    sortBy使用方式

    使用代码如下:

     //按照对象BrowserLogInfo对象中的time字段进行升序排序
        //按照用户浏览时长 来升序 排序 浏览数据
        //第一个参数:函数,这里使用getTime获取time字段
        //第二个参数:ascending = true 升序,false降序
        val sortRdd = traceRdd.sortBy(x => x.getTime,ascending = true)
    
    • 算子的用法为:traceRdd.sortBy(x => x.getTime,ascending = true)
    • 第一个参数为函数,这里就简单使用了getTime获取time字段
    • 第二个参数为升序降序方式,ascending = true 升序,false降序

    spark中的sortBy算子就简单给大家说到这里,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

    相关文章

      网友评论

        本文标题:spark-sortBy算子

        本文链接:https://www.haomeiwen.com/subject/gqwuwhtx.html