首先是画3d直方图,一开始我是打算使用ECharts来的,但后面发现python居然可以画。
1529819848329.png数据是MySQL里面的,弄了一些假数据。使用了ORM
from sqlalchemy import *
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
engine = create_engine('mysql://root:123456@192.168.0.104/student?charset=utf8')
Session = sessionmaker(bind=engine)
sess = Session()
Base = declarative_base()
class UserAction(Base):
__tablename__ = 'user_action'
id = Column(Integer, primary_key=True)
province = Column(String(255))
month = Column(String(255))
number = Column(Integer)
if __name__ == '__main__':
Base.metadata.create_all(engine)
接下来就是画图了
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from plot_orm import UserAction
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
engine = create_engine('mysql://root:123456@192.168.0.104/student?charset=utf8')
Session = sessionmaker(bind=engine)
session = Session()
def gather_provinces():
users = session.query(UserAction)
provinces = [user.province for user in users]
provinces = list(set(provinces))
return provinces
def gather_months():
users = session.query(UserAction)
months = [user.month for user in users]
months = sorted(list(set(months)))
return months
def gather_number(province, month):
user = session.query(UserAction).filter_by(province=province, month=month)
return user[0].number
def plot_3d_bars(x, y):
# chinese config
plt.rcParams['font.sans-serif'] = ['SimHei']
# x--> months, y-->provinces
fake_x = [i for i in range(len(x))]
fake_y = [i for i in range(len(y))]
_xx, _yy = np.meshgrid(x, y)
fake_xx, fake_yy = np.meshgrid(fake_x, fake_y)
# cal data
xs, ys = _xx.ravel(), _yy.ravel()
# print(xs, ys)
fake_xs, fake_ys = fake_xx.ravel(), fake_yy.ravel()
top = [gather_number(ys[i], xs[i]) for i in range(len(xs))]
bottom = np.zeros_like(top)
width = depth = 1
# plot 3d bars
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
# print(fake_xs, fake_ys, bottom, width, depth, top)
ax.bar3d(fake_xs, fake_ys, bottom, width, depth, top, shade=True)
ax.set_title('V.Vader')
ax.set_xlabel('month')
ax.set_ylabel('province')
ax.set_zlabel('number of people')
# ax.set_xlim(x)
ax.set_xticks(fake_x)
ax.set_xticklabels(x)
ax.set_yticks(fake_y)
ax.set_yticklabels(y)
plt.show()
def main():
plot_3d_bars(['1', '2', '3', '4'], ['南昌', '北京', '上海', '杭州'])
if __name__ == '__main__':
main()
Figure_1.png
启动zookeeper和Kafka
zkServer.sh start
kafka-server-start.sh /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
1529808467435.png
先是使用Kafka发送数据给userActionLog
import time
import csv
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='192.168.0.106:9092')
csv_file = open('data/user_log.csv', 'r', encoding='utf-8')
reader = csv.reader(csv_file)
count = 0
for line in csv_file:
if count > 0:
info = line.split('\n')[0]
producer.send('userActionLog', value=info.encode('utf-8'))
print(info)
count += 1
time.sleep(1)
1529808722489.png
可以看到数据已经发好了(到Windows下开发的优点是IDE等比较happy,记得修改hosts文件不然会跑不起来)
接下来是写scale程序了,我之前是真的没有学过,所以花了2天学习了Scala,虽所Scala没有写过,上手还蛮简单的。
首先是Kafka的一些配置
package iceberg.kafka;
public class KafkaProperties {
public static final String ZK = "192.168.0.106:2181";
public static final String TOPIC = "userActionLog";
public static final String BROKER_LIST = "192.168.0.106:9092";
public static final String GROUP_ID = "V.Vader";
}
“18-30”岁、(2,3) “30-40”岁、(4, 5)“40-50”岁、6 “50-60”(7,8)岁男女购物人数
用户收藏数、购买数的top10商品,并把结果存储到MySQL数据库中
package iceberg
import java.sql.DriverManager
import java.util.HashMap
import iceberg.kafka.KafkaProperties
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
object UserTop {
def main(args: Array[String]): Unit = {
val kafkaTopic: Map[String, Int] = Map[String, Int](KafkaProperties.TOPIC -> 1)
val conf = new SparkConf().setAppName("KafkaSparkStream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(6))
val stream = KafkaUtils.createStream(ssc, KafkaProperties.ZK, KafkaProperties.GROUP_ID, kafkaTopic, StorageLevel.MEMORY_ONLY)
val logs = stream.map(_._2)
val info = logs.map(line=>{(line.split(",")(8), line.split(",")(9))})
.map(line=>{
if (line._2.toInt == 0 ) {
(line._1, "gender:female")
}else{
if (line._2.toInt == 1 ) {
(line._1, "gender:male")
}else{
(line._1, "gender:others")
}
}
})
//“18-30”岁、(2,3) “30-40”岁、(4, 5)“40-50”岁、6 “50-60”(7,8)岁男女购物人数
val info2 = info.map(line =>{
if (line._1.toInt >= 2 ) {
if (line._1.toInt <=3){
(line._2, "age:18-30")
}else{
if (line._1.toInt <=5){
(line._2, "age:30-40")
}else{
if (line._1.toInt <=6){
(line._2, "age:40-50")
}else{
if (line._1.toInt <=8){
(line._2, "age:50-60")
}
}
}
}
}
else {(line._2, "age:others")}
})
val result = info2.map((_, 1)).reduceByKey(_+_)
//发送数据给kafka
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.106:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
result.foreachRDD(rdd=>{
rdd.foreachPartition(partitionOfRecoeds=>{
val producer = new KafkaProducer[String, String](props)
partitionOfRecoeds.foreach(pair=>{
val str = pair._1.toString + pair._2.toString
val message = new ProducerRecord[String, String]("result", null, str)
producer.send(message)
})
})
})
// (2)用户收藏数、购买数的top10商品,并把结果存储到mysql数据库中
val task2 = logs.map(line=>{(line.split(",")(2), line.split(",")(7))})
val collection = task2.filter(_._2=="3")
val buy = task2.filter(_._2=="2")
buy.map(collect => (collect._1, 1)).reduceByKey(_ + _).foreachRDD(rdd =>{
val connection = createConnection()
rdd.sortBy(_._2, false).take(10).foreach(pair=>{
println(pair)
val sql = "insert into top(cat, num) values ('"+ pair._1.toString +"' , '"+pair._2.toString+ "')"
connection.createStatement().execute(sql)
println("execute sql")
})
}
)
collection.map(collect => (collect._1, 1)).reduceByKey(_ + _).foreachRDD(rdd =>{
val connection = createConnection()
rdd.sortBy(_._2, false).take(10).foreach(pair=>{
println(pair)
val sql = "insert into top(cat, num) values ('"+ pair._1.toString +"' , '"+pair._2.toString+ "')"
connection.createStatement().execute(sql)
println("execute sql")
})
}
)
def createConnection()={
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.0.104:3306/student", "root","123456")
}
ssc.start()
ssc.awaitTermination()
}
}
接下来就是用flask显示数据了
之前一直用Django(其实我用的最多的web框架是spring boot)然后现在用了flask感觉这个flask也是 超级爽的
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
import time
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
consumer = KafkaConsumer('result', bootstrap_servers='192.168.0.106:9092')
result_list = [{"g0": "0"}, {"g1": "0"}, {"g2": "0"}, {"g3": "0"}, {"b0": "0"}, {"b1": "0"}, {"b2": "0"}, {"b3": "0"}]
def background_thread():
for msg in consumer:
str_msg = msg.value.decode('utf-8')
info = str_msg.split(')')
condition = info[0].split('(')[1]
value = info[1]
if condition == 'gender:male,age:18-30':
result_list[0]['g0'] = value
if condition == 'gender:male,age:30-40':
result_list[1]['g1'] = value
if condition == 'gender:male,age:40-50':
result_list[2]['g2'] = value
if condition == 'gender:male,age:50-60':
result_list[3]['g3'] = value
if condition == 'gender:female,age:18-30':
result_list[4]['b0'] = value
if condition == 'gender:female,age:30-40':
result_list[5]['b1'] = value
if condition == 'gender:female,age:40-50':
result_list[6]['b2'] = value
if condition == 'gender:female,age:50-60':
result_list[7]['b3'] = value
print('test_message', result_list)
socketio.emit('test_message', {'data': result_list})
time.sleep(1)
@socketio.on('test_connect')
def connect(message):
print('message', message)
global thread
if thread is None:
print('thread is None starting socket_io')
thread = socketio.start_background_task(target=background_thread)
socketio.emit('connected', {'data': 'server connected'})
@app.route('/')
def hello_world():
return render_template("index.html")
if __name__ == '__main__':
socketio.run(app, debug=True)
剩下的就是页面显示了。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DashBoard</title>
<script src="static/js/socket.io.js"></script>
<script src="static/js/jquery-3.1.1.min.js"></script>
<script src="static/js/highcharts.js"></script>
<script src="static/js/exporting.js"></script>
<script type="text/javascript" charset="utf-8">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
socket.send('user haas connected!')
socket.emit('test_connect', {data: 'I\'m connected!'});
});
socket.on('connected',function(data){
console.log(data)
});
socket.on('test_message',function(message){
var obj = eval(message);
var result = obj['data']
$('#g1').html(result[0]['g0'])
$('#g2').html(result[1]['g1'])
$('#g3').html(result[2]['g2'])
$('#g4').html(result[3]['g3'])
$('#b1').html(result[4]['b0'])
$('#b2').html(result[5]['b1'])
$('#b3').html(result[6]['b2'])
$('#b4').html(result[7]['b3'])
});
</script>
</head>
<body>
<div>
<b>Girl:18-30: </b><b id="g1"></b>
<b>Girl:30-40: </b><b id="g2"></b>
<b>Girl:40-50: </b><b id="g3"></b>
<b>Girl:50-60: </b><b id="g4"></b>
<br>
<b>Boy:18-30: </b><b id="b1"></b>
<b>Boy:30-40: </b><b id="b2"></b>
<b>Boy:40-50: </b><b id="b3"></b>
<b>Boy:50-60: </b><b id="b4"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
$(document).ready(function () {
Highcharts.setOptions({
global: {
useUTC: false
}
});
Highcharts.chart('container', {
chart: {
type: 'spline',
animation: Highcharts.svg, // don't animate in old IE
marginRight: 10,
events: {
load: function () {
// set up the updating of the chart each second
var series1 = this.series[0];
var series2 = this.series[1];
var series3 = this.series[2];
var series4 = this.series[3];
var series5 = this.series[4];
var series6 = this.series[5];
var series7 = this.series[6];
var series8 = this.series[7];
setInterval(function () {
var x = (new Date()).getTime(), // current time
count1 = $('#g1').text();
y = parseInt(count1);
series1.addPoint([x, y], true, true);
count2 = $('#g2').text();
z = parseInt(count2);
series2.addPoint([x, z], true, true);
count3 = $('#g3').text();
z1 = parseInt(count3);
series3.addPoint([x, z1], true, true);
count4 = $('#g4').text();
z2 = parseInt(count4);
series4.addPoint([x, z2], true, true);
count5 = $('#b1').text();
z5 = parseInt(count5);
series5.addPoint([x, z5], true, true);
count6 = $('#b2').text();
z6 = parseInt(count6);
series6.addPoint([x, z6], true, true);
count7 = $('#b3').text();
z7 = parseInt(count7);
series7.addPoint([x, z7], true, true);
count8 = $('#b4').text();
z8 = parseInt(count8);
series8.addPoint([x, z8], true, true);
}, 1000);
}
}
},
title: {
text: '男女生购物人数实时分析'
},
xAxis: {
type: 'datetime',
tickPixelInterval: 50
},
yAxis: {
title: {
text: '数量'
},
plotLines: [{
value: 0,
width: 1,
color: '#808080'
}]
},
tooltip: {
formatter: function () {
return '<b>' + this.series.name + '</b><br/>' +
Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
Highcharts.numberFormat(this.y, 2);
}
},
legend: {
enabled: true
},
exporting: {
enabled: true
},
series: [
{
name: '女生18-30购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}
,
{
name: '女生30-40购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}
,
{
name: '女生40-50购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}
,
{
name: '女生50-60购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
},
{
name: '男生18-30购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}
,
{
name: '男生30-40购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}
,
{
name: '男生40-50购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}
,
{
name: '男生50-60购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}
,
]
});
});
</script>
</body>
</html>
1529809637345.png
这样socket就可以把数据传出去了,看起来还是蛮有意思的,所以接下来打算使用Hbase,redis,python,tensorflow来写一个实时处理股票并显示的系统前台页面使用ECharts,期待自己的表现。(超级期待)
1529809692848.png 1529809823927.png对啦下面是可以选择的,这样就可以筛选多个信息了
网友评论