平台:IDEA2016
语言:java
代码链接:https://pan.baidu.com/s/1dEBquiP
关于数据的生成
数据非实际数据,由java代码生成。分别为:用户设备号UUID,时间戳,上行流量和下行流量。关键代码如下。
public static void main(String[] args) {
    // Generate 1000 synthetic access-log records for 100 random devices.
    // Each line: timestamp \t deviceId \t upTraffic \t downTraffic
    StringBuilder sb = new StringBuilder(); // single-threaded: StringBuilder over StringBuffer
    Random rand = new Random();
    List<String> device = new ArrayList<String>();
    for (int i = 0; i < 100; i++) {
        device.add(getUUID());
    }
    for (int j = 0; j < 1000; j++) {
        // Pick a random timestamp within the last 6000 minutes.
        Calendar cal = Calendar.getInstance();
        cal.setTime(new Date());
        cal.add(Calendar.MINUTE, -rand.nextInt(6000));
        long timeStamp = cal.getTimeInMillis();
        String deviceId = device.get(rand.nextInt(100));
        long upTraffic = rand.nextInt(100000);
        long downTraffic = rand.nextInt(10000);
        sb.append(timeStamp).append("\t").append(deviceId).append("\t")
          .append(upTraffic).append("\t").append(downTraffic).append("\n");
    }
    // try-with-resources always closes the writer; the old finally block
    // threw an NPE when the FileOutputStream failed to open (pw stayed null).
    try (PrintWriter pw = new PrintWriter(
            new OutputStreamWriter(new FileOutputStream("d:\\app_log.txt")))) {
        pw.write(sb.toString());
    } catch (FileNotFoundException e) {
        // Previously swallowed silently; at least report the failure.
        e.printStackTrace();
    }
}
public static String getUUID(){
    // A random UUID rendered as 32 hex characters with the hyphens removed.
    String raw = UUID.randomUUID().toString();
    return String.join("", raw.split("-"));
}
构建spark环境,读取在D盘生成的log日志
// 1. Create the Spark configuration and the context object (local master, app name "sparkTest").
SparkConf conf = new SparkConf().setAppName("sparkTest").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
// 2. Read the log file into an RDD of raw lines via SparkContext#textFile().
JavaRDD<String> javaRDD = sc.textFile("D:\\app_log.txt");
将日志的RDD映射为key-value的格式
/**
 * Maps each raw log line to a (deviceID, AccessLogInfo) pair so records
 * can later be aggregated per device with reduceByKey.
 */
private static JavaPairRDD<String,AccessLogInfo> mapAccessLogRDD2Pair(JavaRDD<String> javaRDD){
    // PairFunction<input, output-key, output-value>
    return javaRDD.mapToPair(new PairFunction<String, String, AccessLogInfo>() {
        // Fixed misspelled "serivaVersionUID": only the exact name
        // serialVersionUID participates in Java serialization.
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<String, AccessLogInfo> call(String line) throws Exception {
            // Fields are tab-separated: timestamp, deviceID, upTraffic, downTraffic.
            String[] fields = line.split("\t");
            long timestamp = Long.parseLong(fields[0]);   // parseLong avoids boxing via Long.valueOf
            String deviceID = fields[1];
            long upTraffic = Long.parseLong(fields[2]);
            long downTraffic = Long.parseLong(fields[3]);
            // Wrap the metrics in the serializable value object.
            AccessLogInfo accessLogInfo = new AccessLogInfo(timestamp, upTraffic, downTraffic);
            return new Tuple2<String, AccessLogInfo>(deviceID, accessLogInfo);
        }
    });
}
根据deviceID进行聚合求出上行和下行的流量,及其最早访问的时间
/**
 * Aggregates per device: sums up/down traffic and keeps the earliest
 * access timestamp across all records sharing the same deviceID key.
 */
