美文网首页
Strom自行整合kafka

Strom自行整合kafka

作者: 先生_吕 | 来源:发表于2017-10-19 11:41 被阅读44次

    前言

    最近由于业务线上环境kafka版本有变,导致与其配套的storm已经redis版本都要随之更新。本来想着将kafka的线上版本改回之前版本,这样所有的东西都不用随之变动,不幸的是老大告知线上kafka版本不能变,所以只能去找对应的storm和redis版本了,(注:线上kafka为2.11.0.11.0),在更新storm时发现还没有与之对应的storm-kafka匹配版本,这就意味着不能用storm提供的storm与kafka的整合包了,所以只能自行手写整合。

    任务

    本次要完成的任务与之前相同,还是走一套完整的storm分析(kafka-storm-kafka-redis),用一个kafka的生产者模拟数据源发给storm,这里要提说的是storm用最传统的开发方式(topology-spout-bolt)

    topology:拓扑(bolt的执行逻辑)
    spout:storm的数据源,这里接收kafka的数据
    bolt:计算任务组件,可以有一个或者多个

    环境

    kafka2.11.0.11.0
    zookeeper3.4.5.0
    storm1.0.1
    redis3.2.11.0

    数据流向

    (1):kafkaProducer生产者发送数据到test主题
    (2):storm从test主题获取数据(spout组件获取)
    (3):spout组件发送给AllDataBolt进行数据处理
    (4):AllDataBolt在execute()方法中进行业务处理并将所处理的数据下发给kafka各个数据主题
    (5):kafka各个数据主题对应的消费者进行数据消费,并将消费后的数据存储至redis

    代码实现

    1: 假数据格式

    假数据是一条字符串,每条字符串有25个字段,字段与字段之间用空格相隔,每一个字段代表一个特定属性,以下是部分假数据

    16 608 0 10010018 17 10 18 19 13 37 839 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 138196
    16 608 0 10010018 17 10 18 19 13 38 89 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 388327
    16 608 0 10010018 17 10 18 19 13 38 339 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 638286
    16 608 0 10010018 17 10 18 19 13 38 589 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 888342
    16 608 0 10010018 17 10 18 19 13 38 839 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 138420
    16 608 0 10010018 17 10 18 19 13 39 89 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 388487
    16 608 0 10010018 17 10 18 19 13 39 339 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 638539
    

    真实环境,这些数据都是球员所带的护腿板上发出的数据,由于我们是local测试,所以需要提前造一些假数据存放在txt中,数据格式与上格式吻合即可

    2:maven配置及工具类
    <!-- storm -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.1</version>
        </dependency>
    
           <dependency>
               <groupId>org.apache.curator</groupId>
               <artifactId>curator-framework</artifactId>
               <version>2.10.0</version>
               <exclusions>
                   <exclusion>
                       <groupId>log4j</groupId>
                       <artifactId>log4j</artifactId>
                   </exclusion>
                   <exclusion>
                       <groupId>org.jboss.netty</groupId>
                       <artifactId>netty</artifactId>
                   </exclusion>
               </exclusions>
           </dependency>
           <dependency>
                <groupId>commons-collections</groupId>
                <artifactId>commons-collections</artifactId>
           </dependency>
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <!--<version>0.9.0.0</version>-->
            <version>0.11.0.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <!--<exclusion>-->
                    <!--<groupId>kafka-clients</groupId>-->
                    <!--<artifactId>kafka-clients11</artifactId>-->
                <!--</exclusion>-->
            </exclusions>
        </dependency>
        <dependency>
               <groupId>redis.clients</groupId>
               <artifactId>jedis</artifactId>
               <version>2.8.1</version>
           </dependency>
    

    HdtasScheme.java

    import org.apache.storm.spout.Scheme;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.util.List;
    
    /**
     * Created by Jarno on 2017/5/31.
     */
    public class HdtasScheme implements Scheme {
        private static final Charset UTF8_CHARSET;
    
        //协议类型
        public static final String PROTOCOL_TYPE = "protocol_type";
        //场地ID
        public static final String FIELD_ID = "field_id";
        //主设备ID(终端id)
        public static final String UWB_ID = "uwb_id";
        //护腿板ID
        public static final String SIGN_ID = "sign_id";
        // 年 月 日   时 分 秒  毫秒
        public static final String YEAR = "year";
        public static final String MONTH = "month";
        public static final String DAY = "day";
        public static final String HOUR = "hour";
        public static final String MINUTE = "minute";
        public static final String SECOND = "second";
        public static final String MILLISECOND = "millisecond";
        //定位精度  X Y Z
        public static final String X = "x";
        public static final String Y = "y";
        public static final String Z = "z";
        //加速度 X Y Z
        public static final String A_SPEED_X = "a_speed_x";
        public static final String A_SPEED_Y = "a_speed_y";
        public static final String A_SPEED_Z = "a_speed_z";
        //陀螺仪 X Y Z
        public static final String GYROSCOPE_X = "gyroscope_x";
        public static final String GYROSCOPE_Y = "gyroscope_y";
        public static final String GYROSCOPE_Z = "gyroscope_z";
        //心率
        public static final String HEART_RATE = "heart_rate";
        //电池电量
        public static final String ELECTRIC = "electric";
        //电池充电状态  1:充电  0:放电
        public static final String CHARGING_STATUS = "charging_status";
        //Unix时间戳  秒
        public static final String SERVER_ACCEPT_TIME_S = "server_accept_time_s";
        //Unix时间戳  纳秒
        public static final String SERVER_ACCEPT_TIME_N = "server_accept_time_n";
    
    
        @Override
        public List<Object> deserialize(ByteBuffer ser) {
            String input = deserializeString(ser);
            String[] strs = input.split(" ");
            if (strs.length != 25) {
                return null;
            }
            return new Values(strs);
        }
    
        private static String deserializeString(ByteBuffer string) {
            if (string.hasArray()) {
                int base = string.arrayOffset();
                return new String(string.array(), base + string.position(), string.remaining());
            } else {
                return new String(Utils.toByteArray(string), UTF8_CHARSET);
            }
        }
    
        @Override
        public Fields getOutputFields() {
            return new Fields(PROTOCOL_TYPE, FIELD_ID, UWB_ID, SIGN_ID, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, MILLISECOND, X, Y, Z, A_SPEED_X, A_SPEED_Y, A_SPEED_Z, GYROSCOPE_X, GYROSCOPE_Y, GYROSCOPE_Z, HEART_RATE, ELECTRIC, CHARGING_STATUS, SERVER_ACCEPT_TIME_S, SERVER_ACCEPT_TIME_N);
        }
    
        static {
            UTF8_CHARSET = Charset.forName("UTF-8");
        }
    }
    
    2:kafkaProducer模拟数据源读取txt发送数据

    KafkaProducer.java

    import kafka.serializer.StringEncoder;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.clients.producer.Producer;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;
    
    /**
     * @author lvfang
     * @create 2017-10-15 11:17
     * @desc
     **/
    public class KafkaProduce extends Thread {
        private String topic;//主题
    
        private String src;//数据源
    
        public KafkaProduce(String topic,String src){
            super();
            this.topic = topic;
            this.src = src;
        }
    
        //创建生产者
        private Producer<Integer, String> createProducer(){
            Properties properties = new Properties();
    
            //kafka单节点
            properties.put("metadata.broker.list", "192.168.90.240:9092");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.90.240:9092");
    
            return new KafkaProducer<Integer, String>(properties);
        }
    
        public void run() {
            BufferedReader br = null;
            try {
    
                br = new BufferedReader(new FileReader(src));
                // 创建生产者
                Producer producer = createProducer();
    
                String line = null;
                // 循环发送消息到kafka
                while ((line = br.readLine()) != null) {
                    System.out.println("生产数据为:"+line);
                    producer.send(new ProducerRecord(topic,line + "\n"));
    
                    // 发送消息的时间间隔
                    Thread.sleep(1000);//TimeUnit.SECONDS.sleep(333);
                }
            } catch (Exception e) {
            } finally {
                try { if (br != null) br.close(); } catch (IOException e) {}
            }
        }
    
        public static void main(String[] args) {
            // 使用kafka集群中创建好的主题 test
            new KafkaProduce("htb_position_test","D:/testdata/htb_position_test_data.txt").start();
        }
    }
    

    写完生产之后我们先试试是否可以顺利发送数据

    2017-10-19_103248.png 2017-10-19_103312.png

    数据顺利发送,第一步 我们的数据源就成功了

    3:编写storm逻辑

    这里storm扮演的角色主要是计算,它将计算好的数据再次分发送给kafka;在这个过程中,主要有三个组件参与到其中,spout、bolt、topology,spout作为数据源接收kafka对应主题所分发的数据;bolt是数据计算组件,每一个bolt做一次计算,当然,这里我们为了简单,都在一个bolt中处理了;topology主要是storm组件的拼凑逻辑,即决定bolt的执行顺序,类似于工作流中的逻辑图一样。

    3.1:spout数据源,这里数据源spout进行了抽取,我们知道spout的实现由两种方式,这里使用实现IRichSpout

    KafkaSpout.java

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.storm.Config;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.IRichSpout;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    public abstract class KafkaSpout implements IRichSpout {
        private static final long serialVersionUID = 3679050534053612196L;
    
        public static final Logger logger = LoggerFactory.getLogger(KafkaSpout.class);
        private final Properties props;
        private SpoutOutputCollector collector;
        private List<String> topicList;
        private final AtomicInteger spoutPending;
    //    private int maxSpoutPending;
        private KafkaConsumer<String, byte[]> consumer;
    
        public KafkaSpout(Properties props, List<String> topics) {
            this.props = props;
            initProps();
            this.topicList = topics;
            this.spoutPending = new AtomicInteger();
        }
    
        private void initProps() {
            this.props.put("key.deserializer", StringDeserializer.class);
            this.props.put("value.deserializer", ByteArrayDeserializer.class);
        }
    
    
        public abstract Fields generateFields();
    
        public abstract List<Object> generateTuple(byte[] message);
    
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
    //        this.maxSpoutPending = Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void activate() {
            consumer = new KafkaConsumer(props);
            consumer.subscribe(topicList);
            final ExecutorService executor = Executors.newFixedThreadPool(topicList.size());
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        ConsumerRecords<String, byte[]> records = consumer.poll(100);
                        Iterator<ConsumerRecord<String, byte[]>> iterator = records.iterator();
                        while (iterator.hasNext()) {
                            if (spoutPending.get() <= 0) {
                                sleep(1000);
                                continue;
                            }
                            ConsumerRecord<String, byte[]> next = iterator.next();
                            byte[] message = next.value();
                            List<Object> tuple = null;
                            try {
                                tuple = generateTuple(message);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                            if (tuple == null) {
                                continue;
                            }
    //                          logger.info("kafka spout emit tuple:{}", tuple.toString());
                            collector.emit(tuple);
                            spoutPending.decrementAndGet();
                        }
                    }
                }
            });
        }
    
        @Override
        public void deactivate() {
            consumer.close();
        }
    
        @Override
        public void nextTuple() {
    //        if (spoutPending.get() < maxSpoutPending) {
    //            spoutPending.incrementAndGet();
    //        }
            spoutPending.incrementAndGet();
        }
    
        @Override
        public void ack(Object msgId) {
        }
    
        @Override
        public void fail(Object msgId) {
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(generateFields());
        }
    
        private void sleep(long millisecond) {
            try {
                Thread.sleep(millisecond);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    上边的KafkaSpout是一个抽象方法,他实现了IRichSpout接口并重写了接收和发送消息的方法,而留有两个抽象方法为实现,一个是Fields generateFields(),他需要开发人员根据业务对不同的数据进行不同的实现,主要是返回数据切分后的列字段,已被bolt处用相应字段接收;另一个是List<Object> generateTuple(byte[] message)方法,他是一个真正的消息接收发送方法,他接收到消息对消息进行切分。业务人员要实现自己spout数据源需要基础KafkaSpout, 并重写以上两个方法。

    HdtasSpout.java

    import com.misbio.seer.storm.KafkaSpout;
    import com.misbio.seer.storm.hdtas.HdtasScheme;
    import org.apache.storm.tuple.Fields;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @author lvfang
     * @create 2017-10-16 16:40
     * @desc
     **/
    public class HdtasSpout extends KafkaSpout {
    
    
        public HdtasSpout(Properties props, List<String> topics){
            super(props,topics);
        }
    
        public List<Object> generateTuple(byte[] message){
            System.out.println(new String(message).split(" ")[1]);
            String[] messages = new String(message).split(" ");
            if(messages.length!=25) return null;
    
    
    
            List<Object> tuple = new ArrayList<Object>();
            tuple.add(messages[0]);
            tuple.add(messages[1]);
            tuple.add(messages[2]);
            tuple.add(messages[3]);
            tuple.add(messages[4]);
            tuple.add(messages[5]);
            tuple.add(messages[6]);
            tuple.add(messages[7]);
            tuple.add(messages[8]);
            tuple.add(messages[9]);
            tuple.add(messages[10]);
            tuple.add(messages[11]);
            tuple.add(messages[12]);
            tuple.add(messages[13]);
            tuple.add(messages[14]);
            tuple.add(messages[15]);
            tuple.add(messages[16]);
            tuple.add(messages[17]);
            tuple.add(messages[18]);
            tuple.add(messages[19]);
            tuple.add(messages[20]);
            tuple.add(messages[21]);
            tuple.add(messages[22]);
            tuple.add(messages[23]);
            tuple.add(messages[24]);
    
            return tuple;
        }
    
        public Fields generateFields(){
            return new Fields(
                    HdtasScheme.PROTOCOL_TYPE,
                    HdtasScheme.FIELD_ID,
                    HdtasScheme.UWB_ID,
                    HdtasScheme.SIGN_ID,
                    HdtasScheme.YEAR,
                    HdtasScheme.MONTH,
                    HdtasScheme.DAY,
                    HdtasScheme.HOUR,
                    HdtasScheme.MINUTE,
                    HdtasScheme.SECOND,
                    HdtasScheme.MILLISECOND,
                    HdtasScheme.X,
                    HdtasScheme.Y,
                    HdtasScheme.Z,
                    HdtasScheme.A_SPEED_X,
                    HdtasScheme.A_SPEED_Y,
                    HdtasScheme.A_SPEED_Z,
                    HdtasScheme.GYROSCOPE_X,
                    HdtasScheme.GYROSCOPE_Y,
                    HdtasScheme.GYROSCOPE_Z,
                    HdtasScheme.HEART_RATE,
                    HdtasScheme.ELECTRIC,
                    HdtasScheme.CHARGING_STATUS,
                    HdtasScheme.SERVER_ACCEPT_TIME_S,
                    HdtasScheme.SERVER_ACCEPT_TIME_N
            );
        };
    }
    
    3.2Bolt编写,此bolt主要对spout所发送的数据进行接收计算处理,处理之后并下发给对应的主题供kafka再次消费使用,这里同样bolt的实现方式也有两种,我们这里采用继承BaseBasicBolt的实现方式

    HdtasDataAllBolt.java

    import com.misbio.seer.kafka.DefaultProducer;
    import com.misbio.seer.storm.hdtas.HdtasScheme;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.math.BigDecimal;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    /**
     * @author lvfang
     * @create 2017-10-16 17:33
     * @desc
     **/
    public class HdtasDataAllBolt extends BaseBasicBolt {
            private final static Logger LOG = LoggerFactory.getLogger(HdtasDataAllBolt.class);
            private String kafkaUri;
            private Producer<String, String> createProducer;
    
            public HdtasDataAllBolt(){}
    
            public HdtasDataAllBolt(String kafkaUri) {
                this.kafkaUri = kafkaUri;
            }
    
            @Override
            public void prepare(Map stormConf, TopologyContext context) {
                super.prepare(stormConf, context);
                this.createProducer = new DefaultProducer().createProducer(kafkaUri, null);
            }
    
            long start = System.currentTimeMillis();
            int count = 0;
    
            @Override
            public void execute(Tuple input, BasicOutputCollector collector) {
    
                String fieldId = input.getStringByField(HdtasScheme.FIELD_ID);//获取球场id
                String signId = input.getStringByField(HdtasScheme.SIGN_ID);//标签id
    
                //加速度 X Y Z
                String aSpeedX = input.getStringByField(HdtasScheme.A_SPEED_X);
                String aSpeedY = input.getStringByField(HdtasScheme.A_SPEED_Y);
                String aSpeedZ = input.getStringByField(HdtasScheme.A_SPEED_Z);
                //定位精度  X Y Z
                String x = input.getStringByField(HdtasScheme.X);
                String y = input.getStringByField(HdtasScheme.Y);
                String z = input.getStringByField(HdtasScheme.Z);
                //陀螺仪 X Y Z
                String gyroscopeX = input.getStringByField(HdtasScheme.GYROSCOPE_X);
                String gyroscopeY = input.getStringByField(HdtasScheme.GYROSCOPE_Y);
                String gyroscopeZ = input.getStringByField(HdtasScheme.GYROSCOPE_Z);
    
                String heartRate = input.getStringByField(HdtasScheme.HEART_RATE);//心率
                String electric = input.getStringByField(HdtasScheme.ELECTRIC);//电池电量
    
                String charingStatus = input.getStringByField(HdtasScheme.CHARGING_STATUS);//充电状态
                String timeS = input.getStringByField(HdtasScheme.SERVER_ACCEPT_TIME_S);//时间戳 秒
                long time = convertMilTime(input);
                if ("1523".equals(signId)){
                    return;
                }
    
                //x y < 0.01
                double doubleX = Double.valueOf(x).doubleValue();
                double doubleY = Double.valueOf(y).doubleValue();
    
                if(doubleX<0.01 || doubleY <0.01){
                    return;
                }
    //            if(x.equals("0.000000") || y.equals("0.000000")){
    //                return;
    //            }
                System.out.println("X&Y : " + x + " : " + y);
                //三个轴的加速度值 单位g  16384/8 = 2048  16g量程
                double ax = BigDecimal.valueOf(Integer.parseInt(aSpeedX)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                double ay = BigDecimal.valueOf(Integer.parseInt(aSpeedY)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                double az = BigDecimal.valueOf(Integer.parseInt(aSpeedZ)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                //三个轴的陀螺仪值 单位为dps 度每秒  131/8 = 16.375  2000度量程
                double gx = BigDecimal.valueOf(Integer.parseInt(gyroscopeX)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                double gy = BigDecimal.valueOf(Integer.parseInt(gyroscopeY)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                double gz = BigDecimal.valueOf(Integer.parseInt(gyroscopeZ)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
    
                //加速度矢量
                double r = BigDecimal.valueOf(Math.sqrt(ax * ax + ay * ay + az * az)).setScale(3,BigDecimal.ROUND_HALF_DOWN).doubleValue();
                if (0d == r) {
                    return;
                }
                //结果放入kafka中(hdtas_all_ori_a_speed)
                final String result = signId + "," + ax + "," + ay + "," + az + "," + r + "," + time;
    
                createProducer.send(new ProducerRecord("hdtas_all_ori_a_speed",fieldId,result));
                long end = System.currentTimeMillis();
    
                if (end - start <= 1000) count++; else { start = end; count = 0;}
                //认为异常数据 不予处理
                if (ax == 0 && ay == 0 && az == 0) return;
    
                //计算步数
                Integer step = computeStep(fieldId, signId, r, time);
                //计算三个轴的角度变化
                computeAngle(ax, ay, az);
                //计算瞬时速度
                computeInstantSpeed(time, ax, ay, az);
                //计算位置
                computePosition(signId , fieldId , x , y , z , time);
                //计算速度(有问题)
                computeSpeed(signId , fieldId , x , y , z , time);
                //其他数据(陀螺仪,加速度,心率,电量等等)
                computeOther(signId , fieldId , electric , heartRate , time , ax , ay , az , gx , gy , gz , r , step);
                //计算灵敏度
                computeAgile(signId , fieldId , gx , gy , gz , time);
                //计算电量
                computeBattery(signId , fieldId , electric , time);
                //计算距离s
                computeDistance(signId , fieldId , x , y , z , time);
                //计算心率
                computeHeartrate(signId , fieldId , heartRate , time);
            }
    
            Map<String, Double> lastRMap = new HashMap<String, Double>();//加速度矢量
            Map<String, Integer> stepMap = new HashMap<String, Integer>();
            //    Map<String, Long> currentPeakTimeMap = new HashMap<String, Long>();
            Map<String, Long> lastPeakTimeMap = new HashMap<String, Long>();
    
            /**
             * 计算步数
             * @param fieldId
             * @param signId
             * @param r
             * @param time
             */
            private Integer computeStep(String fieldId, String signId, double r, long time) {
                Double lastR = lastRMap.get(signId);
                if (lastR == null) {
                    lastRMap.put(signId, r);
                } else {
                    boolean isPeak = detectPeak(signId, lastR, r);
                    if (isPeak){
                        Long lastPeakTime = lastPeakTimeMap.get(signId);
                        if (lastPeakTime == null) {
                            lastPeakTime = 0L;
                        }
                        Double peak = lastPeakMap.get(signId);
                        Double valley = lastValleyMap.get(signId);
                        if (peak == null || valley == null){
                            return 0;
                        }
                        //得到阙值
                        double threshold = peak - valley;
                        //System.out.println("阈值:" + threshold);
                        Integer step = stepMap.get(signId);
                        if (step == null) {
                            step = 0;
                            stepMap.put(signId, 0);
                        }
                        Double bt = baseThresholdMap.get(signId);
                        if (bt == null){
                            baseThresholdMap.put(signId, baseThreshold);
                            bt = baseThreshold;
                        }
                        System.out.println("baseThreshold:" + bt);
                        if (time - lastPeakTime >= 250 && (threshold >= bt)){
                            lastPeakTimeMap.put(signId, time);
                            stepMap.put(signId, step + 1);
                        }
                        if (time - lastPeakTime >= 250 && (threshold >= 1.3d)){
                            lastPeakTimeMap.put(signId, time);
                            double aveThreshold = computeThreshold(signId, threshold);
                            System.out.println("aveThreshold:" + aveThreshold);
                            baseThresholdMap.put(signId, aveThreshold);
                        }
    
                        //结果放入kafka中
                        final String result = signId + "," + threshold;
                        final StringBuilder sb = new StringBuilder(result);
                        sb.append(",").append(stepMap.get(signId));
    
                        return stepMap.get(signId);
    
    //                createProducer.send(new ProducerRecord("hdtas_all_step",fieldId,sb.toString()));
                    }
                    lastRMap.put(signId, r);
                }
                return 0;
    
            }
            private double stepAvgNum = 4;
            private double computeThreshold(String signId, double threshold) {
                double temp = baseThreshold;
                List<Double> thresholdList = aveThresholdMap.get(signId);
                if (thresholdList == null){
                    thresholdList = new ArrayList<Double>();
                }
                //System.out.println("size:" +thresholdList.size());
                if (thresholdList.size() < stepAvgNum) {
                    thresholdList.add(threshold);
                    aveThresholdMap.put(signId,thresholdList);
                }else{
                    temp = computeAvgThreshold(thresholdList);
                    thresholdList.add(threshold);
                    thresholdList.remove(0);
                    aveThresholdMap.put(signId,thresholdList);
                }
    
                return temp;
            }
    
    
            private double computeAvgThreshold(List<Double> thresholdList) {
                int size = thresholdList.size();
                double all = 0;
                for (double d : thresholdList){
                    all += d;
                }
                double ave = new BigDecimal(all).divide(new BigDecimal(size), 3).setScale(3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
                if (ave >= 8) {
                    ave = 4.3d;
                }
                else if(ave >= 7 && ave < 8){
                    ave = 3.3d;
                }
                else if(ave >= 4 && ave < 7){
                    ave = 2.3d;
                }
                else if(ave >= 3 && ave <4){
                    ave = 2.0d;
                }
                else {
                    ave = 1.3d;
                }
                return ave;
            }
    
            //记录当前上升状态
            private Map<String, Boolean> upMap = new HashMap<String, Boolean>();
            //记录上一次上升状态
            private Map<String, Boolean> lastUpMap = new HashMap<String, Boolean>();
            //记录上升趋势次数
            private Map<String, Integer> upCountMap = new HashMap<String, Integer>();
            //记录上一次上升趋势次数
            private Map<String, Integer> lastUpCountMap = new HashMap<String, Integer>();
            //记录波峰的值
            private Map<String,Double> lastPeakMap = new HashMap<String, Double>();
            //记录波谷的值
            private Map<String,Double> lastValleyMap = new HashMap<String, Double>();
            private Map<String,List<Double>> aveThresholdMap = new HashMap<String, List<Double>>();
            private Map<String,Double> thresholdMap = new HashMap<String, Double>();
    
            private double baseThreshold = 2.0d;
            private Map<String,Double> baseThresholdMap = new HashMap<String, Double>();
    
            /**
             * 检测波峰
             *
             * @param signId
             * @param lastR
             * @param newR   1.下降趋势
             *               2.上一个点为上升趋势
             *               3.持续上升次数大于等于2
             *               4.波峰值判断(>=4走路 >=6慢跑 >=8快跑 >=9跳跃)
             * @return
             */
            private boolean detectPeak(String signId, Double lastR, double newR) {
                Boolean isUp = upMap.get(signId);
                if (isUp == null) {
                    lastUpMap.put(signId, false);
                }else{
                    lastUpMap.put(signId, isUp);
                }
                Integer count = upCountMap.get(signId);
                //上升趋势
                // && (newR - lastR) >= baseThreshold
                if (newR >= lastR) {
                    upMap.put(signId, true);
                    upCountMap.put(signId, count == null ? 1 : count + 1);
                } else {
                    lastUpCountMap.put(signId, count == null ? 0 : count);
                    upCountMap.put(signId, 0);
                    upMap.put(signId, false);
    
                }
                boolean lastUp = lastUpMap.get(signId);
                boolean currentUp = upMap.get(signId);
                Integer lastUpCount = lastUpCountMap.get(signId);
                if (lastUpCount == null) {lastUpCount = 0;}
                double baseR = 20 / 9.8;
                if( !currentUp && lastUp && (lastUpCount >= 2 || lastR >= baseR)){
                    lastPeakMap.put(signId, lastR);
                    return true;
                }
                else if (currentUp && !lastUp){
                    lastValleyMap.put(signId, lastR);
                    return false;
                }else {
                    return false;
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            }
    
            /**
             * 计算三个轴的角度变化
             * @param ax
             * @param ay
             * @param az
             */
            private void computeAngle(double ax, double ay, double az) {
                double r = Math.sqrt(ax * ax + ay * ay + az * az);
                double arx = (180 / Math.PI) * Math.acos(ax / r);
                double ary = (180 / Math.PI) * Math.acos(ay / r);
                double arz = (180 / Math.PI) * Math.acos(az / r);
    
    
    //        double myarx = -1;
    //        double myary = -1;
    ////        System.out.println("----------------转换后角度");
    //        if (arz > 0 && arz <= 90) {
    //            myarx = arx - 90;
    //            myary = ary - 90;
    //        }
    //        if (arz > 90) {
    //            if (arx >= 0 && arx <= 90) {
    //                myarx = -90 - arx;
    //            }
    //            if (arx > 90) {
    //                myarx = 180 - arx + 90;
    //            }
    //            if (ary >= 0 && ary <= 90) {
    //                myary = - 90 - ary;
    //            }
    //            if (ary > 90) {
    ////                System.out.println("Y轴:" + (180 - ary + 90));
    //                myary = 180 - ary + 90;
    //            }
    //        }
    //
            }
    
    
            private long lastMillisecond = -1;
            private long lastTime = 0;
            private double lastAx = -1;
            private double lastAy = -1;
            private double lastAz = -1;
            private double lastV = 0;
    
            /**
             * 计算瞬时速度
             * @param time
             * @param ax
             * @param ay
             * @param az
             */
            private void computeInstantSpeed(long time, double ax, double ay, double az) {
                if (lastMillisecond != -1 && lastAx != -1 && lastAy != -1 && lastAz != -1) {
                    double ds = (time - lastMillisecond) / 1000d;
                    if (ds <= 0) {
                        lastMillisecond = time;
                        return;
                    }
    
                    double vx = (ax - lastAx) / 2 * ds;
                    double vy = (ay - lastAy) / 2 * ds;
                    double vz = (az - lastAz) / 2 * ds;
    
    //            double vx_pow = Math.pow(vx , 2);
    //            double vy_pow = Math.pow(vy, 2);
    //            double vz_pow = Math.pow(vz, 2);
    
    //            double v = Math.sqrt(vx_pow + vy_pow + vz_pow);
                    double v = vx + vy + vz;
                    lastV = v;
    //            if(v > 8 ){
    //            if (v < 0){
    //                v = 0.000;
    //            }
                    v = new BigDecimal(v).setScale(3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
    //            System.out.println("瞬时速度为:" + v);
    
                }
                lastMillisecond = time;
                lastAx = ax;
                lastAy = ay;
                lastAz = az;
            }
    
            private long convertMilTime(Tuple input) {
                String year = input.getStringByField(HdtasScheme.YEAR);
                String month = input.getStringByField(HdtasScheme.MONTH);
                String day = input.getStringByField(HdtasScheme.DAY);
                String hour = input.getStringByField(HdtasScheme.HOUR);
                String minute = input.getStringByField(HdtasScheme.MINUTE);
                String second = input.getStringByField(HdtasScheme.SECOND);
    
                String millisecond = input.getStringByField(HdtasScheme.MILLISECOND);
                SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss:SSS");
                String time = year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + second + ":" + millisecond;
    //            System.out.println(time);
                try {
                    Date date = sdf.parse(time);
    //                System.out.println(date.getTime());
                    //线上时间
                    return date.getTime();
                    //测试时间
    //                return System.currentTimeMillis();
                } catch (ParseException e) {
                    e.printStackTrace();
                    LOG.error("convertMilTime date format error, error info is {}", e.getMessage());
                    return System.currentTimeMillis();
                }
            }
    
            /**
             * 计算灵敏度
             * @param userId
             * @param fieldId
             * @param x
             * @param y
             * @param z
             * @param time
             */
            public void computeAgile(String userId , String fieldId , double x , double y , double z , Long time){
                final String result = userId + "," + fieldId + "," + x + "," + y + "," + z + "," + time;
    
                createProducer.send(new ProducerRecord("hdtas_agile_bolt",fieldId,result));
            }
    
            /**
             * 计算电量
             * @param userId
             * @param electric
             * @param time
             */
            public void computeBattery(String userId , String fieldId , String electric , Long time){
                final String result = userId + "," + electric + "," +time;
    
                createProducer.send(new ProducerRecord("hdtas_battery_bolt",fieldId,result));
            }
    
    
            private final Map<String , String> oldXdis = new HashMap<String, String>();
            private final Map<String , String> oldYdis = new HashMap<String, String>();
            private final Map<String , Long> oldTimedis = new  HashMap<String , Long>();
            private final Map<String , Long> oldTimeDistance = new  HashMap<String , Long>();
            private Map<String,Position> distancePositionMap = new HashMap<String, Position>();
            /**
             * 计算距离
             * @param userId
             * @param fieldId
             * @param x
             * @param y
             * @param z
             * @param time
             */
            public void computeDistance(String userId , String fieldId , String x , String y , String z , Long time){
                double dx = Double.parseDouble(x);
                double dy = Double.parseDouble(y);
                double dz = Double.parseDouble(z);
    
                Long oldLongTime = oldTimeDistance.get(userId);
                Position p = distancePositionMap.get(userId);
    
                if(p == null){
                    oldTimeDistance.put(userId, time);
                    distancePositionMap.put(userId,new Position(dx,dy,dz));
                    return;
                }
                if(oldLongTime == null){
                    oldTimeDistance.put(userId,time);
                    return;
                }
    
                double absX = Math.abs((dz-p.getX()));
                double absY = Math.abs((dy-p.getY()));
                double absZ = Math.abs((dz-p.getZ()));
    
    
                double dis = new BigDecimal(Math.sqrt(absX*absX + absY*absY)).setScale(5,BigDecimal.ROUND_HALF_UP).doubleValue();
                long diff = Math.abs(time - oldLongTime);
    
                if (diff >= 1000){
                    double speed =  new BigDecimal(dis).divide(new BigDecimal(diff).divide(new BigDecimal(1000),3,BigDecimal.ROUND_HALF_EVEN),5,BigDecimal.ROUND_HALF_EVEN).doubleValue();
    
                    //存储
                    Position p1 = new Position(dx,dy,dz);
                    distancePositionMap.put(userId, p1);
                    oldTimeDistance.put(userId, time);
    
                    String result = userId + "," + fieldId + "," +time + "," + diff + "," + dis + "," + speed;
    
                    createProducer.send(new ProducerRecord("hdtas_distance_bolt",fieldId,result));
                }
            }
    
            /**
             * 计算心率
             * @param userId
             * @param fieldId
             * @param heartRate
             * @param time
             */
            public void computeHeartrate(String userId , String fieldId , String heartRate , Long time){
                final String result = userId + "," + fieldId + "," +time + "," + heartRate;
    
                createProducer.send(new ProducerRecord("hdtas_heartrate_bolt",fieldId,result));
            }
    
            /**
             * 计算坐标
             * @param userId
             * @param fieldId
             * @param x
             * @param y
             * @param z
             * @param time
             */
            public void computePosition(String userId , String fieldId , String x , String y , String z , Long time){
                final String result = userId + "," + fieldId + "," +time + "," + x + "," + y;
                System.out.println("位置数据:" + result);
                createProducer.send(new ProducerRecord("hdtas_position_bolt",fieldId,result));
            }
    
            private final Map<String , Long> oldTimespeed = new  HashMap<String , Long>();
            private Map<String,Position> speedPositionMap = new HashMap<String, Position>();
    
            /**
             * 计算速度
             * @param userId
             * @param fieldId
             * @param x
             * @param y
             * @param z
             * @param time
             */
            public void computeSpeed(String userId , String fieldId , String x , String y , String z , long time){
                double sx = Double.parseDouble(x);
                double sy = Double.parseDouble(y);
                double sz = Double.parseDouble(z);
    
                Long oldLongTime = oldTimespeed.get(userId);
                Position p = speedPositionMap.get(userId);
                if(p == null){
                    oldTimespeed.put(userId, time);
                    speedPositionMap.put(userId,new Position(sx,sy,sz));
                    return;
                }
                if (oldLongTime == null){
                    oldTimespeed.put(userId, time);
                    return;
                }
    
    
                double absX = Math.abs((sz-p.getX()));
                double absY = Math.abs((sy-p.getY()));
                double absZ = Math.abs((sz-p.getZ()));
    
                double dis = new BigDecimal(Math.sqrt(absX*absX + absY*absY)).setScale(5,BigDecimal.ROUND_HALF_UP).doubleValue();
                long diff = Math.abs(time - oldLongTime);
                System.out.println("diff : ++++++++++++" + absX+","+absY);
                System.out.println("dis : ++++++++++++" + dis);
    //            double speed = dis/(diff/1000);
                if (diff >= 1000){
                    double speed =  new BigDecimal(dis).divide(new BigDecimal(diff).divide(new BigDecimal(1000),3,BigDecimal.ROUND_HALF_EVEN),5,BigDecimal.ROUND_HALF_EVEN).doubleValue();
                    System.out.println("speed : ++++++++++++" + dis);
    
                    //存储
                    Position p1 = new Position(sx,sy,sz);
                    speedPositionMap.put(userId, p1);
                    oldTimespeed.put(userId, time);
    
                    String result = userId + "," + fieldId + "," +time + "," + diff + "," + dis + "," + speed;
                    System.out.println("速度数据:" + result);
                    createProducer.send(new ProducerRecord("hdtas_speed_bolt",fieldId,result));
                }
    
            }
    
            /**
             * 其他参数
             * @param userId 用户id
             * @param fieldId 球场id
             * @param battery 电量
             * @param heartRate 心率
             * @param time 当前毫秒值
             * @param ax
             * @param ay
             * @param az
             * @param gx
             * @param gy
             * @param gz
             * @param r
             */
            public void computeOther(String userId , String fieldId , String battery , String heartRate , long time , double ax , double ay , double az , double gx , double gy , double gz , double r , Integer step){
    
                String result = userId + "," + fieldId + "," + battery+ "," + heartRate + ","
                        + ax + "," + ay + "," + az + ","
                        + gx + "," + gy + "," + gz + ","
                        + r + "," + step + "," + time;
                System.out.println("其他数据: " + result);
                createProducer.send(new ProducerRecord("hdtas_other_bolt",fieldId,result));
            }
        }
    
    
    3.3topology编写,这里topology是一个bolt的逻辑视图,决定bolt的流向
    import com.misbio.seer.storm.hdtas.bolt.HdtasDataAllBolt;
    import com.misbio.seer.storm.hdtas.spout.HdtasSpout;
    import com.misbio.seer.utils.ResourceUtils;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.BytesDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @author lvfang
     * @create 2017-10-16 16:31
     * @desc
     **/
    public class TopologyHdtas {
    
        // 主题与zk端口(local)
        public static final Integer ZK_PORT = Integer.parseInt(ResourceUtils.getProperty("zookeeper.port"));
        public static final String ZK_HOST = ResourceUtils.getProperty("zookeeper.host");
        public static final String ZKINFO = ResourceUtils.getProperty("zookeeper.connect.kafka");
    
        public static final String KAFKA_URL = ResourceUtils.getProperty("kafka.broker");
        public static final String ZKROOT = "/hdtas";
        public static final String SPOUTID = "hdtas";
    
        private static final String HDTAS_SPOUT = "hdtasSpout";
        private static final String HDTAS_ALL_BOLT = "hdtas_all_bolt";
    
        public static void main(String[] args) throws Exception{
            Config cfg = new Config();
            cfg.setNumWorkers(2);
            cfg.setDebug(true);
    
            Properties properties = new Properties();
            //kafka 设置
            properties.put("group.id", "group1");
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    
            List<String> topics = new ArrayList<String>();
            topics.add("htb_position_test");
    
            //组件执行顺序逻辑
            TopologyBuilder builder = new TopologyBuilder();
            //先创造spout数据源PWSpout
            builder.setSpout("spout", new HdtasSpout(properties,topics));
    
            //在创建bolt组件PrintBolt,并接受分组id为“spout”的数据
            builder.setBolt("print-bolt", new HdtasDataAllBolt(KAFKA_URL)).shuffleGrouping("spout");
    
            //1 本地模式
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("top1", cfg, builder.createTopology());//提交分析
            Thread.sleep(10000000);//10000毫秒后停止实时计算
            cluster.killTopology("top1");
            cluster.shutdown();
    
            //2集群模式
    //        StormSubmitter.submitTopology("top1", cfg,builder.createTopology());
        }
    }
    

    在编写完topology之后,我们就可以启动程序看看能否跑通

    2017-10-19_112349.png 2017-10-19_112448.png

    我们可以看到,数据经过处理后已经发送到对应的主题,剩下的工作就是编写主题消费后持久化到redis

    3.4KafkaCusumer主要对storm计算后的数据进行消费,storm计算后将数据根据类型分别下发到了位置主题、距离主题、速度主题、心率主题、电量主题、陀螺仪主题等等,这里我们只编写一个主题的消费,其他等同

    KafkaCusumer.java

    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.serialization.BytesDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    
    /**
     * @author lvfang
     * @create 2017-10-15 11:17
     * @desc
     **/
    public class KafkaCusumer extends Thread  {
        private String topic;//主题
    
        public final String SRC = "D:/testdata/testData.txt";
    
        public KafkaCusumer(String topic){
            super();
            this.topic = topic;
        }
    
        //创建消费者
        private Consumer<String, String> createConsumer(){
            Properties properties = new Properties();
            //必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
            properties.put("group.id", "group1");
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.90.240:9092");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
    
            return new KafkaConsumer<String, String>(properties);
    
        }
    
        @Override
        public void run() {
            //创建消费者
            Consumer consumer = createConsumer();
    
            consumer.subscribe(Arrays.asList(topic));
    
            while (true) {
    
                ConsumerRecords<String, byte[]> records = consumer.poll(100);
    
                for (ConsumerRecord<String, byte[]> record : records) {
    
                    System.out.println("消费到的数据为:"+record.value());
                    //System.out.printf("接收到: ", record.offset(), record.key(), record.value());
                }
            }
        }
    
        public static void main(String[] args) {
            new KafkaCusumer("htb_position_test").start();
        }
    }
    

    编写完后我们可以启动消费,看看是否可以消费到

    2017-10-19_113515.png

    这里消费是将数据直接打印出来了,至于持久化,我们在打印之后存储即可,这里就不在写了,完毕!!!

    相关文章

      网友评论

          本文标题:Strom自行整合kafka

          本文链接:https://www.haomeiwen.com/subject/dtbsuxtx.html