Stream Processing in Practice

Author: VaderWang | Published: 2018-07-13 23:02

First, the 3D bar chart. I originally planned to use ECharts, but then discovered that Python can draw it as well.

1529819848329.png

The data lives in MySQL; I made up some fake rows and accessed them through an ORM (SQLAlchemy).

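# plot_orm.py: ORM model for the user_action table (imported by the plotting script)
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base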

engine = create_engine('mysql://root:123456@192.168.0.104/student?charset=utf8')
Session = sessionmaker(bind=engine)
sess = Session()
Base = declarative_base()

class UserAction(Base):
    __tablename__ = 'user_action'

    id = Column(Integer, primary_key=True)
    province = Column(String(255))
    month = Column(String(255))
    number = Column(Integer)
        
if __name__ == '__main__':
    Base.metadata.create_all(engine)
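
The fake rows mentioned above could be generated along these lines. This is only a sketch: it uses Python's built-in sqlite3 with an in-memory database as a stand-in for the MySQL server, and the helper name seed_fake_data, the value range, and the province and month lists are assumptions made for illustration.

```python
import random
import sqlite3


def seed_fake_data(conn, provinces, months, seed=0):
    """Create user_action and insert one random row per (province, month)."""
    rng = random.Random(seed)  # fixed seed so the fake data is reproducible
    conn.execute(
        "CREATE TABLE IF NOT EXISTS user_action ("
        "id INTEGER PRIMARY KEY, province TEXT, month TEXT, number INTEGER)"
    )
    rows = [(p, m, rng.randint(100, 1000)) for p in provinces for m in months]
    conn.executemany(
        "INSERT INTO user_action (province, month, number) VALUES (?, ?, ?)", rows
    )
    conn.commit()
    return len(rows)


# in-memory SQLite stands in for the MySQL database (assumption)
conn = sqlite3.connect(":memory:")
inserted = seed_fake_data(conn, ["南昌", "北京", "上海", "杭州"], ["1", "2", "3", "4"])
print(inserted)
```

One row per province and month is enough for the chart, because every bar reads exactly one number.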

Next comes the plotting.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from plot_orm import UserAction
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D


engine = create_engine('mysql://root:123456@192.168.0.104/student?charset=utf8')
Session = sessionmaker(bind=engine)
session = Session()

def gather_provinces():
    users = session.query(UserAction)
    provinces = [user.province for user in users]
    provinces = list(set(provinces))
    return provinces


def gather_months():
    users = session.query(UserAction)
    months = [user.month for user in users]
    months = sorted(list(set(months)))
    return months


def gather_number(province, month):
    user = session.query(UserAction).filter_by(province=province, month=month)
    return user[0].number

def plot_3d_bars(x, y):
    # use a font that has Chinese glyphs so the province names render
    plt.rcParams['font.sans-serif'] = ['SimHei']

    # x --> months, y --> provinces; bars are placed at integer positions
    fake_x = list(range(len(x)))
    fake_y = list(range(len(y)))

    _xx, _yy = np.meshgrid(x, y)
    fake_xx, fake_yy = np.meshgrid(fake_x, fake_y)

    # flatten the grids and look up the height of every bar
    xs, ys = _xx.ravel(), _yy.ravel()
    # print(xs, ys)
    fake_xs, fake_ys = fake_xx.ravel(), fake_yy.ravel()

    top = [gather_number(ys[i], xs[i]) for i in range(len(xs))]
    bottom = np.zeros_like(top)
    width = depth = 1

    # plot 3d bars
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    # print(fake_xs, fake_ys, bottom, width, depth, top)

    ax.bar3d(fake_xs, fake_ys, bottom, width, depth, top, shade=True)
    ax.set_title('V.Vader')

    ax.set_xlabel('month')
    ax.set_ylabel('province')
    ax.set_zlabel('number of people')
    # ax.set_xlim(x)
    ax.set_xticks(fake_x)
    ax.set_xticklabels(x)
    ax.set_yticks(fake_y)
    ax.set_yticklabels(y)
    plt.show()

def main():
    plot_3d_bars(['1', '2', '3', '4'], ['南昌', '北京', '上海', '杭州'])

if __name__ == '__main__':
    main()
Figure_1.png
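
gather_number runs one query per bar. A possible alternative, sketched here in plain Python, is to load all counts once into a dictionary and then lay out the bars in the same order that np.meshgrid followed by ravel() produces (months vary fastest). The function name bar_grid and the sample counts are hypothetical.

```python
def bar_grid(months, provinces, counts):
    """Return bar positions and heights in meshgrid(...).ravel() order.

    counts maps (province, month) -> number; missing pairs count as 0.
    """
    xs, ys, tops = [], [], []
    for yi, province in enumerate(provinces):      # rows of the grid
        for xi, month in enumerate(months):        # columns vary fastest
            xs.append(xi)
            ys.append(yi)
            tops.append(counts.get((province, month), 0))
    return xs, ys, tops


# hypothetical counts, as if read from user_action with a single query
counts = {("北京", "1"): 30, ("北京", "2"): 45, ("上海", "1"): 25}
xs, ys, tops = bar_grid(["1", "2"], ["北京", "上海"], counts)
print(xs, ys, tops)
```

The three lists could then be passed to ax.bar3d in place of fake_xs, fake_ys, and top.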

Start ZooKeeper and Kafka

zkServer.sh start

kafka-server-start.sh  /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
1529808467435.png

First, use a Kafka producer to send the data to the userActionLog topic.

import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.0.106:9092')

# send the CSV to Kafka one raw line per second, skipping the header row
with open('data/user_log.csv', 'r', encoding='utf-8') as csv_file:
    next(csv_file, None)  # skip the header line
    for line in csv_file:
        info = line.rstrip('\n')
        producer.send('userActionLog', value=info.encode('utf-8'))
        print(info)
        time.sleep(1)

# make sure buffered messages are delivered before the script exits
producer.flush()
1529808722489.png
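
To try the replay logic without a running broker, the sending step can be passed in as a function. The sketch below is an illustration under assumptions: replay_csv is a made-up helper, a plain list stands in for producer.send, and the sample header and rows are invented.

```python
import io
import time


def replay_csv(lines, send, delay=1.0):
    """Send every data line (header skipped) through send(); return the count."""
    sent = 0
    rows = iter(lines)
    next(rows, None)  # the first line is the CSV header
    for line in rows:
        payload = line.rstrip("\r\n")
        if not payload:
            continue  # ignore blank lines
        send(payload.encode("utf-8"))
        sent += 1
        time.sleep(delay)
    return sent


# a list stands in for producer.send; the sample rows are made up
outbox = []
sample = io.StringIO("user_id,item_id,action\n1,100,2\n2,200,3\n")
total = replay_csv(sample, outbox.append, delay=0)
print(total, outbox)
```

With a real producer, send could be a small wrapper such as lambda v: producer.send('userActionLog', value=v).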

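As you can see, the data has been sent. (The advantage of developing on Windows is that the IDE and other tools are pleasant to use; remember to edit the hosts file, otherwise the program will not run.)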

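Next comes the Scala program. I had never learned Scala before, so I spent two days studying it. Even though I had never written any, it was fairly easy to pick up.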

First, some Kafka configuration.

package iceberg.kafka;

public class KafkaProperties {

    public static final String ZK = "192.168.0.106:2181";

    public static final String TOPIC = "userActionLog";

    public static final String BROKER_LIST = "192.168.0.106:9092";

    public static final String GROUP_ID = "V.Vader";
}

Count the male and female shoppers in each age group: 18-30 (age codes 2, 3), 30-40 (codes 4, 5), 40-50 (code 6), and 50-60 (codes 7, 8).
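
Before reading the Spark job, it may help to see the grouping rule in plain Python. This is a sketch, not the job itself: the column positions (8 for the age code, 9 for the gender code, with 0 meaning female and 1 meaning male) are taken from the Scala code below, while the function names and sample rows are made up.

```python
from collections import Counter

AGE_GROUPS = {2: "age:18-30", 3: "age:18-30", 4: "age:30-40", 5: "age:30-40",
              6: "age:40-50", 7: "age:50-60", 8: "age:50-60"}
GENDERS = {0: "gender:female", 1: "gender:male"}


def classify(line):
    """Return (gender, age group) for one CSV line."""
    fields = line.split(",")
    age_code, gender_code = int(fields[8]), int(fields[9])
    return (GENDERS.get(gender_code, "gender:others"),
            AGE_GROUPS.get(age_code, "age:others"))


def count_shoppers(lines):
    """Count how many lines fall into each (gender, age group) pair."""
    return Counter(classify(line) for line in lines)


# made-up rows: only columns 8 (age code) and 9 (gender code) matter here
rows = ["a,b,c,d,e,f,g,h,2,1", "a,b,c,d,e,f,g,h,3,1", "a,b,c,d,e,f,g,h,6,0"]
result = count_shoppers(rows)
print(result)
```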

Find the top 10 items by number of user favorites and by number of purchases, and store the results in the MySQL database.

package iceberg

import java.sql.DriverManager
import java.util.HashMap

import iceberg.kafka.KafkaProperties
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}


object UserTop {
  def main(args: Array[String]): Unit = {

    val kafkaTopic: Map[String, Int] = Map[String, Int](KafkaProperties.TOPIC -> 1)

    val conf = new SparkConf().setAppName("KafkaSparkStream").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(6))

    val stream = KafkaUtils.createStream(ssc, KafkaProperties.ZK, KafkaProperties.GROUP_ID, kafkaTopic, StorageLevel.MEMORY_ONLY)

    val logs = stream.map(_._2)

    val info = logs.map(line=>{(line.split(",")(8), line.split(",")(9))})
      .map(line=>{
        if (line._2.toInt == 0 ) {
          (line._1, "gender:female")
        }else{
          if (line._2.toInt == 1 ) {
            (line._1, "gender:male")
          }else{
            (line._1, "gender:others")
          }
        }
      })
    // map the age code to an age group: 18-30 (codes 2, 3), 30-40 (4, 5), 40-50 (6), 50-60 (7, 8)
    val info2 = info.map { case (ageCode, gender) =>
      val age = ageCode.toInt match {
        case 2 | 3 => "age:18-30"
        case 4 | 5 => "age:30-40"
        case 6     => "age:40-50"
        case 7 | 8 => "age:50-60"
        case _     => "age:others" // also covers codes above 8, which previously produced no value
      }
      (gender, age)
    }

    val result = info2.map((_, 1)).reduceByKey(_+_)
    // send the results back to Kafka
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.106:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
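    result.foreachRDD(rdd=>{
      rdd.foreachPartition(partitionOfRecords=>{
        // one producer per partition, closed once the partition has been sent
        val producer = new KafkaProducer[String, String](props)
        partitionOfRecords.foreach(pair=>{
          // e.g. "(gender:male,age:18-30)5": the key tuple followed by its count
          val str = pair._1.toString + pair._2.toString
          val message = new ProducerRecord[String, String]("result", null, str)
          producer.send(message)
        })
        producer.close()
      })
    })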

    // (2) top 10 items by user favorites and by purchases, stored in the MySQL database
    val task2 = logs.map(line=>{(line.split(",")(2), line.split(",")(7))})
    val collection = task2.filter(_._2=="3")
    val buy = task2.filter(_._2=="2")

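    // write the top 10 purchased items of each batch to MySQL
    buy.map(item => (item._1, 1)).reduceByKey(_ + _).foreachRDD(rdd => {
      val connection = createConnection()
      try {
        // a prepared statement avoids building SQL by string concatenation
        val statement = connection.prepareStatement("insert into top(cat, num) values (?, ?)")
        rdd.sortBy(_._2, false).take(10).foreach(pair => {
          println(pair)
          statement.setString(1, pair._1)
          statement.setString(2, pair._2.toString)
          statement.executeUpdate()
          println("execute sql")
        })
      } finally {
        connection.close()
      }
    })
    // write the top 10 favorited items of each batch to the same table
    collection.map(item => (item._1, 1)).reduceByKey(_ + _).foreachRDD(rdd => {
      val connection = createConnection()
      try {
        val statement = connection.prepareStatement("insert into top(cat, num) values (?, ?)")
        rdd.sortBy(_._2, false).take(10).foreach(pair => {
          println(pair)
          statement.setString(1, pair._1)
          statement.setString(2, pair._2.toString)
          statement.executeUpdate()
          println("execute sql")
        })
      } finally {
        connection.close()
      }
    })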

    def createConnection()={
      Class.forName("com.mysql.jdbc.Driver")
      DriverManager.getConnection("jdbc:mysql://192.168.0.104:3306/student", "root","123456")
    }

    ssc.start()

    ssc.awaitTermination()
  }
}
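
The top-10 step of the job above boils down to "filter by action code, count per item, sort descending, take ten". A plain-Python sketch of that idea follows; the column positions (2 for the item, 7 for the action, with "2" meaning purchase and "3" meaning favorite) come from the Scala code, while top_items and the sample rows are made up.

```python
from collections import Counter


def top_items(lines, action, n=10):
    """Count column 2 for rows whose column 7 equals action; return the top n."""
    counts = Counter()
    for line in lines:
        fields = line.split(",")
        if fields[7] == action:
            counts[fields[2]] += 1
    return counts.most_common(n)


# made-up rows: column 2 is the item, column 7 the action code
rows = [
    "u1,i1,cat_a,x,x,x,x,2,3,1",
    "u2,i2,cat_a,x,x,x,x,2,4,0",
    "u3,i3,cat_b,x,x,x,x,2,2,1",
    "u4,i4,cat_b,x,x,x,x,3,2,1",
]
purchases = top_items(rows, "2")
favorites = top_items(rows, "3")
print(purchases, favorites)
```

Note that reduceByKey on a DStream aggregates within each 6-second batch only, so the Spark job inserts a fresh top 10 per batch rather than a running total.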

Next, display the data with Flask.

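I had always used Django before (although the web framework I have used most is actually Spring Boot). Now that I have tried Flask, I find it a real pleasure to use too.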

from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
import time


app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)


thread = None
consumer = KafkaConsumer('result', bootstrap_servers='192.168.0.106:9092')
result_list = [{"g0": "0"}, {"g1": "0"}, {"g2": "0"}, {"g3": "0"}, {"b0": "0"}, {"b1": "0"}, {"b2": "0"}, {"b3": "0"}]


# Maps the "gender,age" key sent by the Spark job to (index, field) in result_list.
# g* fields hold the female counts and b* the male counts, matching the
# "Girl"/"Boy" labels on the page.
SLOTS = {
    'gender:female,age:18-30': (0, 'g0'),
    'gender:female,age:30-40': (1, 'g1'),
    'gender:female,age:40-50': (2, 'g2'),
    'gender:female,age:50-60': (3, 'g3'),
    'gender:male,age:18-30': (4, 'b0'),
    'gender:male,age:30-40': (5, 'b1'),
    'gender:male,age:40-50': (6, 'b2'),
    'gender:male,age:50-60': (7, 'b3'),
}


def background_thread():
    for msg in consumer:
        # each message looks like "(gender:male,age:18-30)5"
        str_msg = msg.value.decode('utf-8')
        info = str_msg.split(')')
        condition = info[0].split('(')[1]
        value = info[1]
        slot = SLOTS.get(condition)
        if slot is not None:
            result_list[slot[0]][slot[1]] = value
        print('test_message', result_list)
        socketio.emit('test_message', {'data': result_list})
        time.sleep(1)


@socketio.on('test_connect')
def connect(message):
    print('message', message)
    global thread
    if thread is None:
        print('thread is None starting socket_io')
        thread = socketio.start_background_task(target=background_thread)
    socketio.emit('connected', {'data': 'server connected'})


@app.route('/')
def hello_world():
    return render_template("index.html")


if __name__ == '__main__':
    socketio.run(app, debug=True)
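
The split-based parsing in background_thread depends on the exact message shape produced by the Spark job, for example "(gender:male,age:18-30)5". A slightly more defensive parser could look like the sketch below; parse_result and the regular expression are illustrative assumptions, not part of the original program.

```python
import re

# "(gender:male,age:18-30)5" -> key inside the parentheses, count after them
MESSAGE_RE = re.compile(r"^\((?P<key>[^()]*)\)(?P<count>\d+)$")


def parse_result(raw):
    """Return (key, count) or None when the message has an unexpected shape."""
    match = MESSAGE_RE.match(raw.decode("utf-8").strip())
    if match is None:
        return None
    return match.group("key"), int(match.group("count"))


good = parse_result(b"(gender:male,age:18-30)5")
bad = parse_result(b"garbage")
print(good, bad)
```

Returning None for malformed input lets the consumer loop skip a bad message instead of raising an IndexError.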

What remains is the page that displays it.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>DashBoard</title>
    <script src="static/js/socket.io.js"></script>
    <script src="static/js/jquery-3.1.1.min.js"></script>
    <script src="static/js/highcharts.js"></script>
    <script src="static/js/exporting.js"></script>

    <script type="text/javascript" charset="utf-8">
        var socket = io.connect('http://' + document.domain + ':' + location.port);
        socket.on('connect', function() {
            socket.send('user has connected!')
            socket.emit('test_connect', {data: 'I\'m connected!'});
        });
        socket.on('connected',function(data){
            console.log(data)
        });

        socket.on('test_message',function(message){
            var obj = message; // Socket.IO already delivers a parsed object, so eval is not needed
            var result = obj['data']

            $('#g1').html(result[0]['g0'])
            $('#g2').html(result[1]['g1'])

            $('#g3').html(result[2]['g2'])
            $('#g4').html(result[3]['g3'])

            $('#b1').html(result[4]['b0'])
            $('#b2').html(result[5]['b1'])

            $('#b3').html(result[6]['b2'])
            $('#b4').html(result[7]['b3'])


        });

    </script>
</head>
<body>
<div>
    <b>Girl:18-30: </b><b id="g1"></b>
    <b>Girl:30-40: </b><b id="g2"></b>
    <b>Girl:40-50: </b><b id="g3"></b>
    <b>Girl:50-60: </b><b id="g4"></b>
    <br>
    <b>Boy:18-30: </b><b id="b1"></b>
    <b>Boy:30-40: </b><b id="b2"></b>
    <b>Boy:40-50: </b><b id="b3"></b>
    <b>Boy:50-60: </b><b id="b4"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>

<script type="text/javascript">
    $(document).ready(function () {
    Highcharts.setOptions({
        global: {
            useUTC: false
        }
    });

    Highcharts.chart('container', {
        chart: {
            type: 'spline',
            animation: Highcharts.svg, // don't animate in old IE
            marginRight: 10,
            events: {
                load: function () {

                    // once per second, append the latest count shown on the page to each series
                    var chart = this;
                    var ids = ['g1', 'g2', 'g3', 'g4', 'b1', 'b2', 'b3', 'b4'];

                    setInterval(function () {
                        var x = (new Date()).getTime(); // current time
                        ids.forEach(function (id, i) {
                            // fall back to 0 until the first message has filled the element
                            var y = parseInt($('#' + id).text(), 10) || 0;
                            chart.series[i].addPoint([x, y], true, true);
                        });
                    }, 1000);
                }
            }
        },
        title: {
            text: 'Real-time analysis of male and female shopper counts'
        },
        xAxis: {
            type: 'datetime',
            tickPixelInterval: 50
        },
        yAxis: {
            title: {
                text: 'Count'
            },
            plotLines: [{
                value: 0,
                width: 1,
                color: '#808080'
            }]
        },
        tooltip: {
            formatter: function () {
                return '<b>' + this.series.name + '</b><br/>' +
                    Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
                    Highcharts.numberFormat(this.y, 2);
            }
        },
        legend: {
            enabled: true
        },
        exporting: {
            enabled: true
        },
        // one series per gender and age group, each seeded with 20 random placeholder points
        series: [
            'Female shoppers 18-30',
            'Female shoppers 30-40',
            'Female shoppers 40-50',
            'Female shoppers 50-60',
            'Male shoppers 18-30',
            'Male shoppers 30-40',
            'Male shoppers 40-50',
            'Male shoppers 50-60'
        ].map(function (name) {
            // generate an array of random data
            var data = [],
                time = (new Date()).getTime(),
                i;

            for (i = -19; i <= 0; i += 1) {
                data.push({
                    x: time + i * 1000,
                    y: Math.random()
                });
            }
            return { name: name, data: data };
        })
    });
});
</script>
</body>
</html>
1529809637345.png
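
Each series on the page starts with 20 points, and addPoint is called with its shift argument set to true, so every new point pushes the oldest one out and the chart behaves like a fixed-size sliding window. The same idea in Python, as an illustration only (the class name SlidingSeries is made up):

```python
from collections import deque


class SlidingSeries:
    """Keep only the most recent `size` (timestamp, value) points."""

    def __init__(self, size=20):
        self.points = deque(maxlen=size)

    def add_point(self, x, y):
        # once the deque is full, appending drops the oldest point
        self.points.append((x, y))


series = SlidingSeries(size=20)
for second in range(25):
    series.add_point(second * 1000, second)
print(len(series.points), series.points[0], series.points[-1])
```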

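With that, the socket pushes the data out to the page. It looks quite fun, so next I plan to use HBase, Redis, Python, and TensorFlow to build a system that processes stock data in real time and displays it, with ECharts for the front-end page. I am looking forward to seeing how I do. (Really looking forward to it!)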

1529809692848.png 1529809823927.png

By the way, the items below the chart can be selected, so you can filter for several series at once.