private static JavaPairRDD<String,AccessLogInfo> aggregateByDeviceId( JavaPairRDD<String, AccessLogInfo> accessLogPairRdd){
    // Function2<left, right, result> — pairwise reduction of values per key.
    return accessLogPairRdd.reduceByKey(new Function2<AccessLogInfo, AccessLogInfo, AccessLogInfo>() {
        // Fixed misspelled "serivaVersionUID": only the exact name
        // serialVersionUID participates in Java serialization.
        private static final long serialVersionUID = 1L;

        @Override
        public AccessLogInfo call(AccessLogInfo accessLogInfo1, AccessLogInfo accessLogInfo2) throws Exception {
            // Earliest access time wins; traffic counters are summed.
            long timestamp = Math.min(accessLogInfo1.getTimestamp(), accessLogInfo2.getTimestamp());
            long upTraffic = accessLogInfo1.getUpTraffic() + accessLogInfo2.getUpTraffic();
            long downTraffic = accessLogInfo1.getDownTraffic() + accessLogInfo2.getDownTraffic();
            return new AccessLogInfo(timestamp, upTraffic, downTraffic);
        }
    });
}
根据key进行二次的排序,先按上行流量,再按下行流量,都一样的话按照时间戳排序,需要重写Ordered<AccessLogSortKey>的六个实现方法
@Override
public int compare(AccessLogSortKey other) {
    // Long.compare avoids the overflow bug of (int)(longA - longB):
    // a large difference truncated to int can flip sign or become 0,
    // corrupting the sort order.
    int cmp = Long.compare(upTraffic, other.upTraffic);
    if (cmp != 0) {
        return cmp;
    }
    cmp = Long.compare(downTraffic, other.downTraffic);
    if (cmp != 0) {
        return cmp;
    }
    return Long.compare(timestamp, other.timestamp);
}
@Override
public int compareTo(AccessLogSortKey other) {
    // Same ordering as compare(): upTraffic, then downTraffic, then timestamp.
    // Long.compare avoids the overflow bug of (int)(longA - longB):
    // a large difference truncated to int can flip sign or become 0.
    int cmp = Long.compare(upTraffic, other.upTraffic);
    if (cmp != 0) {
        return cmp;
    }
    cmp = Long.compare(downTraffic, other.downTraffic);
    if (cmp != 0) {
        return cmp;
    }
    return Long.compare(timestamp, other.timestamp);
}
@Override
public boolean $less(AccessLogSortKey other) {
if(upTraffic<other.upTraffic){
return true;
}else if(upTraffic==other.upTraffic&&downTraffic<other.downTraffic){
return true;
}else if(upTraffic==other.upTraffic&&downTraffic==other.downTraffic&×tamp<other.timestamp){
return true;
}
return false;
}
//这个是大于的方法
@Override
public boolean $greater(AccessLogSortKey other) {
if(upTraffic>other.upTraffic){
return true;
}else if(upTraffic==other.upTraffic&&downTraffic>other.downTraffic){
return true;
}else if(upTraffic==other.upTraffic&&downTraffic==other.downTraffic&×tamp>other.timestamp){
return true;
}
return false;
}
@Override
public boolean $less$eq(AccessLogSortKey other) {
if($less(other)){
return true;
}else if (upTraffic==other.upTraffic&&downTraffic==other.downTraffic&×tamp==other.timestamp){
return true;
}
return false;
}
@Override
public boolean $greater$eq(AccessLogSortKey other) {
if($greater(other)){
return true;
}else if (upTraffic==other.upTraffic&&downTraffic==other.downTraffic&×tamp==other.timestamp){
return true;
}
return false;
}
将RDD的key映射为二次排序的key,因为实现了Ordered所以自动排序
/**
 * Re-keys the aggregated RDD by the secondary-sort key so that sortByKey
 * can order records by (upTraffic, downTraffic, timestamp); the value
 * becomes the deviceID.
 */
