美文网首页
Spark日志分析案例

Spark日志分析案例

作者: hipeer | 来源:发表于2018-11-01 14:03 被阅读0次

    通过日志分析每个移动设备对服务器的访问的总上行流量,下行流量。然后先根据上行流量倒排序,如果相等就根据下行流量倒排序,如果上行流量和下行流量都相等,就根据时间戳排序。

    AppAccessLog.java

    该类是整个Spark应用的主类,这里面主要写业务逻辑代码。包含计算总上行和下行流量,RDD的一些操作等。

    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import scala.Tuple2;
    import scala.Tuple4;
    
    /**
     * Spark driver that computes, per device, the total upstream and downstream
     * traffic recorded in an access log, ranks the devices, prints the top ten
     * and stores the complete ranking in the Hive table {@code test.log}.
     *
     * <p>Every input record is one tab-separated line laid out as
     * {@code timestamp, deviceId, upTraffic, downTraffic}.
     */
    public class AppAccessLog {

        /** Number of tab-separated fields every log record must provide. */
        private static final int EXPECTED_FIELD_COUNT = 4;

        /**
         * Runs the whole pipeline: read, aggregate per device, sort, print the
         * top ten and persist the ranking to Hive.
         *
         * @param args command line arguments (not used)
         */
        public static void main(String[] args) {

            SparkConf conf = new SparkConf().setAppName("AppAccessLog");

            // try-with-resources releases both handles even when a job fails.
            // Resources are closed in reverse order: the session first, then the context.
            try (JavaSparkContext sc = new JavaSparkContext(conf);
                    SparkSession spark = SparkSession.builder().enableHiveSupport().getOrCreate()) {

                // create RDD
                JavaRDD<String> AppAccessLogRDD = sc.textFile("hdfs:///temp/data/access.log");

                // transform into PairRDD keyed by DeviceID
                JavaPairRDD<String, AppAccessLogInfo> AppAccessLogPairRDD =
                        mapToPairRDD(AppAccessLogRDD);

                // aggregate per DeviceID
                JavaPairRDD<String, AppAccessLogInfo> AppAccessAggregatePairRDD =
                        agggregateToPairRDD(AppAccessLogPairRDD);

                // swap key and value so that the sortable object becomes the key
                JavaPairRDD<AppAccessLogSortInfo, String> AppAccessSortByKeyLogPairRDD =
                        mapToSortByKeyPairRDD(AppAccessAggregatePairRDD);

                // Descending sort. The result feeds two actions (take and the Hive
                // write), so it is cached to avoid recomputing the lineage twice.
                JavaPairRDD<AppAccessLogSortInfo, String> resultRDD =
                        AppAccessSortByKeyLogPairRDD.sortByKey(false).cache();

                // get Top 10
                List<Tuple2<AppAccessLogSortInfo, String>> top10 = resultRDD.take(10);

                // print Top 10
                for (Tuple2<AppAccessLogSortInfo, String> t : top10) {
                    System.out.println(t._2+" " + t._1.getUpTraffic() + " " + t._1.getDownTraffic() + " " + t._1.getTimpStamp());
                }

                // create RowRDD
                JavaRDD<Row> rowRDD = mapToRowRDD(resultRDD);
                // create schema
                StructType schema = DataTypes.createStructType(getColumnName());
                // create DataFrame
                Dataset<Row> logDF = spark.createDataFrame(rowRDD, schema);

                // save to Hive
                logDF.write().mode("overwrite").saveAsTable("test.log");
            }
        }

        /**
         * Builds the column definitions of the Hive table. The order must match
         * the value order produced by {@link #mapToRowRDD(JavaPairRDD)}.
         *
         * <p>NOTE(review): the first column is spelled {@code timeStame}; it is
         * left untouched because renaming it would change the table schema.
         *
         * @return the four nullable string columns of the result table
         */
        private static List<StructField> getColumnName() {
            List<StructField> fields = new ArrayList<StructField>();
            fields.add(DataTypes.createStructField("timeStame", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("DeviceID", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("upTraffic", DataTypes.StringType, true));
            fields.add(DataTypes.createStructField("downTraffic", DataTypes.StringType, true));
            return fields;
        }

        /**
         * Converts the sorted pairs into rows of four strings:
         * timestamp, device id, upstream traffic, downstream traffic.
         *
         * @param resultRDD sorted (sort key, device id) pairs
         * @return one {@link Row} per device
         */
        private static JavaRDD<Row> mapToRowRDD(JavaPairRDD<AppAccessLogSortInfo, String> resultRDD) {

            // A single map is enough; no intermediate tuple RDD is needed.
            return resultRDD.map(new Function<Tuple2<AppAccessLogSortInfo, String>, Row>() {

                private static final long serialVersionUID = -1227536252899303985L;

                @Override
                public Row call(Tuple2<AppAccessLogSortInfo, String> tuple) throws Exception {

                    AppAccessLogSortInfo accessLogSortInfo = tuple._1;
                    String DeviceID = tuple._2;

                    return RowFactory.create(
                            String.valueOf(accessLogSortInfo.getTimpStamp()),
                            DeviceID,
                            String.valueOf(accessLogSortInfo.getUpTraffic()),
                            String.valueOf(accessLogSortInfo.getDownTraffic()));
                }
            });
        }

        /**
         * Reduces all records of one device into a single record holding the
         * summed upstream traffic, the summed downstream traffic and the
         * smallest timestamp.
         *
         * @param appAccessLogPairRDD (device id, single log record) pairs
         * @return (device id, aggregated record) pairs
         */
        private static JavaPairRDD<String, AppAccessLogInfo> agggregateToPairRDD(
                JavaPairRDD<String, AppAccessLogInfo> appAccessLogPairRDD) {

            // transformation reduceByKey
            return appAccessLogPairRDD.reduceByKey(new Function2<AppAccessLogInfo, AppAccessLogInfo, AppAccessLogInfo>() {

                private static final long serialVersionUID = -8552789221394152834L;

                @Override
                public AppAccessLogInfo call(AppAccessLogInfo v1, AppAccessLogInfo v2) throws Exception {

                    AppAccessLogInfo merged = new AppAccessLogInfo();
                    merged.setUpTraffic(v1.getUpTraffic() + v2.getUpTraffic());
                    merged.setDownTraffic(v1.getDownTraffic() + v2.getDownTraffic());
                    // keep the earliest access time of the device
                    merged.setTimeStamp(Math.min(v1.getTimeStamp(), v2.getTimeStamp()));

                    return merged;
                }
            });
        }

        /**
         * Re-keys the aggregated pairs so that the comparable
         * {@link AppAccessLogSortInfo} becomes the key and the device id the
         * value, which is the shape {@code sortByKey} needs.
         *
         * @param AppAccessAggregatePairRDD (device id, aggregated record) pairs
         * @return (sort key, device id) pairs
         */
        private static JavaPairRDD<AppAccessLogSortInfo, String> mapToSortByKeyPairRDD(
                JavaPairRDD<String, AppAccessLogInfo> AppAccessAggregatePairRDD) {

            // transformation mapToPair
            return AppAccessAggregatePairRDD
                    .mapToPair(new PairFunction<Tuple2<String, AppAccessLogInfo>, AppAccessLogSortInfo, String>() {

                        private static final long serialVersionUID = -4778843695438540948L;

                        @Override
                        public Tuple2<AppAccessLogSortInfo, String> call(Tuple2<String, AppAccessLogInfo> tuple)
                                throws Exception {

                            String DeviceID = tuple._1;
                            AppAccessLogInfo appAccessLogInfo = tuple._2;

                            AppAccessLogSortInfo accessLogSortInfo = new AppAccessLogSortInfo();
                            accessLogSortInfo.setTimpStamp(appAccessLogInfo.getTimeStamp());
                            accessLogSortInfo.setUpTraffic(appAccessLogInfo.getUpTraffic());
                            accessLogSortInfo.setDownTraffic(appAccessLogInfo.getDownTraffic());

                            return new Tuple2<AppAccessLogSortInfo, String>(accessLogSortInfo, DeviceID);
                        }
                    });
        }

        /**
         * Parses the raw log lines into (device id, record) pairs.
         *
         * <p>Blank lines are skipped. A non-blank line with fewer than
         * {@value #EXPECTED_FIELD_COUNT} tab-separated fields makes the task fail
         * with an {@link IllegalArgumentException} that quotes the offending
         * line; a non-numeric value fails with {@link NumberFormatException}.
         *
         * @param AppAccessLogRDD raw lines of the access log
         * @return (device id, single log record) pairs
         */
        private static JavaPairRDD<String, AppAccessLogInfo> mapToPairRDD(JavaRDD<String> AppAccessLogRDD) {

            // Blank lines carry no record; drop them instead of failing the whole job.
            JavaRDD<String> recordLines = AppAccessLogRDD.filter(new Function<String, Boolean>() {

                private static final long serialVersionUID = 3141592653589793238L;

                @Override
                public Boolean call(String line) throws Exception {
                    return !line.trim().isEmpty();
                }
            });

            // transformation mapToPair
            return recordLines.mapToPair(new PairFunction<String, String, AppAccessLogInfo>() {

                private static final long serialVersionUID = 5998646612001714125L;

                @Override
                public Tuple2<String, AppAccessLogInfo> call(String line) throws Exception {

                    String[] lineSplitArray = line.split("\t");
                    if (lineSplitArray.length < EXPECTED_FIELD_COUNT) {
                        throw new IllegalArgumentException("Malformed access log record: expected at least "
                                + EXPECTED_FIELD_COUNT + " tab-separated fields but found "
                                + lineSplitArray.length + " in line: " + line);
                    }

                    String DeviceID = lineSplitArray[1];
                    // trim() tolerates stray whitespace such as a trailing '\r' from CRLF files
                    Long timeStamp = Long.valueOf(lineSplitArray[0].trim());
                    Long upTraffic = Long.valueOf(lineSplitArray[2].trim());
                    Long downTraffic = Long.valueOf(lineSplitArray[3].trim());

                    AppAccessLogInfo appAccessLogInfo = new AppAccessLogInfo();
                    appAccessLogInfo.setTimeStamp(timeStamp);
                    appAccessLogInfo.setUpTraffic(upTraffic);
                    appAccessLogInfo.setDownTraffic(downTraffic);

                    return new Tuple2<String, AppAccessLogInfo>(DeviceID, appAccessLogInfo);
                }
            });
        }
    }
    
    

    这个类中除了main方法以外,还有三个比较重要的方法:mapToPairRDD、agggregateToPairRDD、mapToSortByKeyPairRDD。

    1. 从main方法开始看,首先需要创建一个SparkContext,然后通过SparkContext来创建初始RDD。在这个过程中,Spark会完成一系列的初始化工作,包括向Master注册Application,启动Executor,以及Executor的反向注册等。

    2. 接下来,调用mapToPairRDD方法把初始RDD转换成PairRDD,为后面的聚合操作做准备。在这里,用一个实体类AppAccessLogInfo把每条记录的upTraffic、downTraffic、timeStamp进行封装,然后使用DeviceID作为Key,AppAccessLogInfo类对象作为值,最终得到一个PairRDD。

    3. 然后,调用agggregateToPairRDD方法对AppAccessLogPairRDD做聚合操作,在这个方法中,调用了AppAccessLogPairRDD的reduceByKey方法通过DeviceID(设备ID)来计算每台设备的总上行流量/下行流量,由于每台设备对应多个访问时间戳,在这里取最小的当作后面排序的依据。

    4. 当做完reduceByKey之后,就需要对总上行流量,总下行流量,时间戳进行排序了。首先,要想使用PairRDD的sortByKey方法,需要改变RDD的结构,这里需要调用mapToSortByKeyPairRDD方法,该方法需要另一个实体类AppAccessLogSortInfo,把使用agggregateToPairRDD方法得到的AppAccessAggregatePairRDD中的每个Tuple2类型的元素的value所封装的信息提取出来,并把这些信息重新使用AppAccessLogSortInfo类来封装,来组成一个能够实现Key排序的AppAccessSortByKeyLogPairRDD。

    5. 最后调用AppAccessSortByKeyLogPairRDD的sortByKey方法排序,并获取访问流量最大的前十个设备。

    6. 把最后结果保存到Hive,首先需要把resultRDD转换成JavaRDD<Row> 类型的RDD,在 mapToRowRDD这个方法中,把所有的数据都提取出来,并把每一行包装成Tuple4类型(四个字段),然后使用RowFactory.create()方法,把每一个Tuple4转换成Row对象(要把Tuple4的每一个元素都传进RowFactory.create()方法),就得到了JavaRDD<Row>类型的RDD。接下来需要创建Schema,首先要对Row对象的每一个元素创建StructField对象也就是field(列名),使用DataTypes.createStructField()方法,其中第一个参数是列名,第二个参数是列字段类型,第三个参数代表是否允许为空。最后使用DataTypes.createStructType()方法创建Schema,该方法的参数是一个存放StructField对象的集合(里面存放着RowRDD每一列的列名),返回值为StructType。最后调用SparkSession对象的createDataFrame方法,创建一个DataFrame,其中第一个参数是RowRDD,第二个参数是Schema,返回值是Dataset<Row>类型。然后把创建好的DataFrame保存到Hive表。

    AppAccessLogInfo.java

    由于该类对象其实是作为PairRDD的value,需要在网络间传输,所以需要实现Serializable接口,使之能够进行序列化和反序列化。

    import java.io.Serializable;
    
    /**
     * Mutable value holder for one access-log record (or an aggregate of several):
     * a timestamp plus the upstream and downstream traffic counters.
     *
     * <p>Implements {@link Serializable} so instances can be serialized.
     */
    public class AppAccessLogInfo implements Serializable{

        private static final long serialVersionUID = 2298114085058810487L;

        /** Access timestamp; {@code null} until set. */
        private Long timeStamp;

        /** Upstream traffic; {@code null} until set. */
        private Long upTraffic;

        /** Downstream traffic; {@code null} until set. */
        private Long downTraffic;

        /** Creates an empty record whose three fields are all {@code null}. */
        public AppAccessLogInfo() {
            this(null, null, null);
        }

        /**
         * Creates a fully populated record.
         *
         * @param timeStamp   access timestamp
         * @param upTraffic   upstream traffic
         * @param downTraffic downstream traffic
         */
        public AppAccessLogInfo(Long timeStamp, Long upTraffic, Long downTraffic) {
            setTimeStamp(timeStamp);
            setUpTraffic(upTraffic);
            setDownTraffic(downTraffic);
        }

        // ---- accessors ----

        /** @return the access timestamp, possibly {@code null} */
        public Long getTimeStamp() {
            return this.timeStamp;
        }

        /** @return the upstream traffic, possibly {@code null} */
        public Long getUpTraffic() {
            return this.upTraffic;
        }

        /** @return the downstream traffic, possibly {@code null} */
        public Long getDownTraffic() {
            return this.downTraffic;
        }

        // ---- mutators ----

        /** @param timeStamp the new access timestamp */
        public final void setTimeStamp(Long timeStamp) {
            this.timeStamp = timeStamp;
        }

        /** @param upTraffic the new upstream traffic */
        public final void setUpTraffic(Long upTraffic) {
            this.upTraffic = upTraffic;
        }

        /** @param downTraffic the new downstream traffic */
        public final void setDownTraffic(Long downTraffic) {
            this.downTraffic = downTraffic;
        }

    }
    
    
    AppAccessLogSortInfo.java

    和AppAccessLogInfo类一样,这个类也需要实现Serializable接口,同时还需要实现Ordered接口,因为需要对此类的对象进行排序。

    import java.io.Serializable;
    
    import scala.math.Ordered;
    
    /**
     *      need implements Ordered interface and Serializable interface
     *
     */
    /**
     * Sort key for the per-device traffic ranking.
     *
     * <p>Natural order: upstream traffic first, then downstream traffic, then
     * timestamp, each ascending. Sorting descending on this key therefore puts
     * the device with the largest upstream traffic first.
     *
     * <p>All comparison operators delegate to {@link #compare(AppAccessLogSortInfo)}
     * so they can never disagree with each other. The comparison uses
     * {@link Long#compareTo(Long)} on the boxed fields: comparing boxed values
     * with {@code ==} tests object identity, and casting a {@code long}
     * difference to {@code int} can truncate to a wrong sign or to zero.
     *
     * <p>The three fields must be non-null before any comparison method is
     * called; a {@code null} field results in a {@link NullPointerException}.
     */
    public class AppAccessLogSortInfo implements Ordered<AppAccessLogSortInfo>, Serializable {

        private static final long serialVersionUID = 7006437160384780829L;

        private Long timeStamp;
        private Long upTraffic;
        private Long downTraffic;

        /** Creates a key whose fields are all {@code null}; populate it through the setters. */
        public AppAccessLogSortInfo() {}

        /**
         * Creates a fully populated key.
         *
         * @param timeStamp   access timestamp (third sort criterion)
         * @param upTraffic   upstream traffic (first sort criterion)
         * @param downTraffic downstream traffic (second sort criterion)
         */
        public AppAccessLogSortInfo(Long timeStamp, Long upTraffic, Long downTraffic) {
            super();
            this.timeStamp = timeStamp;
            this.upTraffic = upTraffic;
            this.downTraffic = downTraffic;
        }

        /** @return {@code true} if this key sorts strictly after {@code other} */
        @Override
        public boolean $greater(AppAccessLogSortInfo other) {
            return compare(other) > 0;
        }

        /** @return {@code true} if this key sorts after {@code other} or ties with it */
        @Override
        public boolean $greater$eq(AppAccessLogSortInfo other) {
            return compare(other) >= 0;
        }

        /** @return {@code true} if this key sorts strictly before {@code other} */
        @Override
        public boolean $less(AppAccessLogSortInfo other) {
            return compare(other) < 0;
        }

        /** @return {@code true} if this key sorts before {@code other} or ties with it */
        @Override
        public boolean $less$eq(AppAccessLogSortInfo other) {
            return compare(other) <= 0;
        }

        /**
         * Compares by upstream traffic, then downstream traffic, then timestamp.
         *
         * @param other the key to compare with
         * @return a negative value, zero or a positive value when this key is
         *         smaller than, equal to or greater than {@code other}; only the
         *         sign is meaningful
         */
        @Override
        public int compare(AppAccessLogSortInfo other) {

            int byUpTraffic = upTraffic.compareTo(other.upTraffic);
            if (byUpTraffic != 0) {
                return byUpTraffic;
            }

            int byDownTraffic = downTraffic.compareTo(other.downTraffic);
            if (byDownTraffic != 0) {
                return byDownTraffic;
            }

            return timeStamp.compareTo(other.timeStamp);
        }

        /**
         * Same ordering as {@link #compare(AppAccessLogSortInfo)}.
         *
         * @param other the key to compare with
         * @return see {@link #compare(AppAccessLogSortInfo)}
         */
        @Override
        public int compareTo(AppAccessLogSortInfo other) {
            return compare(other);
        }

        /** @return the timestamp (the accessor name keeps its historical spelling) */
        public Long getTimpStamp() {
            return timeStamp;
        }

        /** @param timpStamp the new timestamp (the accessor name keeps its historical spelling) */
        public void setTimpStamp(Long timpStamp) {
            this.timeStamp = timpStamp;
        }

        /** @return the upstream traffic */
        public Long getUpTraffic() {
            return upTraffic;
        }

        /** @param upTraffic the new upstream traffic */
        public void setUpTraffic(Long upTraffic) {
            this.upTraffic = upTraffic;
        }

        /** @return the downstream traffic */
        public Long getDownTraffic() {
            return downTraffic;
        }

        /** @param downTraffic the new downstream traffic */
        public void setDownTraffic(Long downTraffic) {
            this.downTraffic = downTraffic;
        }

        /** Hash over downstream traffic, timestamp and upstream traffic; {@code null} counts as 0. */
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + hashOf(downTraffic);
            result = prime * result + hashOf(timeStamp);
            result = prime * result + hashOf(upTraffic);
            return result;
        }

        /** Two keys are equal when they have the same class and all three fields hold equal values. */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            AppAccessLogSortInfo other = (AppAccessLogSortInfo) obj;
            return sameValue(downTraffic, other.downTraffic)
                    && sameValue(timeStamp, other.timeStamp)
                    && sameValue(upTraffic, other.upTraffic);
        }

        /** Null-safe hash code of one field. */
        private static int hashOf(Long value) {
            return (value == null) ? 0 : value.hashCode();
        }

        /** Null-safe value equality of two boxed fields. */
        private static boolean sameValue(Long left, Long right) {
            return (left == null) ? right == null : left.equals(right);
        }

    }
    
    

    相关文章

      网友评论

          本文标题:Spark日志分析案例

          本文链接:https://www.haomeiwen.com/subject/ywatxqtx.html