现有100W+条数据,格式如下:
3|2016年9月16日,星期五,23:13:09|192.168.1.102|那谁|武士|男|27|0|175510/800000000
3|2016年9月17日,星期六,22:33:16|192.168.1.102|那谁|武士|男|27|0|175510/800000000
2|2016年9月17日,星期六,22:52:50|192.168.1.102|那个谁|法师|男|47|10730|948179/800000000
3|2016年9月17日,星期六,23:04:24|192.168.1.102|那个谁|法师|男|999|10710|960648/800000000
2|2016年9月18日,星期日,09:33:40|192.168.1.102|那个谁|法师|男|999|10710|960648/800000000
3|2016年9月18日,星期日,10:30:34|192.168.1.102|那个谁|法师|女|999|10370|5709563/800000000
3|2016年9月18日,星期日,11:05:10|192.168.1.102|那个谁|法师|女|999|10370|5709563/800000000
2|2016年9月24日,星期六,12:17:30|192.168.1.102|那个谁|法师|女|999|10370|5709563/800000000
6|2016年9月24日,星期六,12:22:48|那个谁|0|160|10480
6|2016年9月24日,星期六,12:23:15|那个谁|0|2|10482
3|2016年9月24日,星期六,13:23:44|192.168.1.102|那个谁|法师|男|999|10462|5715522/800000000
1.查询一段时间内新上线的玩家
(1)方法1:(该方法较低级)
//起始时间
val startDate=args(0)
//结束时间
val endDate=args(1)
//查询条件
val dateDormat1=new SimpleDateFormat("yyyy-MM-dd")
//查询条件的起始时间
val startTime=dateDormat1.parse(startDate).getTime
//查询条件的截止时间
val endTime=dateDormat1.parse(endDate).getTime
val dateFormat2=new SimpleDateFormat("yyyy年MM月dd日,E,HH:mm:ss")
val conf=new SparkConf().setAppName("GameKPI").setMaster("local")
val sc=new SparkContext(conf)
//以后从哪里读取数据
val lines=sc.textFile(args(2))
//整理并过滤
val splited = lines.map(line => {
line.split("[|]")
})
//按日期过滤
val filted = splited.filter(fields => {
val t = fields(0)
val time = fields(1)
val timeLong = dateFormat2.parse(time).getTime
t.equals("1") && timeLong >= startTime && timeLong < endTime
})
val dnu=filted.count()
println(dnu)
sc.stop()
(2)方法二:该方法缺点是在map过程中不断地序列化,数量量大的时候操作缓慢
//起始时间
val startDate=args(0)
//结束时间
val endDate=args(1)
//查询条件
val dateDormat1=new SimpleDateFormat("yyyy-MM-dd")
//查询条件的起始时间
val startTime=dateDormat1.parse(startDate).getTime
//查询条件的截止时间
val endTime=dateDormat1.parse(endDate).getTime
val dateFormat2=new SimpleDateFormat("yyyy年MM月dd日,E,HH:mm:ss")
val conf=new SparkConf().setAppName("GameKPI").setMaster("local")
val sc=new SparkContext(conf)
//以后从哪里读取数据
val lines=sc.textFile(args(2))
//整理并过滤
val splited = lines.map(line => {
line.split("[|]")
})
//按日期过滤
val filtedByType = splited.filter(fields => {
//一个Task会创建很多的FilterUtils实例,因为处理一条数据就会创建一个实例
val fu =new FilterUtilsv3
fu.filterByType(fields,"1")
})
//按类型过滤
val filted = filtedByType.filter(fields => {
//一个Task会创建很多的FilterUtils实例,因为处理一条数据就会创建一个实例
val fu =new FilterUtilsv3
fu.filterByTime(fields, startTime, endTime)
})
val dnu=filted.count()
println(dnu)
sc.stop()
FilterUtilsv3类:
classFilterUtilsv4 {
//如果object使用了成员变量,那么会出现线程安全问题,因为object是一个单例,多线程可以同时使用
val dateFormat=new SimpleDateFormat("yyyy年MM月dd日,E,HH:mm:ss")
//按类型进行比较
def filterByType(fields:Array[String],tp:String)={
val _tp=fields(0)
_tp==tp
}
//按时间进行比较
def filterByTime(fields:Array[String],startTime:Long,endTime:Long)={
val time=fields(1)
val timeLong=dateFormat.parse(time).getTime
timeLong>=startTime&&timeLong
}
}
(3)在drive端创建实例,Executor使用的时候只会调用一次
//起始时间
val startDate=args(0)
//结束时间
val endDate=args(1)
//查询条件
val dateDormat1=new SimpleDateFormat("yyyy-MM-dd")
//查询条件的起始时间
val startTime=dateDormat1.parse(startDate).getTime
//查询条件的截止时间
val endTime=dateDormat1.parse(endDate).getTime
val dateFormat2=new SimpleDateFormat("yyyy年MM月dd日,E,HH:mm:ss")
val conf=new SparkConf().setAppName("GameKPI").setMaster("local[4]")
val sc=new SparkContext(conf)
//以后从哪里读取数据
val lines=sc.textFile(args(2))
//整理并过滤
val splited = lines.map(line => {
line.split("[|]")
})
//按日期过滤
val filtedByType = splited.filter(fields => {
FilterUtilsv4.filterByType(fields,"1")
})
//按类型过滤
val filted = filtedByType.filter(fields => {
FilterUtilsv4.filterByTime(fields, startTime, endTime)
})
val dnu=filted.count()
println(dnu)
sc.stop()
FilterUtilsv4类:
bject FilterUtilsv4 {
//如果object使用了成员变量,那么会出现线程安全问题,因为object是一个单例,多线程可以同时使用
//该方法是线程不安全的,当local【】中有多个线程时会抛异常
// val dateFormat=new SimpleDateFormat("yyyy年MM月dd日,E,HH:mm:ss")
//该线程是安全的
val dateFormat=FastDateFormat.getInstance("yyyy年MM月dd日,E,HH:mm:ss")
//按类型进行比较
def filterByType(fields:Array[String],tp:String)={
val _tp=fields(0)
_tp==tp
}
//按时间进行比较
def filterByTime(fields:Array[String],startTime:Long,endTime:Long)={
val time=fields(1)
val timeLong=dateFormat.parse(time).getTime
timeLong>=startTime&&timeLong
}
}
网友评论