Platform: IDEA 2016
Languages: Scala, Java, Python
Code link: https://pan.baidu.com/s/1o7JMCvo
What it does: Python simulates the generation of real-time data, crontab runs a shell script to keep producing the streaming log data, Flume ships the log data to Kafka, Spark Streaming (developed in IDEA) consumes the Kafka data, and the results are stored in HBase.
1. Simulating the data generation with Python
import random
import time

url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list"
]

ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 87, 98, 168]

http_referers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}",
]

search_keyword = [
    "Spark SQL实战",
    "Hadoop基础",
    "Storm实战",
    "Spark Streaming实战",
    "大数据面试"
]

status_codes = ["200", "404", "500"]


def sample_url():
    return random.sample(url_paths, 1)[0]


def sample_id():
    # build a fake IP address out of four random slices
    slice = random.sample(ip_slices, 4)
    return ".".join(str(item) for item in slice)


def sample_referer():
    # Some requests carry no referrer, represented by "-"; uniform() draws a random float in [0, 1)
    if random.uniform(0, 1) > 0.2:
        return "-"
    # sample() returns a list, so the single element has to be taken out of the []
    refer_url = random.sample(http_referers, 1)
    query_url = random.sample(search_keyword, 1)
    return refer_url[0].format(query=query_url[0])


def sample_status():
    return random.sample(status_codes, 1)[0]


def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    while count >= 1:
        query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(
            url=sample_url(), ip=sample_id(), referer=sample_referer(),
            status_code=sample_status(), local_time=time_str)
        print(query_log)
        count = count - 1


if __name__ == "__main__":
    generate_log(100)
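Each generated line is tab-separated: ip, time, request, status code, referer. A run of the script therefore prints lines roughly like the following (values are random; the example is illustrative only):
132.156.124.10    2017-11-11 10:30:01    "GET /class/128.html HTTP/1.1"    200    -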
2. Configuring the crontab scheduled task
(1) Write log_generator.sh, whose command is: python /home/hadoop/data/project/generate_log.py
(2) Run crontab -e and add the entry: */1 * * * * /home/hadoop/data/project/log_generator.sh
Note that Flume in step 3 tails /home/hadoop/data/project/logs/access.log, so the generator's output has to end up in that file, for example by appending the script's stdout to it: python /home/hadoop/data/project/generate_log.py >> /home/hadoop/data/project/logs/access.log
3. Flume configuration and the command to start Flume
(1) Flume-to-Kafka configuration file streaming_project.conf (see the official Flume documentation for the individual settings):
exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel
exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/data/project/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c
exec-memory-kafka.channels.memory-channel.type = memory
exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1
exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel
(2) Start ZooKeeper and Kafka before starting Flume:
./zkServer.sh start and ./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
(3) Command to start Flume:
flume-ng agent --name exec-memory-kafka --conf /home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin/conf --conf-file /home/hadoop/data/project/streaming_project2.conf
(4) A Kafka console consumer can be started to verify the setup:
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic streamingtopic
4. Consuming the Kafka messages with Spark Streaming
(1) Run in local mode first to check that the whole pipeline works. The program arguments are: hadoop000:2181 test streamingtopic 1 (i.e. zkQuorum, group, topics, numThreads). A sketch of the enclosing object and the imports it needs follows the snippet below.
if (args.length != 4) {
  System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
  System.exit(1) // exit early on wrong arguments
}

val Array(zkQuorum, groupId, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("ImoocStatStreamingApp").setMaster("local[5]")
val ssc = new StreamingContext(sparkConf, Seconds(60))

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)
messages.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()
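For completeness, this snippet sits inside the application's main method; a minimal sketch of the enclosing object, with the imports the receiver-based Kafka API needs (the object and package names are taken from the --class used in step 6):

// package com.lzk.spark.project.spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ImoocStatStreamingApp {
  def main(args: Array[String]): Unit = {
    // ... the snippet above goes here ...
  }
}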
(2) Step two: clean the raw log lines. ClickLog is a case class and DataUtils is a date-handling helper (see the linked code for details); a minimal sketch of both follows the snippet below.
The resulting cleanData is a stream of ClickLog objects with the fields: ip: String, time: String, courseId: Int, statusCode: Int, referer: String.
// Step two: clean the data
val log = messages.map(_._2)
val cleanData = log.map(line => {
  val infos = line.split("\t")
  // infos(2) is the request, e.g. "GET /class/128.html HTTP/1.1"; take the url part
  val url = infos(2).split(" ")(1)
  var courseId = 0
  if (url.startsWith("/class")) {
    // pull the course id out of the /class urls, e.g. /class/128.html -> 128
    val courseIdHTML = url.split("/")(2)
    courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
  }
  ClickLog(infos(0), DataUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
}).filter(clicklog => clicklog.courseId != 0) // drop the records whose courseId is 0
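ClickLog and DataUtils live in the linked code; a minimal sketch consistent with how they are used above (the yyyyMMddHHmm target format is an assumption, chosen so that time.substring(0, 8) in the next step yields the day as yyyyMMdd):

case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

object DataUtils {
  // Parses "2017-11-11 10:30:01" and reformats it as "201711111030".
  // Formatters are created per call because SimpleDateFormat is not thread-safe.
  def parseToMinute(time: String): String = {
    val input = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    val target = new java.text.SimpleDateFormat("yyyyMMddHHmm")
    target.format(input.parse(time))
  }
}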
(3) Step three: count the visits per course so far today. The input is still cleanData; the case class here is CourseClickCount and CourseClickCountDAO does the HBase writes (a sketch of both follows the snippet below).
// Step three: count today's visits to the hands-on courses so far
cleanData.map(x => {
  // row key: day_courseId, e.g. 20171111_128
  (x.time.substring(0, 8) + "_" + x.courseId, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
  rdd.foreachPartition(partitionRecords => {
    val list = new ListBuffer[CourseClickCount]
    partitionRecords.foreach(pair => {
      list.append(CourseClickCount(pair._1, pair._2))
    })
    CourseClickCountDAO.save(list)
  })
})
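CourseClickCount and CourseClickCountDAO are also in the linked code; a minimal sketch of the save path, assuming it goes through the HBaseUtils helper shown in step (4) below and uses HBase's incrementColumnValue so that each micro-batch adds its partial count to the day's running total:

case class CourseClickCount(day_course: String, click_count: Long)

object CourseClickCountDAO {
  import org.apache.hadoop.hbase.util.Bytes
  import scala.collection.mutable.ListBuffer

  val tableName = "course_clickcount"  // table created in step (4)
  val cf = "info"                      // column family
  val qualifier = "click_count"        // column

  def save(list: ListBuffer[CourseClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    for (ele <- list) {
      // atomic increment: the batch's count is added to whatever is already stored
      table.incrementColumnValue(Bytes.toBytes(ele.day_course),
        Bytes.toBytes(cf), Bytes.toBytes(qualifier), ele.click_count)
    }
  }
}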
(4) Creating the HBase tables and the Java connection code. There are two tables, course_clickcount and course_search_clickcount, both with the column family info (e.g. create 'course_clickcount', 'info' and create 'course_search_clickcount', 'info' in the hbase shell). The Java connection code is as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseUtils {

    HBaseAdmin admin = null;
    Configuration configuration = null;

    private HBaseUtils() {
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");
        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    public static synchronized HBaseUtils getInstance() {
        if (instance == null) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    public HTable getTable(String tableName) {
        HTable table = null;
        try {
            table = new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Write one record into an HBase table
     * @param tableName HBase table name
     * @param rowkey    row key
     * @param cf        column family
     * @param column    column qualifier
     * @param value     value to write
     */
    public void put(String tableName, String rowkey, String cf, String column, String value) {
        HTable table = getTable(tableName);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String tableName = "imooc_course_clickcount";
        String rowkey = "20171111_88";
        String cf = "info";
        String column = "click_count";
        String value = "30";
        HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
    }
}
(5) Step four: count the visits so far today that came from search engines. CourseSearchClickCount is the case class and CourseSearchClickCountDAO handles the save (a sketch of both follows the snippet below).
// Step four: count today's visits to the hands-on courses that came from search engines
cleanData.map(x => {
  val referer = x.referer.replaceAll("//", "/")
  val splits = referer.split("/")
  var host = ""
  if (splits.length > 2) {
    host = splits(1)
  }
  (host, x.courseId, x.time)
}).filter(_._1 != "").map(x => {
  (x._3.substring(0, 8) + "_" + x._1 + "_" + x._2, 1)
}).reduceByKey(_ + _).foreachRDD(rdd => {
  rdd.foreachPartition(partitionRecords => {
    val list = new ListBuffer[CourseSearchClickCount]
    partitionRecords.foreach(pair => {
      list.append(CourseSearchClickCount(pair._1, pair._2))
    })
    CourseSearchClickCountDAO.save(list)
  })
})
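CourseSearchClickCount and its DAO mirror the previous pair; only the row key layout (day_searchEngine_courseId) and the target table change. A minimal sketch under the same assumptions:

case class CourseSearchClickCount(day_search_course: String, click_count: Long)

object CourseSearchClickCountDAO {
  import org.apache.hadoop.hbase.util.Bytes
  import scala.collection.mutable.ListBuffer

  val tableName = "course_search_clickcount"
  val cf = "info"
  val qualifier = "click_count"

  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf), Bytes.toBytes(qualifier), ele.click_count)
    }
  }
}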
5. The two result sets stored in HBase
(screenshots of the course_clickcount and course_search_clickcount scan results)
6. Deploying the project to the server and the problems encountered
(1) First comment out setMaster("local[5]") in the code; the master is specified on the spark-submit command line instead.
(2) Package with mvn clean package -DskipTests. The build may complain that the src/main/scala package cannot be found; that happens because the Scala and Java sources are not both hooked into the build, and commenting out the compile source-directory setting in the pom file fixes it.

(3) After the build, the corresponding jar can be found under the project's target directory.
(4) Submit the jar to the cluster with spark-submit. First check which processes are currently running (jps):
14914 NameNode
19347 Jps
15491 NodeManager
15049 DataNode
15385 ResourceManager
15225 SecondaryNameNode
10473 Kafka
16137 HRegionServer
15980 HMaster
4798 QuorumPeerMain
16607 Main
The submit command is: spark-submit --master local[5] --class com.lzk.spark.project.spark.ImoocStatStreamingApp
/home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test streamingtopic 1
(5) Problems encountered
One error is caused by the missing Kafka integration package; add it with: --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0
Another is fixed by adding all of the HBase jars: --jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') (tr turns the space-separated file list into the comma-separated list that --jars expects)
Add these options to the spark-submit command, as shown below.
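Putting the pieces together, the full submit command then looks roughly like this (same paths and versions as above):
spark-submit --master local[5] \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
--jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') \
--class com.lzk.spark.project.spark.ImoocStatStreamingApp \
/home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test streamingtopic 1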