美文网首页
工作中的一个storm实时流计算需求案例

工作中的一个storm实时流计算需求案例

作者: 先生_吕 | 来源:发表于2017-06-30 17:48 被阅读750次

    开门见山,先说一说业务需求背景吧。两个月前在老大的要求下着手公司的一个storm实时流计算需求,简单的说,就是把C终端发来的数据进行业务处理,在kafka和storm中来回的进行计算,把最终的结果数据持久化到redis中作为数据源供后端调用。作为技术小白当时的心情是崩溃的(),除了之前对各个组件有一些简单的了解外,整体还停留在hello world层面,各组件间的整合对本小白来说难度也就不言而喻了。没有相关的知识储备,面对这个需求还真有点不知所措,尽管只是简单的算数运算,但是小白也表示压力山大。没办法,硬着头皮上吧,看了看相关的视频资料,总算在师父的指点下,磕磕绊绊,跑通了整个流程,虽然计算的准确性还有问题,小白表示收获还是蛮大的。

    废话少说,直奔正题。

    【相关框架】
    业务主要涉及的技术不多,大体符合常规的实时流计算架构模型,strom + kafka + redis,所以需要先在本机环境搭建这几个环境,另外,zookeeper环境也必不可少(kafka的消息主题都存放在zookeeper中)

    【运行环境】

    jdk1.7
    zookeeper-3.4.5
    storm-0.9.2
    kafka-2.9
    redis-3.2.3
    

    【架构】
    先借用一张最常见的strom实时流分析通用模型设计


    strom流分析通用架构.jpg

    【本业务模型】

    业务模型.png

    【数据流向】

    数据流向.png

    这里值得注意的是,常规设计一般均在strom处理完数据后直接从bolt将数据发送给redis持久层,之所以本业务没有直接从bolt流向redis而是分主题转向kafka,再从kafka分主题发送给redis是因为数据源发送数据的频率太高(大概3-5次/s),老大说redis会承受不住,测试时已验证,具体原因可能跟redis的读取频率有关,这里是单节点,如果是redis集群环境会更好点儿。至于模型设计是灵活的,根据具体业务酌情考虑,没有规定非得按某个模型来,符合实际业务即可。

    先奉上一些模拟数据吧,这里由于是local环境,所以LZ提前在本地收录了一些模拟数据存放在TXT文件中,利用kafka逐行读取数据并发送至指定topic来模拟数据源。数据格式如下,每一条消息有27个字段,每个字段代表不同的含义。
    【模拟数据】

    16 91 16777216 0 17 6 7 15 41 46 535 6.158485 1.813451 0.000000 -1068 8496 13572 28 276 0 1597 100 0 1496821304 436028
    16 91 16777216 0 17 6 7 15 41 46 785 5.917774 1.716683 0.000000 -1064 8392 13544 109 254 -19 1598 100 0 1496821304 686031
    16 91 16777216 0 17 6 7 15 41 47 35 5.090148 1.145671 0.000000 0 0 0 0 0 0 0 0 0 1496821304 935960
    16 91 16777216 0 17 6 7 15 41 47 285 6.013670 1.660154 0.000000 -1096 8432 13700 72 314 12 1599 100 0 1496821305 186008
    16 91 16777216 0 17 6 7 15 41 47 535 5.637641 1.650586 0.000000 -1080 8468 13468 23 373 -10 1600 100 0 1496821305 436129
    

    【项目结构】

    2017-06-30_173218.png

    【代码】

    【pom.xml】注意:引入的jar版本要与Linux下安装的版本保持一致

    <dependencies>
        <!-- storm相关包 -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.2</version>
        </dependency>
        <dependency>
               <groupId>org.apache.storm</groupId>
               <artifactId>storm-kafka</artifactId>
               <version>0.9.2</version>
        </dependency>
    
        <!-- kafka相关包 -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka</artifactId>
            <version>2.9.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    
    <!-- redis相关包 -->
           <dependency>
               <groupId>redis.clients</groupId>
               <artifactId>jedis</artifactId>
               <version>2.8.1</version>
           </dependency>
           <dependency>
               <groupId>org.apache.commons</groupId>
               <artifactId>commons-pool2</artifactId>
               <version>3.2.3</version>
           </dependency>
       </dependencies>
    

    启动虚拟机环境(redis,zookeeper,kafka)

    #启动redis服务
    ./redis-server ./redis.conf
    
    #启动zookeeper服务
    ./zkServer.sh start
    
    #启动kafka服务
    ./kafka-server-start.sh -daemon ../config/server.properties
    
    #jps检查进程
    
    #开启总数据消费(这里主题设为:htb_position_test)
    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic htb_position_test --from-beginning
    
    2017-06-30_163546.png

    【redadTxt_KafkaProduce.java】此类读取指定文件并发送消息到对应主题,模拟数据源

    /**
     * kafka生产者类
     * @author lvfang
     */
    public class redadTxt_KafkaProduce extends Thread {
    
        private String topic;//主题
        private String src;//数据源
        
        public redadTxt_KafkaProduce(String topic){  
            this.topic = topic;        
        } 
        
        public redadTxt_KafkaProduce(String topic,String src){  
            this.topic = topic; 
            this.src = src;
        }
        
        //创建生产者
        private Producer createProducer(){
            Properties properties = new Properties();
            //zookeeper单节点
            properties.put("zookeeper.connect","192.168.1.201:2181");
            properties.put("serializer.class", StringEncoder.class.getName());  
    
            //kafka单节点
            properties.put("metadata.broker.list", "192.168.1.201:9092");
            properties.put("advertised.host.name", "192.168.1.201");
            return new Producer<Integer, String>(new ProducerConfig(properties)); 
        }
        
        @Override
        public void run() {
            BufferedReader br = null;
            try {
                br = new BufferedReader(new FileReader(src));
                // 创建生产者
                Producer producer = createProducer();
    
                String line = null;
                // 循环发送消息到kafka
                while ((line = br.readLine()) != null) {        
                    producer.send(new KeyedMessage<Integer, String>(topic,line + "\n"));
                    
                    // 发送消息的时间间隔,一秒发送3此
                    Thread.sleep(333);
                }
            } catch (Exception e) {
            } finally {
                try {
                    if (br != null) br.close();
                } catch (IOException e) {}
            }
        }
        
        //---------------------主方法----------------------------
        public static void main(String[] args) {
            // 使用kafka集群中创建好的主题 test  
            new redadTxt_KafkaProduce("htb_position_test","D:/testdata/htb_position_test_data.txt").start(); 
        }
    

    启动redadTxt_KafkaProduce线程发送数据,并去linux终端查看,确保数据在发送

    2017-06-30_164402.png

    【HdtasInfo.java】javaBean,数据承载容器类

    /**
     * @author lvfang
     * @create 2017-06-09 13:57
     * @desc 数据容器bean
     **/
    public class HdtasInfo {
        //协议类型
        public static final String PROTOCOL_TYPE = "protocol_type";
        //场地ID
        public static final String FIELD_ID = "field_id";
        //主设备ID
        public static final String UWB_ID = "uwb_id";
        //护腿板ID
        public static final String SIGN_ID = "sign_id";
        // 年 月 日   时 分 秒  毫秒
        public static final String YEAR = "year";
        public static final String MONTH = "month";
        public static final String DAY = "day";
        public static final String HOUR = "hour";
        public static final String MINUTE = "minute";
        public static final String SECOND = "second";
        public static final String MILLISECOND = "millisecond";
        //定位精度  X Y Z
        public static final String X = "x";
        public static final String Y = "y";
        public static final String Z = "z";
        //加速度 X Y Z
        public static final String A_SPEED_X = "a_speed_x";
        public static final String A_SPEED_Y = "a_speed_y";
        public static final String A_SPEED_Z = "a_speed_z";
        //陀螺仪 X Y Z
        public static final String GYROSCOPE_X = "gyroscope_x";
        public static final String GYROSCOPE_Y = "gyroscope_y";
        public static final String GYROSCOPE_Z = "gyroscope_z";
        //心率
        public static final String HEART_RATE = "heart_rate";
        //电池电量
        public static final String ELECTRIC = "electric";
        //电池充电状态  1:充电  0:放电
        public static final String CHARGING_STATUS = "charging_status";
        //Unix时间戳  秒
        public static final String SERVER_ACCEPT_TIME_S = "server_accept_time_s";
        //Unix时间戳  纳秒
        public static final String SERVER_ACCEPT_TIME_N = "server_accept_time_n";
    }
    

    【KafkaUtil.java】由于全过程要多次创建kafka生产消费者,所以单提出工具类

    public class KafkaUtil {
        
        public static final String HDTAS_SPOUT = "hdtasSpout";
        public static final String HDTAS_DATA_BOLT = "dataBolt";
        public static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
        public static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
        public static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
        public static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
        public static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
        public static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";
        
        public static final String HDTAS_SPEED_GROOPID = "hdtas_speed_groopId";
        public static final String HDTAS_AGILE_GROOPID = "hdtas_agile_groopId";
        public static final String HDTAS_BATTERY_GROOPID = "hdtas_battery_groopId";
        public static final String HDTAS_DISTANCE_GROOPID = "hdtas_distance_groopId";
        public static final String HDTAS_HEARTRATE_GROOPID = "hdtas_heartrate_groopId";
        public static final String HDTAS_POSITION_GROOPID = "hdtas_position_groopId";
    
        // 创建生产者
        public static Producer<Integer, String> createProducer() {
            Properties properties = new Properties();
            // zookeeper单节点
            properties.put("zookeeper.connect", "192.168.1.201:2181");
            properties.put("serializer.class", StringEncoder.class.getName());
    
            // kafka单节点
            properties.put("metadata.broker.list", "192.168.1.201:9092");
            
            //不设置可能会报错:kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
            properties.put("advertised.host.name", "192.168.1.201");
            return new Producer<Integer, String>(new ProducerConfig(properties));
        }
    
        // 创建消费者
        public static ConsumerConnector createConsumer(String groupId) {
            Properties properties = new Properties();
            // 声明zookeeper集群链接地址
            properties.put("zookeeper.connect", "192.168.1.201:2181");
            
            // 必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
            properties.put("group.id", groupId);
            return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
        }
    }
    

    【RedisUtil.java】redis数据源工具类

    public class RedisUtil {
    
    private static JedisPool pool = null;
        
      /**
       * @author lvfang
       * @create 2017-06-09 13:57
       * @desc 数据容器bean
       * @param ip
       * @param port
       * @return JedisPool
       **/
        public static JedisPool getPool() {
            if (pool == null) {
                JedisPoolConfig config = new JedisPoolConfig();
                //控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;
                //如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
                config.setMaxActive(500);
                //控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
                config.setMaxIdle(5);
                //表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
                config.setMaxWait(1000 * 100);
                //在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
                config.setTestOnBorrow(true);
                
                //public JedisPool(final ConfigpoolConfig, final String host, int port, int timeout, final String password, final int database)
                //public JedisPool(final ConfigpoolConfig, final String host, final int port, final int timeout)
                pool = new JedisPool(config, "reids主机IP", 端口,20000,"密码",0);
                
            }
            return pool;
        }
    
        /**
         * 返还到连接池
         * 
         * @param pool 
         * @param redis
         */
        public static void returnResource(JedisPool pool, Jedis redis) {
            if (redis != null) {
                pool.returnResourceObject(redis);
            }
        }
        
        public static void main(String[] args) {
            System.out.println(RedisUtil.getPool().getResource().ping());
        }
    }
    

    【Constant.java】特殊符号工具类

    public class Constant {
    
        public static final char CHAR_BAR = '|';
        public static final char CHAR_SLASH = '/';
        public static final char CHAR_MINUS = '-';
        public static final char CHAR_TAB = '\t';
        public static final char CHAR_COMMA = ',';
        public static final char CHAR_UNDERLINE = '_';
        public static final char CHAR_CURLY_BRACKETS_LEFT = '{';
        public static final char CHAR_COLON = ':';
        public static final String STR_TAB = "\\t";
        public static final String STR_COMMA = ",";
        public static final String STR_EMPTY = "";
        public static final String STR_UNDERLINE = "_";
    
        public static final String TXT = ".txt";
    
        public static final String PLATFORM_TYPE_WEBSITE = "0";
        public static final String PLATFORM_TYPE_ANDROID = "1";
        public static final String PLATFORM_TYPE_IPHONE = "2";
    
        public static final int DEFAULT_NUM_WORKERS = 2;
        public static final int DEFAULT_BOLT_PARALLELISM_HINT = 1;
        public static final int DEFAULT_SPOUT_PARALLELISM_HINT = 1;
        public static final int DEFAULT_KAFKA_TOPICTHREAD_CAPACITY = 1;
    
        public static final int REDIS_KEY_EXPIRE_3_MONTH = 90 * 24 * 60 * 60;
    }
    
    

    【KafkaSpoutMain.java】此类是kafka与storm的整合,它即是kafka消息的消费者又是strom数据源的数据生产者,其从对应的topic接收消息,并作为storm数据的数据源。

    /**
     * @author lvfang
     * @create 2017-06-09 13:57
     * @desc kafka整合storm 主程序入口
     **/
    public class KafkaSpoutMain {
        
        // 主题与zk端口
        public static final String TOPIC = "htb_position_test";
        public static final String ZKINFO = "192.168.1.201:2181";
    
        private static final String HDTAS_SPOUT = "hdtasSpout";
        private static final String HDTAS_DATA_BOLT = "dataBolt";
        private static final String HDTAS_SPEED_BOLT = "hdtas_speed_bolt";
        private static final String HDTAS_AGILE_BOLT = "hdtas_agile_bolt";
        private static final String HDTAS_BATTERY_BOLT = "hdtas_battery_bolt";
        private static final String HDTAS_DISTANCE_BOLT = "hdtas_distance_bolt";
        private static final String HDTAS_HEARTRATE_BOLT = "hdtas_heartrate_bolt";
        private static final String HDTAS_POSITION_BOLT = "hdtas_psoition_bolt";
    
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            //创建zk主机
            ZkHosts zkHosts = new ZkHosts(ZKINFO);
            //创建spout
            SpoutConfig spoutConfig = new SpoutConfig(zkHosts, TOPIC, "","KafkaSpout");
            //整合kafkaSpout
            KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
    
            //设置storm数据源为kafka整合storm的kafkaSpout
            topologyBuilder.setSpout(HDTAS_SPOUT, kafkaSpout, 1);
            //流向dataBolt进行空格分割处理(总处理,同时分发给多个bolt)
            topologyBuilder.setBolt(HDTAS_DATA_BOLT, new DataBolt(), 1).shuffleGrouping(HDTAS_SPOUT);
            //灵敏度数据流
            topologyBuilder.setBolt(HDTAS_AGILE_BOLT,new HdtasAgileBolt(HDTAS_AGILE_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
            //速度数据流
            topologyBuilder.setBolt(HDTAS_SPEED_BOLT,new HdtasSpeedBolt(HDTAS_SPEED_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
            //电量数据流
            topologyBuilder.setBolt(HDTAS_BATTERY_BOLT,new HdtasBatteryBolt(HDTAS_BATTERY_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
            //距离数据流
            topologyBuilder.setBolt(HDTAS_DISTANCE_BOLT,new HdtasDistanceBolt(HDTAS_DISTANCE_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
            //心率数据流
            topologyBuilder.setBolt(HDTAS_HEARTRATE_BOLT,new HdtasHeartrateBolt(HDTAS_HEARTRATE_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
            //坐标数据流
            topologyBuilder.setBolt(HDTAS_POSITION_BOLT,new HdtasPositionBolt(HDTAS_POSITION_BOLT),1).fieldsGrouping(HDTAS_DATA_BOLT,new Fields(HdtasInfo.SIGN_ID));
            
            Config config = new Config();
            config.setNumWorkers(1);
    
            if (args.length > 0) {
                try {
                    StormSubmitter.submitTopology(args[0], config,topologyBuilder.createTopology());
                } catch (Exception e) {}
            } else {
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("HDTAS", config,topologyBuilder.createTopology());
            }
    
        }
    }
    
    2017-06-30_165146.png

    KafkaSpoutMain 类主要编写的数据在各个bolt间的流程逻辑,通过上图不难看出spout接收到数据后先整体发送给DataBolt,DataBolt进行数据切分后在同时发送给各个bolt,各个bolt进行各自的业务处理。组分割字段是userID,进程数酌情设置,单节点就设置1,集群环境设置节点个数

    【DataBolt.java】总bolt,进行数据切分

    /**
     * @author lvfang
     * @create 2017-06-09 13:57
     * @desc 总Bolt,对数据进行分割处理
     **/
    public class DataBolt extends BaseRichBolt {
    
        private OutputCollector collector;
    
        public Map<String,String> map;
    
        /**
         * 业务操作,数据处理(这里进行分割发送)
         * @param tuple
         */
        @Override
        public void execute(Tuple tuple) {
            String string = new String((byte[]) tuple.getValue(0));
    
            String[] datas = string.split(" ");//按空格切分
       
            if(datas.length==25){
                this.collector.emit(new Values(datas[0],datas[1],datas[2],datas[3],datas[4],datas[5],datas[6],datas[7],datas[8],datas[9],
                        datas[10],datas[11],datas[12],datas[13],datas[14],datas[15],datas[16],datas[17],datas[18],datas[9],
                        datas[20],datas[21],datas[22],datas[23],datas[24]));
            }
        }
    
        /**
         * 初始化方法
         * @param map
         * @param topologyContext
         * @param outputCollector
         */
        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }
    
        /**
         * 指定流向,标注流向字段
         * @param declarer
         */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(
                    new Fields(HdtasInfo.PROTOCOL_TYPE,
                            HdtasInfo.FIELD_ID,
                            HdtasInfo.UWB_ID,
                            HdtasInfo.SIGN_ID,
                            HdtasInfo.YEAR,
                            HdtasInfo.MONTH,
                            HdtasInfo.DAY,
                            HdtasInfo.HOUR,
                            HdtasInfo.MINUTE,
                            HdtasInfo.SECOND,
                            HdtasInfo.MILLISECOND,
                            HdtasInfo.X,
                            HdtasInfo.Y,
                            HdtasInfo.Z,
                            HdtasInfo.A_SPEED_X,
                            HdtasInfo.A_SPEED_Y,
                            HdtasInfo.A_SPEED_Z,
                            HdtasInfo.GYROSCOPE_X,
                            HdtasInfo.GYROSCOPE_Y,
                            HdtasInfo.GYROSCOPE_Z,
                            HdtasInfo.HEART_RATE,
                            HdtasInfo.ELECTRIC,
                            HdtasInfo.CHARGING_STATUS,
                            HdtasInfo.SERVER_ACCEPT_TIME_S,
                            HdtasInfo.SERVER_ACCEPT_TIME_N));
        }
    }
    

    以上DataBolt将数据切分后以字段标示并发出,由各个bolt去自行获取,由于bolt较多,这里就只提供一个bolt的代码,其他bolt等同,只是个业务bolt获取的数据不同,以TLY业务处理为例

    【HdtasAgileBolt.java】需要获取到坐标z,y,z,time,userId,fieldId等数据并进行处理

    /**
     * @author lvfang
     * @create 2017-06-09 13:57
     * @desc 灵敏度Bolt
     **/
    public class HdtasAgileBolt extends BaseRichBolt {
        
        private String topic;
        private StringBuilder sb;
        private Producer<Integer, String> producer;
        private OutputCollector collector;
        
        public HdtasAgileBolt(String topic){
            this.topic = topic;
        }
    
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            //this.topic = KafkaUtil.HDTAS_AGILE_BOLT;
            producer = new KafkaUtil().createProducer();
            this.collector = collector;
        }
        
    
        @Override
        public void execute(Tuple input) {
            //这里进行灵敏度数据操作
            String userId = input.getStringByField(HdtasInfo.SIGN_ID);
            String fieldId = input.getStringByField(HdtasInfo.FIELD_ID);
            String x = input.getStringByField(HdtasInfo.X);
            String y = input.getStringByField(HdtasInfo.Y);
            String z = input.getStringByField(HdtasInfo.Z);
            String time = input.getStringByField(HdtasInfo.SERVER_ACCEPT_TIME_S);
            
            sb = new StringBuilder();
            sb.append(userId).append(Constant.STR_UNDERLINE)
                .append(fieldId).append(Constant.STR_UNDERLINE)
                .append(x).append(Constant.STR_UNDERLINE)
                .append(y).append(Constant.STR_UNDERLINE)
                .append(z).append(Constant.STR_UNDERLINE)
                .append(time);
    
    //        this.message = userId + "_" + fieldId + "_" + x + "_" + y + "_" + z + "_" + time;      
            
    //发送给指定的kafka主题
            producer.send(new KeyedMessage<Integer, String>(topic,sb.toString()));   
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          //这里没有流转至下一个bolt,自然不用重写。
        }
    }
    

    要注意的是,实现bolt的方式有好几种,实现IRichBolt,或者继承BaseRichBolt等,这里选用后者,因为后者会自动ACK(老大告诉的),貌似是storm消息保障机制.
    由于需要将bolt处理的数据流转给kafka,所有在上bolt的初始化方法prepare中初始化好了一个kafka-producer,在数据进行业务处理完后,将数据发送给对应的主题,这里当然是TLY主题hdtas_agile_bolt。启动kafkaspout主方法进行数据处理,我们可以去虚拟机终端查看,也可以自行写kafka-customer去消费对应topic,小白是从Linux终端查看的

    ./kafka-console-consumer.sh --zookeeper localhost:2181 --topic hdtas_agile_bolt --from-beginning
    
    2017-06-30_171501.png

    其他bolt等同,这时候该书写TLY的主题消费类,消费后持久化

    【HdtasAgileCusumer.java】此类主要消费hdtas_agile_bolt主题数据,并进行持久化,这里注意在存储数据到redis中要设置过去时间,由于redis的数据持久化特性,如果不设过去时间,会造成存储数据文件过大

    /**
     * kafka消费者类
     * @author lvfang
     *
     */
    public class HdtasAgileCusumer extends Thread {
    
        private String topic;//主题
        private static JedisPool pool;
        private Jedis jedis;
        private String[] messages;
        private String key;
        private String field;
        private String value;
        
        static {
            pool = RedisUtil.getPool();
        }
        
        public HdtasAgileCusumer(String topic){  
            super();  
            this.topic = topic;  
        } 
        
        @Override
        public void run() {
            //创建消费者
            ConsumerConnector consumer = KafkaUtil.createConsumer(KafkaUtil.HDTAS_AGILE_GROOPID);//createConsumer();  
            //主题数map
            Map<String, Integer> topicCountMap = new HashMap<>();
            // 一次从topic主题中获取一个数据 
            topicCountMap.put(topic, 1);
            //创建一个获取消息的消息流
            Map<String,List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
            // 获取每次接收topic主题到的这个数据  
            KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
            
            try {
                jedis = pool.getResource();
                //循环打印
                while (iterator.hasNext()) {
                    String message = new String(iterator.next().message()); 
                    
                    System.out.println("接收到: " + message);
                    //接收到的数据格式:userId + "_" + fieldId + "_" + x + "_" + y + "_" + z + "_" + time;
                    messages = message.split("_");
                    
                    if(messages.length == 6){
                        key = "hiseeHTLY_" + messages[1];
                        field = messages[0] + "_" + messages[5];
                        value = messages[2] + "_" + messages[3] + "_" + messages[4];
                        
                        jedis.hset(key, field, value);
                        jedis.expire(key, 60);
                    }           
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                RedisUtil.returnResource(pool, jedis);
            }
        }
            
        public static void main(String[] args) {
            // 使用kafka集群中创建好的主题 test 
            new HdtasAgileCusumer(KafkaUtil.HDTAS_AGILE_BOLT).start();  
        }
    }
    

    这时我们可以启动HDTAS_AGILE_BOLT的消费类,并去redis查看是否持久化成功

    2017-06-30_172531.png 2017-06-30_172709.png

    我们可以看到数据持久化成功,其他主题消费等同,整个数据流向很简单,只过了两次bolt,

    来一张全图:

    2017-06-30_174438.png 2017-06-30_174618.png

    流计算真心属于入门水准,欢迎拍砖。
    最后真心感谢师父的指点 ! ! !

    相关文章

      网友评论

          本文标题:工作中的一个storm实时流计算需求案例

          本文链接:https://www.haomeiwen.com/subject/zcticxtx.html