flume+kafka+SparkStreaming+mysql

Author: __元昊__ | Published: 2019-03-08 17:28

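    I. Overview

    This article shows how to use Spark Streaming + Flume + Kafka to compute over real-time data, and how to display the result as a heat map with the AMap (高德地图) API.

    Background:

    In some situations we need to know how people are currently moving around, for example to monitor in real time how crowded a tourist attraction is. One option is to compute over the GPS location data of the visitors in the area, but GPS has a drawback: not every visitor has GPS turned on. In that case "signaling" data can be used to obtain a person's location. "Signaling" means that every phone periodically sends location information to the nearest base station unless it is switched off. Most people have received a "Welcome to such-and-such city" text message when passing through a new area on a trip; mobile carriers use signaling to track each user's location. (It also shows how hard it is to protect personal privacy in the age of big data.)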

    1. Project architecture

    image
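    The Storm part shown in the diagram has been replaced by Spark Streaming.

    Flume collects the log data, which is simulated with a Python script. Flume ships the data to Kafka, Spark Streaming consumes it in real time and writes the results to a MySQL database, and the results can then be served to a back-end application for use and visualization.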

    2. Environment and software

    • Spark 2.2.1 (Scala 2.11.8)
    • zookeeper-3.4.5
    • flume
    • kafka_2.11-0.9.0.0

    II. Hands-on

    1. Simulating the data

    #coding=UTF-8
    
    import random
    import time
    
    phone=[
        "13869555210",
        "18542360152",
        "15422556663",
        "18852487210",
        "13993584664",
        "18754366522",
        "15222436542",
        "13369568452",
        "13893556666",
        "15366698558"
    ]
    
    location=[
        "116.191031, 39.988585",
        "116.389275, 39.925818",
        "116.287444, 39.810742",
        "116.481707, 39.940089",
        "116.410588, 39.880172",
        "116.394816, 39.91181",
        "116.416002, 39.952917"
    ]
    
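    def sample_phone():
        # Pick one phone number at random
        return random.choice(phone)
    
    def sample_location():
        # Pick one "longitude, latitude" pair at random
        return random.choice(location)
    
    def generator_log(count=10):
        """Append `count` records of the form phone<TAB>location<TAB>time to /opt/log.txt."""
        time_str=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
        # `with` closes (and flushes) the file, so Flume's `tail -F` sees every line
        with open("/opt/log.txt","a+") as f:
            while count>=1:
                query_log="{phone}\t{location}\t{date}".format(phone=sample_phone(),location=sample_location(),date=time_str)
                f.write(query_log+"\n")
                count=count-1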
    
    if __name__=='__main__':
        generator_log(100)
    

    Copy the code above into PyCharm, save it as a Python file named phoneData.py, and copy that file to the Linux system.

    [root@hadoop03 modules]# python phoneData.py
    

    This generates the simulated data.
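
    Each generated line has three tab-separated fields: a phone number, a "longitude, latitude" pair, and a timestamp. As a minimal sketch (written for Python 3; the names `Record` and `parse_record` are illustrative and not part of the original project), such a line could be parsed like this:

```python
from datetime import datetime
from typing import NamedTuple


class Record(NamedTuple):
    phone: str
    longitude: float
    latitude: float
    timestamp: datetime


def parse_record(line: str) -> Record:
    """Split one tab-separated log line into typed fields."""
    phone, location, date = line.rstrip("\n").split("\t")
    # The location field is "longitude, latitude" (note the space after the comma).
    lng_text, lat_text = location.split(",")
    return Record(
        phone=phone,
        longitude=float(lng_text.strip()),
        latitude=float(lat_text.strip()),
        timestamp=datetime.strptime(date, "%Y-%m-%d %H:%M:%S"),
    )


if __name__ == "__main__":
    sample = "13869555210\t116.191031, 39.988585\t2019-03-08 17:28:00\n"
    print(parse_record(sample))
```

    Parsing the line once into named fields makes it harder to mix up longitude and latitude further down the pipeline.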

    2. Flume configuration

    Add a configuration file named storm_pro.conf under the Flume installation directory:

    agent.sources = s1                                                                                                                  
    agent.channels = c1                                                                                                                 
    agent.sinks = k1                                                                                                                    
                                                                                                                                          
    agent.sources.s1.type=exec                                                                                                          
    agent.sources.s1.command=tail -F /opt/log.txt                                                                               
    agent.sources.s1.channels=c1                                                                                                        
    agent.channels.c1.type=memory                                                                                                       
    agent.channels.c1.capacity=10000                                                                                                    
    agent.channels.c1.transactionCapacity=100                                                                                           
                                                                                                                                          
    # Kafka sink
    agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
    # Kafka broker addresses and ports
    agent.sinks.k1.brokerList=hadoop01:9092,hadoop02:9092,hadoop03:9092
    # Kafka topic
    agent.sinks.k1.topic=storm_kafka
    # Serializer
    agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    agent.sinks.k1.channel=c1
    

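    Note: in the configuration above, the tail -F command in agent.sources.s1.command names the file to read (/opt/log.txt); you can create it yourself. agent.sinks.k1.topic is the Kafka topic that has to be created below.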
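
    Because the tailed file, the topic name and the channel wiring all have to agree with the rest of the pipeline, it can help to read them back from the file before starting the agent. The following is only a sketch (Python 3): `parse_properties` and `summarize` are hypothetical helpers, and the parser handles plain `key=value` lines only, not the full Java properties syntax.

```python
def parse_properties(text: str) -> dict:
    """Parse `key=value` lines, skipping blank lines and `#` comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def summarize(props: dict, agent: str = "agent") -> dict:
    """Pull out the settings the rest of the pipeline depends on."""
    command = props[agent + ".sources.s1.command"]
    source_channel = props[agent + ".sources.s1.channels"]
    sink_channel = props[agent + ".sinks.k1.channel"]
    return {
        "tailed_file": command.split()[-1],  # last token of `tail -F <file>`
        "topic": props[agent + ".sinks.k1.topic"],
        "brokers": props[agent + ".sinks.k1.brokerList"].split(","),
        # Source and sink must be wired to the same channel.
        "same_channel": source_channel == sink_channel,
    }


CONF = """
agent.sources = s1
agent.channels = c1
agent.sinks = k1
agent.sources.s1.type=exec
agent.sources.s1.command=tail -F /opt/log.txt
agent.sources.s1.channels=c1
agent.channels.c1.type=memory
# Kafka sink
agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.brokerList=hadoop01:9092,hadoop02:9092,hadoop03:9092
agent.sinks.k1.topic=storm_kafka
agent.sinks.k1.channel=c1
"""

if __name__ == "__main__":
    print(summarize(parse_properties(CONF)))
```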

    Starting the Linux environment

    (1) Start ZooKeeper, Kafka, HDFS and Flume

    [root@hadoop03 conf]# zkServer.sh start
    [root@hadoop03 conf]# kafka-server-start.sh /opt/modules/app/kafka/config/server.properties
    [root@hadoop03 modules]# start-dfs.sh
    [root@hadoop03 modules]# bin/flume-ng agent -n agent -c conf -f conf/storm_pro.conf -Dflume.root.logger=INFO,console
    

    (2) Create the topic defined in the Flume configuration
    storm_kafka in the command below is the value of agent.sinks.k1.topic in storm_pro.conf under /opt/modules/app/flume/conf/.

    kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic storm_kafka
    

    (3) Start a consumer and read the messages

    kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic storm_kafka
    

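    (4) Run python phoneData.py again to produce simulated data; the newly generated records show up in the Kafka consumer console.
    (5) Once the test succeeds, stop the Kafka consumer and take HDFS out of safe mode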

    hdfs dfsadmin -safemode leave
    

    Next we write the Spark Streaming code that consumes and processes the newly generated data.

    Maven configuration (pom.xml):

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.neusoft</groupId>
      <artifactId>sparkDemo</artifactId>
    
      <version>1.0-SNAPSHOT</version>
    
      <name>sparkDemo</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <spark.version>2.2.1</spark.version>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
          <version>${spark.version}</version>
        </dependency>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.47</version>
        </dependency>
      </dependencies>
    
      <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
          <plugins>
            <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
            <plugin>
              <artifactId>maven-clean-plugin</artifactId>
              <version>3.1.0</version>
            </plugin>
            <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
            <plugin>
              <artifactId>maven-resources-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.8.0</version>
            </plugin>
            <plugin>
              <artifactId>maven-surefire-plugin</artifactId>
              <version>2.22.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-jar-plugin</artifactId>
              <version>3.0.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-install-plugin</artifactId>
              <version>2.5.2</version>
            </plugin>
            <plugin>
              <artifactId>maven-deploy-plugin</artifactId>
              <version>2.8.2</version>
            </plugin>
            <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
            <plugin>
              <artifactId>maven-site-plugin</artifactId>
              <version>3.7.1</version>
            </plugin>
            <plugin>
              <artifactId>maven-project-info-reports-plugin</artifactId>
              <version>3.0.0</version>
            </plugin>
          </plugins>
        </pluginManagement>
      </build>
    </project>
    

    3. Writing the Spark Streaming program

    import java.sql.{Connection, DriverManager, PreparedStatement}
    import java.util.Date
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    
    /**
      * Created by Administrator on 2019/3/7.
      */
    object kafka_demo {
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "root")
    
        val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5)) // 5-second micro-batches
        ssc.checkpoint("hdfs://192.168.92.134:8020/input")
        // Set of Kafka topics to subscribe to; more than one topic may be listed
        val topicsSet = Set("storm_kafka")
        // Kafka parameters: the broker list
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.92.134:9092,192.168.92.135:9092,192.168.92.136:9092")
        val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, topicsSet)
    
        // Each message is a (key, value) pair; keep only the value (the log line)
        val lines = messages.map(_._2)
    
        lines.foreachRDD(rdd => {
          // Writes one partition of records to MySQL over a single connection
          def func(records: Iterator[String]): Unit = {
            var conn: Connection = null
            var stmt: PreparedStatement = null
            try {
              val url = "jdbc:mysql://localhost:3306/storm"
              val user = "root"
              val password = "root" // replace with your own MySQL password
              conn = DriverManager.getConnection(url, user, password)
              // Prepare the statement once and reuse it for every record
              stmt = conn.prepareStatement("insert into location(time,latitude,longitude) values (?,?,?)")
              records.foreach(p => {
                // Record layout: phone \t "longitude, latitude" \t time
                val arr = p.split("\\t")
                val arrjingwei = arr(1).split(",")
                stmt.setLong(1, new Date().getTime)
                // NOTE: the first value is the longitude, yet it is stored in the `latitude`
                // column (and vice versa). The DAO shown later reads the columns back in the
                // same swapped order, so the two cancel out; change both together or neither.
                stmt.setDouble(2, arrjingwei(0).trim.toDouble)
                stmt.setDouble(3, arrjingwei(1).trim.toDouble)
                stmt.executeUpdate()
              })
            } catch {
              // Auto-commit is on, so there is nothing to roll back; just log the failure
              case e: Exception => e.printStackTrace()
            } finally {
              if (stmt != null) stmt.close()
              if (conn != null) conn.close()
            }
          }
    
          val repartitionedRDD = rdd.repartition(1)
          repartitionedRDD.foreachPartition(func)
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
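
    The per-partition write follows a simple pattern: open one connection, turn each log line into a parameter tuple, insert, and close. The sketch below illustrates that pattern in Python 3 with the standard-library sqlite3 module standing in for MySQL, so it runs without a database server. `to_row` and `write_partition` are hypothetical names, and the values are bound in the same column order as the Scala job.

```python
import sqlite3
import time


def to_row(line: str, now_ms: int) -> tuple:
    """Turn one 'phone<TAB>lng, lat<TAB>date' line into (time_ms, first, second)."""
    fields = line.split("\t")
    first, second = (float(v.strip()) for v in fields[1].split(","))
    return (now_ms, first, second)


def write_partition(conn: sqlite3.Connection, lines) -> int:
    """Insert all lines of one partition in a single transaction."""
    now_ms = int(time.time() * 1000)
    rows = [to_row(line, now_ms) for line in lines]
    with conn:  # commits on success, rolls back if an insert raises
        conn.executemany(
            "insert into location(time, latitude, longitude) values (?, ?, ?)",
            rows,
        )
    return len(rows)


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")  # in-memory stand-in for the MySQL database
    conn.execute("create table location(time bigint, latitude double, longitude double)")
    written = write_partition(conn, [
        "13869555210\t116.191031, 39.988585\t2019-03-08 17:28:00",
        "18542360152\t116.389275, 39.925818\t2019-03-08 17:28:00",
    ])
    print(written, conn.execute("select count(*) from location").fetchone()[0])
    conn.close()
```

    Batching the inserts of one partition into a single transaction is a design choice of this sketch; the Scala job above executes one auto-committed insert per record.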
    

    4. Database design

    create database storm;
    
    use storm;
    
    create table location(
    time bigint,
    latitude double,
    longitude double
    )charset utf8;
    

    5. Starting the cluster

    Start Kafka first (note: ZooKeeper must already be running).

    Start Kafka:

    nohup bin/kafka-server-start.sh config/server.properties &
    
    

    Create the topic:

    bin/kafka-topics.sh --create --zookeeper hadoop-senior.shinelon.com:2181 --replication-factor 1 --partitions 1 --topic storm_kafka
    
    

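    Note: the topic name must match the one configured in Flume.

    Start Flume (with the flume-ng command shown earlier).

    Once Kafka and Flume are running, start the Spark Streaming job and then run the Python data simulator. The computed results appear in the database: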

    image

    III. Data visualization

    The visualization looks like this:

    image

    The front-end page is as follows:

    <%--
      Created by IntelliJ IDEA.
      User: ttc
      Date: 2018/7/6
      Time: 14:06
      To change this template use File | Settings | File Templates.
    --%>
    <%@ page contentType="text/html;charset=UTF-8" language="java" %>
    <!DOCTYPE html>
    <html lang="en">
    <head>
      <meta charset="UTF-8"/>
      <title>AMap (高德地图)</title>
      <link rel="stylesheet" href="http://cache.amap.com/lbs/static/main1119.css"/>
    </head>
    <body>
    <script src="https://cdn.bootcss.com/echarts/4.1.0.rc2/echarts.min.js"></script>
    <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
    <script src="http://webapi.amap.com/maps?v=1.4.9&amp;key=d16808eab90b7545923a1c2f4bb659ef"></script>
    <div id="container"></div>
    
    <script>
        var map = new AMap.Map("container", {
            resizeEnable: true,
            center: [116.418261, 39.921984],
            zoom: 11
        });
    
        var heatmap;
        var points =(function a(){  //<![CDATA[
            var city=[];
            $.ajax({
                type:"POST",
                url:"../get_map",
                dataType:'json',
                async:false,        // synchronous, so `city` is filled before `return city` runs
                success:function(result){
                    for(var i=0;i<result.length;i++){
                        //alert("called");
                        city.push({"lng":result[i].longitude,"lat":result[i].latitude,"count":result[i].count});
                    }
    
                }
            })
            return city;
        })();//]]>
    
    //    var points =[
    //     {"lng":116.191031,"lat":39.988585,"count":1000},
    //     {"lng":116.389275,"lat":39.925818,"count":110},
    //     {"lng":116.287444,"lat":39.810742,"count":1200},
    //     {"lng":116.481707,"lat":39.940089,"count":130},
    //     {"lng":116.410588,"lat":39.880172,"count":140},
    //     {"lng":116.394816,"lat":39.91181,"count":15552},
    //     {"lng":116.416002,"lat":39.952917,"count":16}
    //
    //
    //     ];
    
        map.plugin(["AMap.Heatmap"],function() {      // load the heat map plugin
            heatmap = new AMap.Heatmap(map,{
                radius:50,
                opacity:[0,0.7]
            });    // overlay the heat map on the map object
            heatmap.setDataSet({data:points,max:100}); // set the heat map data set
            // see the API documentation for the available parameters
        });
    
        // var map = new AMap.Map('container', {
        //    pitch:75, // map pitch angle, valid range 0 to 83 degrees
        //    viewMode:'3D' // map mode
        //});
    </script>
    
    </body>
    </html>
    
    

    The Spring MVC DAO layer code is as follows:

    
    package com.neusoft.mapper;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.util.ArrayList;
    
    import java.util.List;
    
    import com.neusoft.util.MysqlUtil;
    import org.springframework.stereotype.Component;
    
    @Component
    public class LocationDao {
    
        public List<Location> map() throws Exception{
            List<Location> list = new ArrayList<Location>();
            Connection connection=null;
            PreparedStatement psmt=null;
            ResultSet resultSet=null;
            try {
                connection = MysqlUtil.getConnection();
                // Count the records per coordinate written during the last 10 minutes
                // (the time column stores epoch milliseconds)
                psmt = connection.prepareStatement("select latitude,longitude,count(*) from location where "
                        + "time>unix_timestamp(date_sub(current_timestamp(),interval 10 minute))*1000 "
                        + "group by longitude,latitude");
                resultSet = psmt.executeQuery();
                while (resultSet.next()) {
                    Location location = new Location();
                    // The Spark job stores the longitude value in the `latitude` column and
                    // vice versa, so column 1 (`latitude`) is read back as the longitude here.
                    location.setLongitude(resultSet.getDouble(1));
                    location.setLatitude(resultSet.getDouble(2));
                    location.setCount(resultSet.getInt(3));
                    list.add(location);
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                // MysqlUtil.release() only closes its own fields, so close these two here
                if (resultSet != null) resultSet.close();
                if (psmt != null) psmt.close();
                MysqlUtil.release();
            }
            return list;
        }
    
    }
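
    The query keeps only the rows from the last 10 minutes and counts how many fall on each coordinate pair. To make that logic concrete, here is a small Python 3 sketch of the same filter-then-group step over in-memory tuples. `count_recent` is an illustrative name, and the tuple layout (time in milliseconds, then the two coordinate columns) is an assumption modelled on the `location` table.

```python
from collections import Counter


def count_recent(rows, now_ms, window_ms=10 * 60 * 1000):
    """Count rows per coordinate pair, keeping only rows newer than the window.

    rows: iterable of (time_ms, col_latitude, col_longitude) tuples.
    Returns a Counter keyed by (col_latitude, col_longitude).
    """
    cutoff = now_ms - window_ms
    # Same comparison as the SQL filter: time > now - window
    return Counter((a, b) for t, a, b in rows if t > cutoff)


if __name__ == "__main__":
    now = 1_000_000
    rows = [
        (500_000, 116.1, 39.9),  # inside the window
        (900_000, 116.1, 39.9),  # inside the window, same point
        (100_000, 116.1, 39.9),  # older than 10 minutes, dropped
        (950_000, 116.4, 39.8),  # inside the window, different point
    ]
    for point, count in count_recent(rows, now).items():
        print(point, count)
```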
    
    

    Entity class:

    
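    package com.neusoft.mapper;
    
    // Plain data holder for one heat map point: coordinates plus record count
    public class Location {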
        private Integer count;
        private double latitude;
        private double longitude;
    
        public Integer getCount() {
            return count;
        }
        public void setCount(Integer count) {
            this.count = count;
        }
        public double getLatitude() {
            return latitude;
        }
        public void setLatitude(double latitude) {
            this.latitude = latitude;
        }
        public double getLongitude() {
            return longitude;
        }
        public void setLongitude(double longitude) {
            this.longitude = longitude;
        }
    }
    
    

    Utility class:

    package com.neusoft.util;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    
    public class MysqlUtil {
        // JDBC connection URL (host, database and credentials); adjust to your environment
        private static final String JDBC_URL="jdbc:mysql://192.168.47.244:3306/storm?user=root&password=root";
        private static Connection connection;
        private static PreparedStatement pstm;
        private static ResultSet resultSet;
    
        public static Connection getConnection(){
            try {
                Class.forName("com.mysql.jdbc.Driver");
                connection=DriverManager.getConnection(JDBC_URL);
            }catch (Exception e){
                e.printStackTrace();
            }
            return connection;
        }
        public static void release(){
            try {
                if(resultSet!=null) {
                    resultSet.close();
                }
                if (pstm != null) {
                    pstm.close();
                }
                if(connection!=null){
                    connection.close();
                }
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                if(connection!=null){
                    connection=null;    //help GC
                }
            }
        }
    
    }
    
    

    Controller layer:

    package com.neusoft.controller;
    
    import com.alibaba.fastjson.JSON;
    
    import com.neusoft.mapper.LocationDao;
    
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.servlet.ModelAndView;
    
    import javax.servlet.http.HttpServletResponse;
    
    /**
     * Created by ttc on 2018/8/7.
     */
    @Controller
    public class HomeController {
    
        @RequestMapping("/")
        public ModelAndView home()
        {
            ModelAndView modelAndView = new ModelAndView();
    
            modelAndView.setViewName("index");
            return modelAndView;
        }
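        // Returns the aggregated points as JSON for the heat map page
        @RequestMapping("/get_map")
        public void getMap(HttpServletResponse response) throws Exception{
            LocationDao locationDao = new LocationDao();
            String json = JSON.toJSONString(locationDao.map());
            response.setContentType("application/json;charset=UTF-8");
            response.getWriter().print(json);
        }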
    }
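
    The page and the /get_map endpoint only have to agree on field names: the endpoint serializes `Location` objects with `longitude`, `latitude` and `count`, and the page's success callback renames them to `lng`, `lat` and `count` for the heat map. The Python 3 sketch below mimics both sides with the standard json module. `to_payload` and `to_heatmap_points` are hypothetical names, and the exact text fastjson emits (for example its field order) may differ.

```python
import json


def to_payload(counts: dict) -> str:
    """Build JSON text shaped like the /get_map response.

    counts maps (longitude, latitude) -> number of records.
    """
    points = [
        {"count": c, "latitude": lat, "longitude": lng}
        for (lng, lat), c in sorted(counts.items())
    ]
    return json.dumps(points)


def to_heatmap_points(payload: str) -> list:
    """Mirror of the page's success callback: rename fields to lng/lat/count."""
    return [
        {"lng": p["longitude"], "lat": p["latitude"], "count": p["count"]}
        for p in json.loads(payload)
    ]


if __name__ == "__main__":
    payload = to_payload({(116.191031, 39.988585): 3, (116.389275, 39.925818): 1})
    print(payload)
    print(to_heatmap_points(payload))
```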
    
    

    # Installing or upgrading to Python 2.7 and installing pip on CentOS 6.9

    https://www.cnblogs.com/harrymore/p/9024287.html

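    Remember to synchronize the clocks of the CentOS and Windows machines: rows are timestamped by the Spark job, while the query's time window is evaluated against the MySQL server's clock.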
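
    A quick calculation shows why the clocks matter. The DAO query keeps a row only when its timestamp is newer than "database now minus window", so a writer whose clock runs behind by more than the window produces rows that are never shown. The Python 3 sketch below uses a hypothetical helper `is_visible`, and the skew values are made up for illustration.

```python
def is_visible(row_ms: int, db_now_ms: int, window_s: int) -> bool:
    """Same comparison as the SQL filter: time > now - window."""
    return row_ms > db_now_ms - window_s * 1000


if __name__ == "__main__":
    db_now = 1_552_037_280_000   # database clock, epoch milliseconds
    skew_ms = 30_000             # writer clock is 30 seconds behind
    fresh_row = db_now - skew_ms  # timestamp of a row written "right now"

    # With a 10-minute window the skew goes unnoticed ...
    print(is_visible(fresh_row, db_now, window_s=600))
    # ... with a 20-second window the fresh row is already filtered out.
    print(is_visible(fresh_row, db_now, window_s=20))
```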

    Python script that generates dynamic data

    import random
    import sys
    import time
    import numpy as np
    
    def generator():
        # Pick a random center point (longitude, latitude) for this batch
        point = [random.uniform(123.449169, 123.458654), random.uniform(41.740567, 41.743705)]
        arr = []
        for i in range(1, random.randint(0, 500)):
            # Add independent Gaussian jitter of about 1e-4 degrees to each coordinate
            x = point[0] + round(np.random.randn() * pow(10, -4), 4)
            y = point[1] + round(np.random.randn() * pow(10, -4), 4)
            time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            # Same layout as before: phone \t longitude,latitude \t time
            arr.append('13888888888\t' + str(x) + ',' + str(y) + '\t' + time_str)
        return arr
    
    if __name__ == '__main__':
        path = sys.argv[1]
        # Mode 'a' creates the file when it does not exist yet
        with open(path, 'a') as f:
            while True:
                for line in generator():
                    f.write(line + '\n')
                f.flush()  # make the batch visible to Flume's tail -F right away
                time.sleep(5)
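
    The jitter of about 1e-4 degrees is easier to picture in meters. The sketch below (Python 3) converts a coordinate offset with the usual spherical approximation of roughly 111.32 km per degree of latitude; the constant and the function name `jitter_in_meters` are assumptions of this example, not values taken from the script.

```python
import math

# Approximate length of one degree of latitude in meters (spherical Earth model)
METERS_PER_DEGREE_LAT = 111_320.0


def jitter_in_meters(delta_deg: float, latitude_deg: float) -> tuple:
    """Return (east_west_m, north_south_m) for an offset of delta_deg on both axes."""
    north_south = delta_deg * METERS_PER_DEGREE_LAT
    # A degree of longitude shrinks with the cosine of the latitude.
    east_west = north_south * math.cos(math.radians(latitude_deg))
    return east_west, north_south


if __name__ == "__main__":
    ew, ns = jitter_in_meters(1e-4, 41.742)
    print("east-west: %.1f m, north-south: %.1f m" % (ew, ns))
```

    One standard deviation of jitter therefore moves a point by only around ten meters, so each batch forms a tight cluster around its center.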
    
    

    Change the SQL time window to 20 seconds

      psmt = connection.prepareStatement("select latitude,longitude,count(*) num from location where "
                        + "time>unix_timestamp(date_sub(current_timestamp(),interval 20 second))*1000 "
                        + "group by longitude,latitude");
    
    

    Change index.jsp to a timer-based version

    <%--
      Created by IntelliJ IDEA.
      User: ttc
      Date: 2018/7/6
      Time: 14:06
      To change this template use File | Settings | File Templates.
    --%>
    <%@ page contentType="text/html;charset=UTF-8" language="java" %>
    <!DOCTYPE html>
    <html lang="en">
    <head>
      <meta charset="UTF-8"/>
      <title>AMap (高德地图)</title>
      <link rel="stylesheet" href="http://cache.amap.com/lbs/static/main1119.css"/>
    </head>
    <body>
    <script src="https://cdn.bootcss.com/echarts/4.1.0.rc2/echarts.min.js"></script>
    <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
    <script src="http://webapi.amap.com/maps?v=1.4.9&amp;key=d16808eab90b7545923a1c2f4bb659ef"></script>
    <div id="container"></div>
    
    <script>
        var map = new AMap.Map("container", {
            resizeEnable: true,
            center: [123.453169, 41.742567],
            zoom: 17
        });
    
        var heatmap;
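        map.plugin(["AMap.Heatmap"],function() {      // load the heat map plugin
            heatmap = new AMap.Heatmap(map,{
                radius:50,
                opacity:[0,0.7]
            });    // overlay the heat map on the map object
            // see the API documentation for the available parameters
        });
        // Reload the points from the server once per second
        setInterval(function (args) {
            var points =(function a(){  //<![CDATA[
                var city=[];
                $.ajax({
                    type:"POST",
                    url:"../get_map",
                    dataType:'json',
                    async:false,        // synchronous, so `city` is filled before `return city` runs
                    success:function(result){
                        for(var i=0;i<result.length;i++){
                            //alert("called");
                            city.push({"lng":result[i].longitude,"lat":result[i].latitude,"count":result[i].count});
                        }
    
                    }
                })
                return city;
            })();//]]>
            // Skip this tick if the plugin has not finished loading yet
            if (heatmap) {
                heatmap.setDataSet({data:points,max:100}); // set the heat map data set
            }
        },1000)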
    
        // var map = new AMap.Map('container', {
        //    pitch:75, // map pitch angle, valid range 0 to 83 degrees
        //    viewMode:'3D' // map mode
        //});
    </script>
    
    </body>
    </html>
    
    
