在之前Spark Streaming&Flume&Kafka打造通用流处理平台中已经一起学习了环境的搭建,接下来在此基础上做Spark Streaming处理日志实战。
模拟日志生成
通过python代码模拟实时生成日志,代码如下
generate_log.py:
#coding=UTF-8
import random
import time
url_paths=[
"class/112.html",
"class/128.html",
"class/145.html",
"class/130.html",
"class/146.html",
"class/131.html",
"learn/821",
"course/list"
]
ip_slices=[132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168]
http_referers=[
"https://www.baidu.com/s?wd={query}",
"https://www.sogou.com/web?query={query}",
"https://cn.bing.com/search?q={query}",
"https://www.so.com/s?q={query}"
]
search_keyword=[
"spark sql实战",
"hadoop 基础",
"storm实战",
"spark streaming实战"
]
status_code=["200","404","500"]
def sample_status_code():
return random.sample(status_code,1)[0]
def sample_referer():
if random.uniform(0,1)>0.2:
return "-"
refer_str=random.sample(http_referers,1)
query_str=random.sample(search_keyword,1)
return refer_str[0].format(query=query_str[0])
def sample_url():
return random.sample(url_paths,1)[0]
def sample_ip():
slice=random.sample(ip_slices,4)
return ".".join([str(item) for item in slice])
def generate_log(count=10):
time_str=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
f=open("/root/data/streaming_access.log","w+")
while count >=1:
query_log="{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}".format(url=sample_url(),ip=sample_ip(),refer=sample_referer(),status_code=sample_status_code(),local_time=time_str)
print(query_log)
f.write(query_log+"\n")
count=count-1
if __name__ == '__main__':
# 每一分钟生成一次日志信息
while True:
generate_log()
time.sleep(60)
这段代码的含义是每一分钟随机生成10条(可修改)日志并写入到本地文件中,这里的文件路径是我虚拟机中的路径,可随意变动,要和后面flume source的路径相同。生成的日志格式如下。
IP地址 | 时间 | 请求方式、url、引擎版本 | 状态码 | 搜索引擎提供商 |
---|---|---|---|---|
187.98.156.46 | 2019-05-29 | GET /class/112.html HTTP/1.1 | 500 | https://www.sogou.com/web?query=xx |
在服务器上通过以下命令可运行python程序
//前提是虚拟机已经安装了python的环境,要是没有安装可百度自行安装
python generate_log.py
环境测试
使用Spark Streaming&Flume&Kafka打造通用流处理平台中搭建的环境,唯一的区别是改造flume agent,在日志文件中读取生成的日志,在输出到Kafka,Spark Streaming在Kafka中消费并处理日志。
agent 修改
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command= tail -F /root/data/streaming_access.log
a1.sources.r1.shell = /bin/sh -c
# Describe the sink
#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka的broker地址和端口号
a1.sinks.k1.brokerList=192.168.30.131:9092
#设置Kafka的Topic
a1.sinks.k1.topic=kafka_spark
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.batchSize = 3
a1.sinks.k1.requiredAcks = 1
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
sources的路径要和日志生成程序文件写入路径一致。
测试
启动Kafka环境、flume agent、日志生成程序、本地Spark Streaming程序。
观察本地客户端日志,输出了产生的日志,说明环境可用。若没有输出,建议先阅读以前的课程。
image.png
日志处理
需求:
统计今天到目前为止从搜索引擎过来的课程的访问量
产品推销是很重要的一环,于是很多公司都会给各种各样的平台流水来为自己的产品做推销。但是怎么判断那个平台的推销效果比较适合自己呢?根据大数据实时处理推销平台过来的数据,根据推销平台和产品ID为key做一个访问量的Top排序。这样可增加效果好的平台的投资,结束和效果不好平台的合作。这样不仅大大增加了推销的效果还节约了成本。在这个例子中搜索引擎类似推销平台,课程类似产品。
处理结果存储选型
处理结果可存储在关系型数据库中,如myql oracle等
也可以存储在NoSql中,如hbase,redis等。
由于需求中有访问量累计的操作
若使用mysql,需要每次存储时,先根据主键把数据查询出来,做了相加之后在做更新,比较麻烦。
使用hbase,其中客户端在保存时有incrementColumnValue这个方法,自动与数据库中的记录相加,所以这里使用hbase。
安装配置 HBase
下载、解压、配置环境变量
conf/hbase-env.sh
修改JAVA_HOME
export HBASE_MANAGES_ZK=false
conf/hbase-site.xml:
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://192.168.30.130:8092/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>192.168.30.131:2181</value>
</property>
</configuration>
conf/regionservers:
localhost
HBase 建表
// 1 启动hbase,在此之前启动zookeeper,hadoop环境
start-hbase.sh
// 2 启动shell
hbase shell
// 3 建表
create 'course_search_clickcount','info'
// 4 查看数据表
list
// 5 查看数据表信息
describe 'course_search_clickcount'
// 6 查看表数据
scan 'course_search_clickcount'
业务开发
消费kafka数据、数据清洗与统计
1)实体类
CourseSearchClickCount.scala:
/**
* 从搜索引擎过来的课程点击数实体类
* @param day_search_course
* @param click_count
*/
case class CourseSearchClickCount(day_search_course: String, click_count: Long)
3)添加依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- hadoop -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
3)工具类
DateUtils.scala
mport java.util.Date
import org.apache.commons.lang3.time.FastDateFormat
/**
* 日期时间工具类
*/
object DateUtils {
val OLD_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")
def getTime(time: String) = {
OLD_FORMAT.parse(time).getTime
}
def parseToMinute(time: String) = {
TARGET_FORMAT.format(new Date(getTime(time)))
}
def main(args: Array[String]): Unit = {
println(parseToMinute("2018-9-6 13:58:01"))
}
}
HBaseUtils.java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
/**
* HBase操作工具类,Java工具类建议采用单例模式封装
*/
public class HBaseUtils {
HBaseAdmin admin = null;
Configuration configuration = null;
/**
* 私有构造方法
*/
private HBaseUtils() {
configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum", "localhost:2181");
configuration.set("hbase.rootdir", "hdfs://localhost:8020/hbase");
try {
admin = new HBaseAdmin(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
private static HBaseUtils instance = null;
public static synchronized HBaseUtils getInstance() {
if (null == instance) {
instance = new HBaseUtils();
}
return instance;
}
/**
* 根据表名获取到HTable实例
*
* @param tableName
* @return
*/
public HTable getTable(String tableName) {
HTable table = null;
try {
table = new HTable(configuration, tableName);
} catch (IOException e) {
e.printStackTrace();
}
return table;
}
/**
* 添加一条记录到表中
*
* @param tableName
* @param rowkey
* @param cf
* @param column
* @param value
*/
public void put(String tableName, String rowkey, String cf, String column, String value) {
HTable table = getTable(tableName);
Put put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
}
4)数据库操作
CourseSearchClickCountDAO.scala
import com.lihaogn.spark.project.utils.HBaseUtils
import com.lihaogn.sparkProject.domain.{CourseClickCount, CourseSearchClickCount}
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer
/**
* 数据访问层,从搜索引擎过来的课程点击数
*/
object CourseSearchClickCountDAO {
val tableName = "course_search_clickcount"
val cf = "info"
val qualifer = "click_count"
/**
* 保存数据到HBase
*
* @param list
*/
def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
val table = HBaseUtils.getInstance().getTable(tableName)
for (ele <- list) {
table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
Bytes.toBytes(cf),
Bytes.toBytes(qualifer),
ele.click_count)
}
}
/**
* 根据rowkey查询值
*
* @param day_search_course
* @return
*/
def count(day_search_course: String): Long = {
val table = HBaseUtils.getInstance().getTable(tableName)
val get = new Get(Bytes.toBytes(day_search_course))
val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)
if (value == null) {
0L
} else
Bytes.toLong(value)
}
}
5)主类
import com.imooc.spark.demain.{ClickLog, CourseSearchClickCount}
import com.imooc.spark.dto.CourseSearchClickCountDAO
import com.imooc.spark.util.DateUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
* description
*
* @author zhiying.dong@hand-china.com 2019/05/24 16:54
*/
object SparkStreamingApp {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("DirectKafka")
.setMaster("local[2]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ssc = new StreamingContext(conf, Seconds(2))
val topicsSet = Array("kafka_spark")
val kafkaParams = mutable.HashMap[String, String]()
//必须添加以下参数,否则会报错
kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
kafkaParams.put("group.id", "group1")
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
)
)
// 步骤一:测试数据接收
val logs = messages.map(_.value)
// 步骤二:数据清洗
val cleanData = logs.map(line => {
val infos = line.split("\t")
val url = infos(2).split(" ")(1)
var courseId = 0
// 获取课程编号
if (url.startsWith("/class")) {
val courseHtml = url.split("/")(2)
courseId = courseHtml.substring(0,courseHtml.lastIndexOf(".")).toInt
}
ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
}).filter(clicklog => clicklog.courseId != 0)
// 步骤三:统计从搜索引擎过来的从今天开始到现在的课程的访问量
cleanData.map(x=>{
val referer=x.referer.replaceAll("//","/")
val splits=referer.split("/")
var host=""
if(splits.length>2) {
host=splits(1)
}
(host,x.courseId,x.time)
}).filter(_._1!="").map(x=>{
(x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
}).reduceByKey(_+_).foreachRDD(rdd=>{
rdd.foreachPartition(partitionRecords=>{
val list =new ListBuffer[CourseSearchClickCount]
partitionRecords.foreach(pair=>{
list.append(CourseSearchClickCount(pair._1,pair._2))
})
// 写入数据库
CourseSearchClickCountDAO.save(list)
})
})
// 开始计算
ssc.start()
ssc.awaitTermination()
}
}
6)测试
启动Kafka环境、flume agent、日志生成程序、Hadoop环境、hbase环境、本地Spark Streaming程序。
查看hbase中course_search_clickcount表,出现以下内容说明处理结果成功存储到了hbase
image.png
7)后续操作
通过以上几步已经把日志通过搜索引擎+课程编号为主键,和总点击量存储到了数据库。此时可通过Javaweb结合echars等开源展示框架为企业进行展示,实现可视化效果。
总结
这次实战是学习的慕课网上spark steaming的课程,只是找找Spark Streaming实战的感觉,从搭建环境到日志处理实战,理解了Spark Streaming整套处理的流程。其中很多代码都是照着视频一行一行敲出来的,下来还需要更多的扩展才行。接下来准备学习Scala的知识,在业务逻辑开发这一部分做更多的深入。
网友评论