private static JavaPairRDD<AccessLogSortKey,String> mapRDDkey2SortKey(JavaPairRDD<String, AccessLogInfo> aggregateLogPairRDD){
    // PairFunction<input tuple, output-key, output-value>
    return aggregateLogPairRDD.mapToPair(new PairFunction<Tuple2<String,AccessLogInfo>, AccessLogSortKey,String>() {
        // Fixed misspelled "serivaVersionUID": only the exact name
        // serialVersionUID participates in Java serialization.
        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<AccessLogSortKey,String> call(Tuple2<String, AccessLogInfo> tuple ) throws Exception {
            // Incoming tuple: key = deviceID, value = aggregated AccessLogInfo.
            String deviceID = tuple._1;
            AccessLogInfo accessLogInfo = tuple._2;
            AccessLogSortKey accessLogSortKey = new AccessLogSortKey(
                    accessLogInfo.getTimestamp(),
                    accessLogInfo.getUpTraffic(),
                    accessLogInfo.getDownTraffic());
            // Emit a new tuple keyed by the composite sort key.
            return new Tuple2<AccessLogSortKey,String>(accessLogSortKey, deviceID);
        }
    });
}
Main方法
public static void main(String[] args) throws SQLException {
    // 1. Create the Spark configuration and context (local mode).
    SparkConf conf = new SparkConf().setAppName("sparkTest").setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);
    try {
        // 2. Read the log file into an RDD of raw lines.
        JavaRDD<String> javaRDD = sc.textFile("D:\\app_log.txt");
        // Map to (deviceID, AccessLogInfo) pairs, ready for reduceByKey.
        JavaPairRDD<String, AccessLogInfo> accessLogPairRdd = mapAccessLogRDD2Pair(javaRDD);
        // Aggregate traffic totals and earliest timestamp per device.
        JavaPairRDD<String, AccessLogInfo> aggregateLogPairRDD = aggregateByDeviceId(accessLogPairRdd);
        // Re-key by the secondary-sort key; the value becomes the deviceID.
        JavaPairRDD<AccessLogSortKey, String> accessLogSortRDD = mapRDDkey2SortKey(aggregateLogPairRDD);
        // Sort descending (false = descending) and take the top 10.
        JavaPairRDD<AccessLogSortKey, String> sortedAccessLogRDD = accessLogSortRDD.sortByKey(false);
        List<Tuple2<AccessLogSortKey, String>> top10DataList = sortedAccessLogRDD.take(10);

        db1 = new DBHelper();
        String sql = "insert into spark(deviceId,upTraffic,downTraffic,timeStamp) values(?,?,?,?)";
        // Prepare ONCE outside the loop (the old code leaked a new
        // PreparedStatement per row) and close it even if an insert fails.
        PreparedStatement pt = db1.conn.prepareStatement(sql);
        try {
            for (Tuple2<AccessLogSortKey, String> data : top10DataList) {
                pt.setString(1, data._2);
                // The table columns are VARCHAR, so the numeric values are
                // bound as strings (preserves the original schema behavior).
                pt.setString(2, data._1.getUpTraffic() + "");
                pt.setString(3, data._1.getDownTraffic() + "");
                pt.setString(4, data._1.getTimestamp() + "");
                pt.executeUpdate();
            }
        } finally {
            pt.close();
            db1.close();
        }
    } finally {
        // Always release the Spark context, even on failure.
        sc.close();
    }
}
JDBC连接数据库,把结果存储在mysql中,在AppLogSpark.class 中执行PreparedStatement
/**
* CREATE TABLE spark(
id INT(4) NOT NULL AUTO_INCREMENT PRIMARY KEY,
deviceId VARCHAR(255) ,
upTraffic VARCHAR(255) ,
downTraffic VARCHAR(255) ,
TIMESTAMP VARCHAR(255)
);
*/
/**
 * Minimal JDBC helper that opens a MySQL connection on construction and
 * exposes it via the public {@code conn} field (accessed by the Spark driver).
 *
 * NOTE(review): if the driver is missing or the connection fails, the
 * constructor logs the exception and leaves {@code conn} null — callers
 * should check before use.
 */
public class DBHelper {
    public static final String url = "jdbc:mysql://localhost:3306/test";
    public static final String driver = "com.mysql.jdbc.Driver";
    public static final String user = "root";
    public static final String password ="*****";
    // The live connection; null when the constructor failed to connect.
    public Connection conn = null;

    public DBHelper(){
        try{
            Class.forName(driver);
            conn = DriverManager.getConnection(url, user, password);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /** Closes the connection; safe to call when the connect attempt failed. */
    public void close(){
        // Guard against NPE: conn stays null when the constructor's
        // connection attempt threw (the old code crashed here).
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
最终结果如图
![](https://img.haomeiwen.com/i6231724/e9ec8553bebb4875.png)
网友评论