User Access Traffic Statistics with Spark (Java Edition)

Author: _Kantin | Published 2017-12-11 17:46

Platform: IDEA 2016
Language: Java
Code link: https://pan.baidu.com/s/1dEBquiP

Generating the data

The data is synthetic, generated by Java code. Each record consists of four tab-separated fields: a timestamp, a device ID (UUID), upstream traffic, and downstream traffic. The key code follows.
  import java.io.FileNotFoundException;
  import java.io.FileOutputStream;
  import java.io.OutputStreamWriter;
  import java.io.PrintWriter;
  import java.util.ArrayList;
  import java.util.Calendar;
  import java.util.Date;
  import java.util.List;
  import java.util.Random;
  import java.util.UUID;

  public static void main(String[] args) {
      StringBuffer sb = new StringBuffer();
      Random rand = new Random();
      // Build a pool of 100 device IDs
      List<String> device = new ArrayList<String>();
      for (int i = 0; i < 100; i++) {
          device.add(getUUID());
      }
      // Generate 1000 log records
      for (int j = 0; j < 1000; j++) {
          // Random timestamp within the last 6000 minutes
          Calendar cal = Calendar.getInstance();
          cal.setTime(new Date());
          cal.add(Calendar.MINUTE, -rand.nextInt(6000));
          long timeStamp = cal.getTimeInMillis();
          String deviceId = device.get(rand.nextInt(100));

          long upTraffic = rand.nextInt(100000);
          long downTraffic = rand.nextInt(10000);
          sb.append(timeStamp).append("\t").append(deviceId).append("\t")
            .append(upTraffic).append("\t").append(downTraffic).append("\n");
      }
      PrintWriter pw = null;
      try {
          pw = new PrintWriter(new OutputStreamWriter(new FileOutputStream("d:\\app_log.txt")));
          pw.write(sb.toString());
      } catch (FileNotFoundException e) {
          e.printStackTrace();
      } finally {
          if (pw != null) {
              pw.close();
          }
      }
  }

  public static String getUUID() {
      return UUID.randomUUID().toString().replace("-", "");
  }
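
A generated record therefore looks like this (one line per record, tab-separated; the values below are made up for illustration):

  1512981234567	3f2a9c0d4e5b46f8a1b2c3d4e5f60718	48213	5124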

Setting up Spark and reading the log generated on drive D

  // 1. Create the Spark configuration and context objects
      SparkConf conf = new SparkConf().setAppName("sparkTest").setMaster("local");
      JavaSparkContext sc = new JavaSparkContext(conf);

      // 2. Read the log file into an RDD using SparkContext's textFile() method
      JavaRDD<String> javaRDD = sc.textFile("D:\\app_log.txt");
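
For reference, this assumes a Spark Core dependency on the classpath. A minimal Maven sketch (the Scala suffix and version are my assumptions; the original post does not specify them):

  <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.2.0</version>
  </dependency>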

Mapping the log RDD into key-value form

 private static JavaPairRDD<String, AccessLogInfo> mapAccessLogRDD2Pair(JavaRDD<String> javaRDD) {
      // In PairFunction, the first String is the input element type;
      // the last two type parameters are the key and value of the returned pair
      return javaRDD.mapToPair(new PairFunction<String, String, AccessLogInfo>() {

          private static final long serialVersionUID = 1L;

          // Process the log line by line
          @Override
          public Tuple2<String, AccessLogInfo> call(String line) throws Exception {
              // Split on \t
              String[] accessLogSplited = line.split("\t");
              // Extract the four fields
              long timestamp = Long.valueOf(accessLogSplited[0]);
              String deviceID = accessLogSplited[1];
              long upTraffic = Long.valueOf(accessLogSplited[2]);
              long downTraffic = Long.valueOf(accessLogSplited[3]);
              // Wrap the timestamp, upstream traffic and downstream traffic
              // in a custom serializable object
              AccessLogInfo accessLogInfo = new AccessLogInfo(timestamp, upTraffic, downTraffic);
              return new Tuple2<String, AccessLogInfo>(deviceID, accessLogInfo);
          }
      });
}
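
The AccessLogInfo class itself is not shown in the post. A minimal sketch consistent with how it is used (three long fields, getters, and Serializable so Spark can ship it between tasks) might look like this:

  import java.io.Serializable;

  // Hypothetical reconstruction of the value class used above
  public class AccessLogInfo implements Serializable {
      private static final long serialVersionUID = 1L;

      private long timestamp;    // earliest access time
      private long upTraffic;    // upstream traffic
      private long downTraffic;  // downstream traffic

      public AccessLogInfo(long timestamp, long upTraffic, long downTraffic) {
          this.timestamp = timestamp;
          this.upTraffic = upTraffic;
          this.downTraffic = downTraffic;
      }

      public long getTimestamp() { return timestamp; }
      public long getUpTraffic() { return upTraffic; }
      public long getDownTraffic() { return downTraffic; }
  }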

Aggregating by deviceID: summing the upstream and downstream traffic and keeping the earliest access time. For example, (timestamp=100, up=10, down=5) and (timestamp=90, up=20, down=1) reduce to (timestamp=90, up=30, down=6).

 private static JavaPairRDD<String, AccessLogInfo> aggregateByDeviceId(JavaPairRDD<String, AccessLogInfo> accessLogPairRdd) {
        // In Function2, the first two AccessLogInfo type parameters are call()'s
        // two inputs; the third is the return type
        return accessLogPairRdd.reduceByKey(new Function2<AccessLogInfo, AccessLogInfo, AccessLogInfo>() {
            private static final long serialVersionUID = 1L;

            @Override
            public AccessLogInfo call(AccessLogInfo accessLogInfo1, AccessLogInfo accessLogInfo2) throws Exception {
                // Keep the earlier timestamp and sum the traffic
                long timestamp = Math.min(accessLogInfo1.getTimestamp(), accessLogInfo2.getTimestamp());
                long upTraffic = accessLogInfo1.getUpTraffic() + accessLogInfo2.getUpTraffic();
                long downTraffic = accessLogInfo1.getDownTraffic() + accessLogInfo2.getDownTraffic();
                // Each reduction produces a new AccessLogInfo
                return new AccessLogInfo(timestamp, upTraffic, downTraffic);
            }
        });
    }

Secondary sort on the key: first by upstream traffic, then by downstream traffic, and by timestamp if both are equal. This requires overriding the comparison methods of Ordered<AccessLogSortKey>.

  @Override
    public int compare(AccessLogSortKey other) {
        // Long.compare avoids the overflow risk of casting a long difference to int
        if (upTraffic != other.upTraffic) {
            return Long.compare(upTraffic, other.upTraffic);
        } else if (downTraffic != other.downTraffic) {
            return Long.compare(downTraffic, other.downTraffic);
        } else if (timestamp != other.timestamp) {
            return Long.compare(timestamp, other.timestamp);
        }
        return 0;
    }

    @Override
    public int compareTo(AccessLogSortKey other) {
        // Same ordering as compare(), as in Scala's Ordered trait
        return compare(other);
    }

    @Override
    public boolean $less(AccessLogSortKey other) {
        return compare(other) < 0;
    }

    // The greater-than method
    @Override
    public boolean $greater(AccessLogSortKey other) {
        return compare(other) > 0;
    }

    @Override
    public boolean $less$eq(AccessLogSortKey other) {
        return compare(other) <= 0;
    }

    @Override
    public boolean $greater$eq(AccessLogSortKey other) {
        return compare(other) >= 0;
    }
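
The rest of the AccessLogSortKey class is not shown in the post. A plausible skeleton around the methods above (fields, the constructor and getters used later in main, plus Serializable since the key travels through a shuffle):

  import java.io.Serializable;
  import scala.math.Ordered;

  // Hypothetical reconstruction of the sort-key class;
  // the comparison methods shown above go inside this class body
  public class AccessLogSortKey implements Ordered<AccessLogSortKey>, Serializable {
      private static final long serialVersionUID = 1L;

      private long timestamp;
      private long upTraffic;
      private long downTraffic;

      public AccessLogSortKey(long timestamp, long upTraffic, long downTraffic) {
          this.timestamp = timestamp;
          this.upTraffic = upTraffic;
          this.downTraffic = downTraffic;
      }

      public long getTimestamp() { return timestamp; }
      public long getUpTraffic() { return upTraffic; }
      public long getDownTraffic() { return downTraffic; }
  }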

Mapping the RDD's key to the secondary-sort key; because AccessLogSortKey implements Ordered, sortByKey can order it automatically.

 private static JavaPairRDD<AccessLogSortKey, String> mapRDDkey2SortKey(JavaPairRDD<String, AccessLogInfo> aggregateLogPairRDD) {
        // The last two type parameters are the key and value of the returned pair
      return aggregateLogPairRDD.mapToPair(new PairFunction<Tuple2<String, AccessLogInfo>, AccessLogSortKey, String>() {
          private static final long serialVersionUID = 1L;

            // The input tuple's key is the deviceID, its value the AccessLogInfo
            @Override
            public Tuple2<AccessLogSortKey, String> call(Tuple2<String, AccessLogInfo> tuple) throws Exception {
                String deviceID = tuple._1;
                AccessLogInfo accessLogInfo = tuple._2;
                AccessLogSortKey accessLogSortKey = new AccessLogSortKey(
                        accessLogInfo.getTimestamp(),
                        accessLogInfo.getUpTraffic(),
                        accessLogInfo.getDownTraffic());
                // Build a new tuple whose key is now the secondary-sort key
                return new Tuple2<AccessLogSortKey, String>(accessLogSortKey, deviceID);
            }
        });
    }

The main method

   public static void main(String[] args) throws SQLException {
        // 1. Create the Spark configuration and context objects
        SparkConf conf = new SparkConf().setAppName("sparkTest").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 2. Read the log file into an RDD using SparkContext's textFile() method
        JavaRDD<String> javaRDD = sc.textFile("D:\\app_log.txt");

        // Map the RDD into key-value form to prepare for the reduceByKey aggregation
        JavaPairRDD<String, AccessLogInfo> accessLogPairRdd = mapAccessLogRDD2Pair(javaRDD);
        // Aggregate by deviceID
        JavaPairRDD<String, AccessLogInfo> aggregateLogPairRDD = aggregateByDeviceId(accessLogPairRdd);

        // Remap: the key becomes the secondary-sort key, the value becomes the deviceID
        JavaPairRDD<AccessLogSortKey, String> accessLogSortRDD = mapRDDkey2SortKey(aggregateLogPairRDD);
        // Sort in descending order
        JavaPairRDD<AccessLogSortKey, String> sortedAccessLogRDD = accessLogSortRDD.sortByKey(false);
        // Take the top 10
        List<Tuple2<AccessLogSortKey, String>> top10DataList = sortedAccessLogRDD.take(10);
        // Create the DBHelper object
        DBHelper db1 = new DBHelper();

        String sql = "insert into spark(deviceId,upTraffic,downTraffic,timeStamp) values(?,?,?,?)";

        // Persist the top 10; prepare the statement once and reuse it
        PreparedStatement pt = db1.conn.prepareStatement(sql);
        for (Tuple2<AccessLogSortKey, String> data : top10DataList) {
            // System.out.println(data._2 + " " + data._1.getUpTraffic());
            pt.setString(1, data._2);
            pt.setString(2, data._1.getUpTraffic() + "");
            pt.setString(3, data._1.getDownTraffic() + "");
            pt.setString(4, data._1.getTimestamp() + "");
            // Remember to actually execute the statement
            pt.executeUpdate();
        }
        pt.close();
        db1.close();

        // Close the Spark context
        sc.close();
    }

Connecting to the database over JDBC and storing the results in MySQL; the PreparedStatement is executed in AppLogSpark.class.

  /**
   * CREATE TABLE spark(
   *     id INT(4) NOT NULL AUTO_INCREMENT PRIMARY KEY,
   *     deviceId VARCHAR(255),
   *     upTraffic VARCHAR(255),
   *     downTraffic VARCHAR(255),
   *     timeStamp VARCHAR(255)
   * );
   */
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBHelper {
    public static final String url = "jdbc:mysql://localhost:3306/test";
    public static final String driver = "com.mysql.jdbc.Driver";
    public static final String user = "root";
    public static final String password = "*****";

    // The database connection
    public Connection conn = null;

    public DBHelper() {
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url, user, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void close() {
        try {
            this.conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
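
A side note on the design: opening the connection in the constructor keeps the demo short, but callers must remember to call close(). A sketch of the same insert loop from main using try-with-resources for the statement (my variant, not from the original post):

  // Hypothetical variant: try-with-resources closes the statement automatically
  try (PreparedStatement pt = db1.conn.prepareStatement(sql)) {
      for (Tuple2<AccessLogSortKey, String> data : top10DataList) {
          pt.setString(1, data._2);
          pt.setString(2, data._1.getUpTraffic() + "");
          pt.setString(3, data._1.getDownTraffic() + "");
          pt.setString(4, data._1.getTimestamp() + "");
          pt.executeUpdate();
      }
  }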

Final result: the top-10 rows as inserted into the MySQL spark table. (screenshot omitted)
