前言
最近由于业务线上环境kafka版本有变,导致与其配套的storm已经redis版本都要随之更新。本来想着将kafka的线上版本改回之前版本,这样所有的东西都不用随之变动,不幸的是老大告知线上kafka版本不能变,所以只能去找对应的storm和redis版本了,(注:线上kafka为2.11.0.11.0),在更新storm时发现还没有与之对应的storm-kafka匹配版本,这就意味着不能用storm提供的storm与kafka的整合包了,所以只能自行手写整合。
任务
本次要完成的任务与之前相同,还是走一套完整的storm分析(kafka-storm-kafka-redis),用一个kafka的生产者模拟数据源发给storm,这里要提说的是storm用最传统的开发方式(topology-spout-bolt)
topology:拓扑(bolt的执行逻辑)
spout:storm的数据源,这里接收kafka的数据
bolt:计算任务组件,可以有一个或者多个
环境
kafka2.11.0.11.0
zookeeper3.4.5.0
storm1.0.1
redis3.2.11.0
数据流向
(1):kafkaProducer生产者发送数据到test主题
(2):storm从test主题获取数据(spout组件获取)
(3):spout组件发送给AllDataBolt进行数据处理
(4):AllDataBolt在execute()方法中进行业务处理并将所处理的数据下发给kafka各个数据主题
(5):kafka各个数据主题对应的消费者进行数据消费,并将消费后的数据存储至redis
代码实现
1: 假数据格式
假数据是一条字符串,每条字符串有25个字段,字段与字段之间用空格相隔,每一个字段代表一个特定属性,以下是部分假数据
16 608 0 10010018 17 10 18 19 13 37 839 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 138196
16 608 0 10010018 17 10 18 19 13 38 89 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 388327
16 608 0 10010018 17 10 18 19 13 38 339 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 638286
16 608 0 10010018 17 10 18 19 13 38 589 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325219 888342
16 608 0 10010018 17 10 18 19 13 38 839 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 138420
16 608 0 10010018 17 10 18 19 13 39 89 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 388487
16 608 0 10010018 17 10 18 19 13 39 339 0.000000 0.000000 0.000000 0 0 0 0 0 0 0 0 0 1508325220 638539
真实环境,这些数据都是球员所带的护腿板上发出的数据,由于我们是local测试,所以需要提前造一些假数据存放在txt中,数据格式与上格式吻合即可
2:maven配置及工具类
<!-- storm -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.10.0</version>
<exclusions>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.jboss.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<!--<version>0.9.0.0</version>-->
<version>0.11.0.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<!--<exclusion>-->
<!--<groupId>kafka-clients</groupId>-->
<!--<artifactId>kafka-clients11</artifactId>-->
<!--</exclusion>-->
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.8.1</version>
</dependency>
HdtasScheme.java
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.List;
/**
* Created by Jarno on 2017/5/31.
*/
public class HdtasScheme implements Scheme {
private static final Charset UTF8_CHARSET;
//协议类型
public static final String PROTOCOL_TYPE = "protocol_type";
//场地ID
public static final String FIELD_ID = "field_id";
//主设备ID(终端id)
public static final String UWB_ID = "uwb_id";
//护腿板ID
public static final String SIGN_ID = "sign_id";
// 年 月 日 时 分 秒 毫秒
public static final String YEAR = "year";
public static final String MONTH = "month";
public static final String DAY = "day";
public static final String HOUR = "hour";
public static final String MINUTE = "minute";
public static final String SECOND = "second";
public static final String MILLISECOND = "millisecond";
//定位精度 X Y Z
public static final String X = "x";
public static final String Y = "y";
public static final String Z = "z";
//加速度 X Y Z
public static final String A_SPEED_X = "a_speed_x";
public static final String A_SPEED_Y = "a_speed_y";
public static final String A_SPEED_Z = "a_speed_z";
//陀螺仪 X Y Z
public static final String GYROSCOPE_X = "gyroscope_x";
public static final String GYROSCOPE_Y = "gyroscope_y";
public static final String GYROSCOPE_Z = "gyroscope_z";
//心率
public static final String HEART_RATE = "heart_rate";
//电池电量
public static final String ELECTRIC = "electric";
//电池充电状态 1:充电 0:放电
public static final String CHARGING_STATUS = "charging_status";
//Unix时间戳 秒
public static final String SERVER_ACCEPT_TIME_S = "server_accept_time_s";
//Unix时间戳 纳秒
public static final String SERVER_ACCEPT_TIME_N = "server_accept_time_n";
@Override
public List<Object> deserialize(ByteBuffer ser) {
String input = deserializeString(ser);
String[] strs = input.split(" ");
if (strs.length != 25) {
return null;
}
return new Values(strs);
}
private static String deserializeString(ByteBuffer string) {
if (string.hasArray()) {
int base = string.arrayOffset();
return new String(string.array(), base + string.position(), string.remaining());
} else {
return new String(Utils.toByteArray(string), UTF8_CHARSET);
}
}
@Override
public Fields getOutputFields() {
return new Fields(PROTOCOL_TYPE, FIELD_ID, UWB_ID, SIGN_ID, YEAR, MONTH, DAY, HOUR, MINUTE, SECOND, MILLISECOND, X, Y, Z, A_SPEED_X, A_SPEED_Y, A_SPEED_Z, GYROSCOPE_X, GYROSCOPE_Y, GYROSCOPE_Z, HEART_RATE, ELECTRIC, CHARGING_STATUS, SERVER_ACCEPT_TIME_S, SERVER_ACCEPT_TIME_N);
}
static {
UTF8_CHARSET = Charset.forName("UTF-8");
}
}
2:kafkaProducer模拟数据源读取txt发送数据
KafkaProducer.java
import kafka.serializer.StringEncoder;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.producer.Producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
/**
* @author lvfang
* @create 2017-10-15 11:17
* @desc
**/
public class KafkaProduce extends Thread {
private String topic;//主题
private String src;//数据源
public KafkaProduce(String topic,String src){
super();
this.topic = topic;
this.src = src;
}
//创建生产者
private Producer<Integer, String> createProducer(){
Properties properties = new Properties();
//kafka单节点
properties.put("metadata.broker.list", "192.168.90.240:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.90.240:9092");
return new KafkaProducer<Integer, String>(properties);
}
public void run() {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(src));
// 创建生产者
Producer producer = createProducer();
String line = null;
// 循环发送消息到kafka
while ((line = br.readLine()) != null) {
System.out.println("生产数据为:"+line);
producer.send(new ProducerRecord(topic,line + "\n"));
// 发送消息的时间间隔
Thread.sleep(1000);//TimeUnit.SECONDS.sleep(333);
}
} catch (Exception e) {
} finally {
try { if (br != null) br.close(); } catch (IOException e) {}
}
}
public static void main(String[] args) {
// 使用kafka集群中创建好的主题 test
new KafkaProduce("htb_position_test","D:/testdata/htb_position_test_data.txt").start();
}
}
写完生产之后我们先试试是否可以顺利发送数据
2017-10-19_103248.png 2017-10-19_103312.png数据顺利发送,第一步 我们的数据源就成功了
3:编写storm逻辑
这里storm扮演的角色主要是计算,它将计算好的数据再次分发送给kafka;在这个过程中,主要有三个组件参与到其中,spout、bolt、topology,spout作为数据源接收kafka对应主题所分发的数据;bolt是数据计算组件,每一个bolt做一次计算,当然,这里我们为了简单,都在一个bolt中处理了;topology主要是storm组件的拼凑逻辑,即决定bolt的执行顺序,类似于工作流中的逻辑图一样。
3.1:spout数据源,这里数据源spout进行了抽取,我们知道spout的实现由两种方式,这里使用实现IRichSpout
KafkaSpout.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class KafkaSpout implements IRichSpout {
private static final long serialVersionUID = 3679050534053612196L;
public static final Logger logger = LoggerFactory.getLogger(KafkaSpout.class);
private final Properties props;
private SpoutOutputCollector collector;
private List<String> topicList;
private final AtomicInteger spoutPending;
// private int maxSpoutPending;
private KafkaConsumer<String, byte[]> consumer;
public KafkaSpout(Properties props, List<String> topics) {
this.props = props;
initProps();
this.topicList = topics;
this.spoutPending = new AtomicInteger();
}
private void initProps() {
this.props.put("key.deserializer", StringDeserializer.class);
this.props.put("value.deserializer", ByteArrayDeserializer.class);
}
public abstract Fields generateFields();
public abstract List<Object> generateTuple(byte[] message);
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
// this.maxSpoutPending = Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString());
}
@Override
public void close() {
}
@Override
public void activate() {
consumer = new KafkaConsumer(props);
consumer.subscribe(topicList);
final ExecutorService executor = Executors.newFixedThreadPool(topicList.size());
executor.submit(new Runnable() {
@Override
public void run() {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
Iterator<ConsumerRecord<String, byte[]>> iterator = records.iterator();
while (iterator.hasNext()) {
if (spoutPending.get() <= 0) {
sleep(1000);
continue;
}
ConsumerRecord<String, byte[]> next = iterator.next();
byte[] message = next.value();
List<Object> tuple = null;
try {
tuple = generateTuple(message);
} catch (Exception e) {
e.printStackTrace();
}
if (tuple == null) {
continue;
}
// logger.info("kafka spout emit tuple:{}", tuple.toString());
collector.emit(tuple);
spoutPending.decrementAndGet();
}
}
}
});
}
@Override
public void deactivate() {
consumer.close();
}
@Override
public void nextTuple() {
// if (spoutPending.get() < maxSpoutPending) {
// spoutPending.incrementAndGet();
// }
spoutPending.incrementAndGet();
}
@Override
public void ack(Object msgId) {
}
@Override
public void fail(Object msgId) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(generateFields());
}
private void sleep(long millisecond) {
try {
Thread.sleep(millisecond);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上边的KafkaSpout是一个抽象方法,他实现了IRichSpout接口并重写了接收和发送消息的方法,而留有两个抽象方法为实现,一个是Fields generateFields(),他需要开发人员根据业务对不同的数据进行不同的实现,主要是返回数据切分后的列字段,已被bolt处用相应字段接收;另一个是List<Object> generateTuple(byte[] message)方法,他是一个真正的消息接收发送方法,他接收到消息对消息进行切分。业务人员要实现自己spout数据源需要基础KafkaSpout, 并重写以上两个方法。
HdtasSpout.java
import com.misbio.seer.storm.KafkaSpout;
import com.misbio.seer.storm.hdtas.HdtasScheme;
import org.apache.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* @author lvfang
* @create 2017-10-16 16:40
* @desc
**/
public class HdtasSpout extends KafkaSpout {
public HdtasSpout(Properties props, List<String> topics){
super(props,topics);
}
public List<Object> generateTuple(byte[] message){
System.out.println(new String(message).split(" ")[1]);
String[] messages = new String(message).split(" ");
if(messages.length!=25) return null;
List<Object> tuple = new ArrayList<Object>();
tuple.add(messages[0]);
tuple.add(messages[1]);
tuple.add(messages[2]);
tuple.add(messages[3]);
tuple.add(messages[4]);
tuple.add(messages[5]);
tuple.add(messages[6]);
tuple.add(messages[7]);
tuple.add(messages[8]);
tuple.add(messages[9]);
tuple.add(messages[10]);
tuple.add(messages[11]);
tuple.add(messages[12]);
tuple.add(messages[13]);
tuple.add(messages[14]);
tuple.add(messages[15]);
tuple.add(messages[16]);
tuple.add(messages[17]);
tuple.add(messages[18]);
tuple.add(messages[19]);
tuple.add(messages[20]);
tuple.add(messages[21]);
tuple.add(messages[22]);
tuple.add(messages[23]);
tuple.add(messages[24]);
return tuple;
}
public Fields generateFields(){
return new Fields(
HdtasScheme.PROTOCOL_TYPE,
HdtasScheme.FIELD_ID,
HdtasScheme.UWB_ID,
HdtasScheme.SIGN_ID,
HdtasScheme.YEAR,
HdtasScheme.MONTH,
HdtasScheme.DAY,
HdtasScheme.HOUR,
HdtasScheme.MINUTE,
HdtasScheme.SECOND,
HdtasScheme.MILLISECOND,
HdtasScheme.X,
HdtasScheme.Y,
HdtasScheme.Z,
HdtasScheme.A_SPEED_X,
HdtasScheme.A_SPEED_Y,
HdtasScheme.A_SPEED_Z,
HdtasScheme.GYROSCOPE_X,
HdtasScheme.GYROSCOPE_Y,
HdtasScheme.GYROSCOPE_Z,
HdtasScheme.HEART_RATE,
HdtasScheme.ELECTRIC,
HdtasScheme.CHARGING_STATUS,
HdtasScheme.SERVER_ACCEPT_TIME_S,
HdtasScheme.SERVER_ACCEPT_TIME_N
);
};
}
3.2Bolt编写,此bolt主要对spout所发送的数据进行接收计算处理,处理之后并下发给对应的主题供kafka再次消费使用,这里同样bolt的实现方式也有两种,我们这里采用继承BaseBasicBolt的实现方式
HdtasDataAllBolt.java
import com.misbio.seer.kafka.DefaultProducer;
import com.misbio.seer.storm.hdtas.HdtasScheme;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
* @author lvfang
* @create 2017-10-16 17:33
* @desc
**/
public class HdtasDataAllBolt extends BaseBasicBolt {
private final static Logger LOG = LoggerFactory.getLogger(HdtasDataAllBolt.class);
private String kafkaUri;
private Producer<String, String> createProducer;
public HdtasDataAllBolt(){}
public HdtasDataAllBolt(String kafkaUri) {
this.kafkaUri = kafkaUri;
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
this.createProducer = new DefaultProducer().createProducer(kafkaUri, null);
}
long start = System.currentTimeMillis();
int count = 0;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String fieldId = input.getStringByField(HdtasScheme.FIELD_ID);//获取球场id
String signId = input.getStringByField(HdtasScheme.SIGN_ID);//标签id
//加速度 X Y Z
String aSpeedX = input.getStringByField(HdtasScheme.A_SPEED_X);
String aSpeedY = input.getStringByField(HdtasScheme.A_SPEED_Y);
String aSpeedZ = input.getStringByField(HdtasScheme.A_SPEED_Z);
//定位精度 X Y Z
String x = input.getStringByField(HdtasScheme.X);
String y = input.getStringByField(HdtasScheme.Y);
String z = input.getStringByField(HdtasScheme.Z);
//陀螺仪 X Y Z
String gyroscopeX = input.getStringByField(HdtasScheme.GYROSCOPE_X);
String gyroscopeY = input.getStringByField(HdtasScheme.GYROSCOPE_Y);
String gyroscopeZ = input.getStringByField(HdtasScheme.GYROSCOPE_Z);
String heartRate = input.getStringByField(HdtasScheme.HEART_RATE);//心率
String electric = input.getStringByField(HdtasScheme.ELECTRIC);//电池电量
String charingStatus = input.getStringByField(HdtasScheme.CHARGING_STATUS);//充电状态
String timeS = input.getStringByField(HdtasScheme.SERVER_ACCEPT_TIME_S);//时间戳 秒
long time = convertMilTime(input);
if ("1523".equals(signId)){
return;
}
//x y < 0.01
double doubleX = Double.valueOf(x).doubleValue();
double doubleY = Double.valueOf(y).doubleValue();
if(doubleX<0.01 || doubleY <0.01){
return;
}
// if(x.equals("0.000000") || y.equals("0.000000")){
// return;
// }
System.out.println("X&Y : " + x + " : " + y);
//三个轴的加速度值 单位g 16384/8 = 2048 16g量程
double ax = BigDecimal.valueOf(Integer.parseInt(aSpeedX)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
double ay = BigDecimal.valueOf(Integer.parseInt(aSpeedY)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
double az = BigDecimal.valueOf(Integer.parseInt(aSpeedZ)).divide(BigDecimal.valueOf(2048), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
//三个轴的陀螺仪值 单位为dps 度每秒 131/8 = 16.375 2000度量程
double gx = BigDecimal.valueOf(Integer.parseInt(gyroscopeX)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
double gy = BigDecimal.valueOf(Integer.parseInt(gyroscopeY)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
double gz = BigDecimal.valueOf(Integer.parseInt(gyroscopeZ)).divide(BigDecimal.valueOf(16.375), 3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
//加速度矢量
double r = BigDecimal.valueOf(Math.sqrt(ax * ax + ay * ay + az * az)).setScale(3,BigDecimal.ROUND_HALF_DOWN).doubleValue();
if (0d == r) {
return;
}
//结果放入kafka中(hdtas_all_ori_a_speed)
final String result = signId + "," + ax + "," + ay + "," + az + "," + r + "," + time;
createProducer.send(new ProducerRecord("hdtas_all_ori_a_speed",fieldId,result));
long end = System.currentTimeMillis();
if (end - start <= 1000) count++; else { start = end; count = 0;}
//认为异常数据 不予处理
if (ax == 0 && ay == 0 && az == 0) return;
//计算步数
Integer step = computeStep(fieldId, signId, r, time);
//计算三个轴的角度变化
computeAngle(ax, ay, az);
//计算瞬时速度
computeInstantSpeed(time, ax, ay, az);
//计算位置
computePosition(signId , fieldId , x , y , z , time);
//计算速度(有问题)
computeSpeed(signId , fieldId , x , y , z , time);
//其他数据(陀螺仪,加速度,心率,电量等等)
computeOther(signId , fieldId , electric , heartRate , time , ax , ay , az , gx , gy , gz , r , step);
//计算灵敏度
computeAgile(signId , fieldId , gx , gy , gz , time);
//计算电量
computeBattery(signId , fieldId , electric , time);
//计算距离s
computeDistance(signId , fieldId , x , y , z , time);
//计算心率
computeHeartrate(signId , fieldId , heartRate , time);
}
Map<String, Double> lastRMap = new HashMap<String, Double>();//加速度矢量
Map<String, Integer> stepMap = new HashMap<String, Integer>();
// Map<String, Long> currentPeakTimeMap = new HashMap<String, Long>();
Map<String, Long> lastPeakTimeMap = new HashMap<String, Long>();
/**
* 计算步数
* @param fieldId
* @param signId
* @param r
* @param time
*/
private Integer computeStep(String fieldId, String signId, double r, long time) {
Double lastR = lastRMap.get(signId);
if (lastR == null) {
lastRMap.put(signId, r);
} else {
boolean isPeak = detectPeak(signId, lastR, r);
if (isPeak){
Long lastPeakTime = lastPeakTimeMap.get(signId);
if (lastPeakTime == null) {
lastPeakTime = 0L;
}
Double peak = lastPeakMap.get(signId);
Double valley = lastValleyMap.get(signId);
if (peak == null || valley == null){
return 0;
}
//得到阙值
double threshold = peak - valley;
//System.out.println("阈值:" + threshold);
Integer step = stepMap.get(signId);
if (step == null) {
step = 0;
stepMap.put(signId, 0);
}
Double bt = baseThresholdMap.get(signId);
if (bt == null){
baseThresholdMap.put(signId, baseThreshold);
bt = baseThreshold;
}
System.out.println("baseThreshold:" + bt);
if (time - lastPeakTime >= 250 && (threshold >= bt)){
lastPeakTimeMap.put(signId, time);
stepMap.put(signId, step + 1);
}
if (time - lastPeakTime >= 250 && (threshold >= 1.3d)){
lastPeakTimeMap.put(signId, time);
double aveThreshold = computeThreshold(signId, threshold);
System.out.println("aveThreshold:" + aveThreshold);
baseThresholdMap.put(signId, aveThreshold);
}
//结果放入kafka中
final String result = signId + "," + threshold;
final StringBuilder sb = new StringBuilder(result);
sb.append(",").append(stepMap.get(signId));
return stepMap.get(signId);
// createProducer.send(new ProducerRecord("hdtas_all_step",fieldId,sb.toString()));
}
lastRMap.put(signId, r);
}
return 0;
}
private double stepAvgNum = 4;
private double computeThreshold(String signId, double threshold) {
double temp = baseThreshold;
List<Double> thresholdList = aveThresholdMap.get(signId);
if (thresholdList == null){
thresholdList = new ArrayList<Double>();
}
//System.out.println("size:" +thresholdList.size());
if (thresholdList.size() < stepAvgNum) {
thresholdList.add(threshold);
aveThresholdMap.put(signId,thresholdList);
}else{
temp = computeAvgThreshold(thresholdList);
thresholdList.add(threshold);
thresholdList.remove(0);
aveThresholdMap.put(signId,thresholdList);
}
return temp;
}
private double computeAvgThreshold(List<Double> thresholdList) {
int size = thresholdList.size();
double all = 0;
for (double d : thresholdList){
all += d;
}
double ave = new BigDecimal(all).divide(new BigDecimal(size), 3).setScale(3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
if (ave >= 8) {
ave = 4.3d;
}
else if(ave >= 7 && ave < 8){
ave = 3.3d;
}
else if(ave >= 4 && ave < 7){
ave = 2.3d;
}
else if(ave >= 3 && ave <4){
ave = 2.0d;
}
else {
ave = 1.3d;
}
return ave;
}
//记录当前上升状态
private Map<String, Boolean> upMap = new HashMap<String, Boolean>();
//记录上一次上升状态
private Map<String, Boolean> lastUpMap = new HashMap<String, Boolean>();
//记录上升趋势次数
private Map<String, Integer> upCountMap = new HashMap<String, Integer>();
//记录上一次上升趋势次数
private Map<String, Integer> lastUpCountMap = new HashMap<String, Integer>();
//记录波峰的值
private Map<String,Double> lastPeakMap = new HashMap<String, Double>();
//记录波谷的值
private Map<String,Double> lastValleyMap = new HashMap<String, Double>();
private Map<String,List<Double>> aveThresholdMap = new HashMap<String, List<Double>>();
private Map<String,Double> thresholdMap = new HashMap<String, Double>();
private double baseThreshold = 2.0d;
private Map<String,Double> baseThresholdMap = new HashMap<String, Double>();
/**
* 检测波峰
*
* @param signId
* @param lastR
* @param newR 1.下降趋势
* 2.上一个点为上升趋势
* 3.持续上升次数大于等于2
* 4.波峰值判断(>=4走路 >=6慢跑 >=8快跑 >=9跳跃)
* @return
*/
private boolean detectPeak(String signId, Double lastR, double newR) {
Boolean isUp = upMap.get(signId);
if (isUp == null) {
lastUpMap.put(signId, false);
}else{
lastUpMap.put(signId, isUp);
}
Integer count = upCountMap.get(signId);
//上升趋势
// && (newR - lastR) >= baseThreshold
if (newR >= lastR) {
upMap.put(signId, true);
upCountMap.put(signId, count == null ? 1 : count + 1);
} else {
lastUpCountMap.put(signId, count == null ? 0 : count);
upCountMap.put(signId, 0);
upMap.put(signId, false);
}
boolean lastUp = lastUpMap.get(signId);
boolean currentUp = upMap.get(signId);
Integer lastUpCount = lastUpCountMap.get(signId);
if (lastUpCount == null) {lastUpCount = 0;}
double baseR = 20 / 9.8;
if( !currentUp && lastUp && (lastUpCount >= 2 || lastR >= baseR)){
lastPeakMap.put(signId, lastR);
return true;
}
else if (currentUp && !lastUp){
lastValleyMap.put(signId, lastR);
return false;
}else {
return false;
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
/**
* 计算三个轴的角度变化
* @param ax
* @param ay
* @param az
*/
private void computeAngle(double ax, double ay, double az) {
double r = Math.sqrt(ax * ax + ay * ay + az * az);
double arx = (180 / Math.PI) * Math.acos(ax / r);
double ary = (180 / Math.PI) * Math.acos(ay / r);
double arz = (180 / Math.PI) * Math.acos(az / r);
// double myarx = -1;
// double myary = -1;
//// System.out.println("----------------转换后角度");
// if (arz > 0 && arz <= 90) {
// myarx = arx - 90;
// myary = ary - 90;
// }
// if (arz > 90) {
// if (arx >= 0 && arx <= 90) {
// myarx = -90 - arx;
// }
// if (arx > 90) {
// myarx = 180 - arx + 90;
// }
// if (ary >= 0 && ary <= 90) {
// myary = - 90 - ary;
// }
// if (ary > 90) {
//// System.out.println("Y轴:" + (180 - ary + 90));
// myary = 180 - ary + 90;
// }
// }
//
}
private long lastMillisecond = -1;
private long lastTime = 0;
private double lastAx = -1;
private double lastAy = -1;
private double lastAz = -1;
private double lastV = 0;
/**
* 计算瞬时速度
* @param time
* @param ax
* @param ay
* @param az
*/
private void computeInstantSpeed(long time, double ax, double ay, double az) {
if (lastMillisecond != -1 && lastAx != -1 && lastAy != -1 && lastAz != -1) {
double ds = (time - lastMillisecond) / 1000d;
if (ds <= 0) {
lastMillisecond = time;
return;
}
double vx = (ax - lastAx) / 2 * ds;
double vy = (ay - lastAy) / 2 * ds;
double vz = (az - lastAz) / 2 * ds;
// double vx_pow = Math.pow(vx , 2);
// double vy_pow = Math.pow(vy, 2);
// double vz_pow = Math.pow(vz, 2);
// double v = Math.sqrt(vx_pow + vy_pow + vz_pow);
double v = vx + vy + vz;
lastV = v;
// if(v > 8 ){
// if (v < 0){
// v = 0.000;
// }
v = new BigDecimal(v).setScale(3, BigDecimal.ROUND_HALF_DOWN).doubleValue();
// System.out.println("瞬时速度为:" + v);
}
lastMillisecond = time;
lastAx = ax;
lastAy = ay;
lastAz = az;
}
private long convertMilTime(Tuple input) {
String year = input.getStringByField(HdtasScheme.YEAR);
String month = input.getStringByField(HdtasScheme.MONTH);
String day = input.getStringByField(HdtasScheme.DAY);
String hour = input.getStringByField(HdtasScheme.HOUR);
String minute = input.getStringByField(HdtasScheme.MINUTE);
String second = input.getStringByField(HdtasScheme.SECOND);
String millisecond = input.getStringByField(HdtasScheme.MILLISECOND);
SimpleDateFormat sdf = new SimpleDateFormat("yy-MM-dd HH:mm:ss:SSS");
String time = year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + second + ":" + millisecond;
// System.out.println(time);
try {
Date date = sdf.parse(time);
// System.out.println(date.getTime());
//线上时间
return date.getTime();
//测试时间
// return System.currentTimeMillis();
} catch (ParseException e) {
e.printStackTrace();
LOG.error("convertMilTime date format error, error info is {}", e.getMessage());
return System.currentTimeMillis();
}
}
/**
* 计算灵敏度
* @param userId
* @param fieldId
* @param x
* @param y
* @param z
* @param time
*/
public void computeAgile(String userId , String fieldId , double x , double y , double z , Long time){
final String result = userId + "," + fieldId + "," + x + "," + y + "," + z + "," + time;
createProducer.send(new ProducerRecord("hdtas_agile_bolt",fieldId,result));
}
/**
* 计算电量
* @param userId
* @param electric
* @param time
*/
public void computeBattery(String userId , String fieldId , String electric , Long time){
final String result = userId + "," + electric + "," +time;
createProducer.send(new ProducerRecord("hdtas_battery_bolt",fieldId,result));
}
private final Map<String , String> oldXdis = new HashMap<String, String>();
private final Map<String , String> oldYdis = new HashMap<String, String>();
private final Map<String , Long> oldTimedis = new HashMap<String , Long>();
private final Map<String , Long> oldTimeDistance = new HashMap<String , Long>();
private Map<String,Position> distancePositionMap = new HashMap<String, Position>();
/**
* 计算距离
* @param userId
* @param fieldId
* @param x
* @param y
* @param z
* @param time
*/
public void computeDistance(String userId , String fieldId , String x , String y , String z , Long time){
double dx = Double.parseDouble(x);
double dy = Double.parseDouble(y);
double dz = Double.parseDouble(z);
Long oldLongTime = oldTimeDistance.get(userId);
Position p = distancePositionMap.get(userId);
if(p == null){
oldTimeDistance.put(userId, time);
distancePositionMap.put(userId,new Position(dx,dy,dz));
return;
}
if(oldLongTime == null){
oldTimeDistance.put(userId,time);
return;
}
double absX = Math.abs((dz-p.getX()));
double absY = Math.abs((dy-p.getY()));
double absZ = Math.abs((dz-p.getZ()));
double dis = new BigDecimal(Math.sqrt(absX*absX + absY*absY)).setScale(5,BigDecimal.ROUND_HALF_UP).doubleValue();
long diff = Math.abs(time - oldLongTime);
if (diff >= 1000){
double speed = new BigDecimal(dis).divide(new BigDecimal(diff).divide(new BigDecimal(1000),3,BigDecimal.ROUND_HALF_EVEN),5,BigDecimal.ROUND_HALF_EVEN).doubleValue();
//存储
Position p1 = new Position(dx,dy,dz);
distancePositionMap.put(userId, p1);
oldTimeDistance.put(userId, time);
String result = userId + "," + fieldId + "," +time + "," + diff + "," + dis + "," + speed;
createProducer.send(new ProducerRecord("hdtas_distance_bolt",fieldId,result));
}
}
/**
* 计算心率
* @param userId
* @param fieldId
* @param heartRate
* @param time
*/
public void computeHeartrate(String userId , String fieldId , String heartRate , Long time){
final String result = userId + "," + fieldId + "," +time + "," + heartRate;
createProducer.send(new ProducerRecord("hdtas_heartrate_bolt",fieldId,result));
}
/**
* 计算坐标
* @param userId
* @param fieldId
* @param x
* @param y
* @param z
* @param time
*/
public void computePosition(String userId , String fieldId , String x , String y , String z , Long time){
final String result = userId + "," + fieldId + "," +time + "," + x + "," + y;
System.out.println("位置数据:" + result);
createProducer.send(new ProducerRecord("hdtas_position_bolt",fieldId,result));
}
private final Map<String , Long> oldTimespeed = new HashMap<String , Long>();
private Map<String,Position> speedPositionMap = new HashMap<String, Position>();
/**
* 计算速度
* @param userId
* @param fieldId
* @param x
* @param y
* @param z
* @param time
*/
public void computeSpeed(String userId , String fieldId , String x , String y , String z , long time){
double sx = Double.parseDouble(x);
double sy = Double.parseDouble(y);
double sz = Double.parseDouble(z);
Long oldLongTime = oldTimespeed.get(userId);
Position p = speedPositionMap.get(userId);
if(p == null){
oldTimespeed.put(userId, time);
speedPositionMap.put(userId,new Position(sx,sy,sz));
return;
}
if (oldLongTime == null){
oldTimespeed.put(userId, time);
return;
}
double absX = Math.abs((sz-p.getX()));
double absY = Math.abs((sy-p.getY()));
double absZ = Math.abs((sz-p.getZ()));
double dis = new BigDecimal(Math.sqrt(absX*absX + absY*absY)).setScale(5,BigDecimal.ROUND_HALF_UP).doubleValue();
long diff = Math.abs(time - oldLongTime);
System.out.println("diff : ++++++++++++" + absX+","+absY);
System.out.println("dis : ++++++++++++" + dis);
// double speed = dis/(diff/1000);
if (diff >= 1000){
double speed = new BigDecimal(dis).divide(new BigDecimal(diff).divide(new BigDecimal(1000),3,BigDecimal.ROUND_HALF_EVEN),5,BigDecimal.ROUND_HALF_EVEN).doubleValue();
System.out.println("speed : ++++++++++++" + dis);
//存储
Position p1 = new Position(sx,sy,sz);
speedPositionMap.put(userId, p1);
oldTimespeed.put(userId, time);
String result = userId + "," + fieldId + "," +time + "," + diff + "," + dis + "," + speed;
System.out.println("速度数据:" + result);
createProducer.send(new ProducerRecord("hdtas_speed_bolt",fieldId,result));
}
}
/**
* 其他参数
* @param userId 用户id
* @param fieldId 球场id
* @param battery 电量
* @param heartRate 心率
* @param time 当前毫秒值
* @param ax
* @param ay
* @param az
* @param gx
* @param gy
* @param gz
* @param r
*/
public void computeOther(String userId , String fieldId , String battery , String heartRate , long time , double ax , double ay , double az , double gx , double gy , double gz , double r , Integer step){
String result = userId + "," + fieldId + "," + battery+ "," + heartRate + ","
+ ax + "," + ay + "," + az + ","
+ gx + "," + gy + "," + gz + ","
+ r + "," + step + "," + time;
System.out.println("其他数据: " + result);
createProducer.send(new ProducerRecord("hdtas_other_bolt",fieldId,result));
}
}
3.3topology编写,这里topology是一个bolt的逻辑视图,决定bolt的流向
import com.misbio.seer.storm.hdtas.bolt.HdtasDataAllBolt;
import com.misbio.seer.storm.hdtas.spout.HdtasSpout;
import com.misbio.seer.utils.ResourceUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* @author lvfang
* @create 2017-10-16 16:31
* @desc
**/
public class TopologyHdtas {
// 主题与zk端口(local)
public static final Integer ZK_PORT = Integer.parseInt(ResourceUtils.getProperty("zookeeper.port"));
public static final String ZK_HOST = ResourceUtils.getProperty("zookeeper.host");
public static final String ZKINFO = ResourceUtils.getProperty("zookeeper.connect.kafka");
public static final String KAFKA_URL = ResourceUtils.getProperty("kafka.broker");
public static final String ZKROOT = "/hdtas";
public static final String SPOUTID = "hdtas";
private static final String HDTAS_SPOUT = "hdtasSpout";
private static final String HDTAS_ALL_BOLT = "hdtas_all_bolt";
public static void main(String[] args) throws Exception{
Config cfg = new Config();
cfg.setNumWorkers(2);
cfg.setDebug(true);
Properties properties = new Properties();
//kafka 设置
properties.put("group.id", "group1");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
List<String> topics = new ArrayList<String>();
topics.add("htb_position_test");
//组件执行顺序逻辑
TopologyBuilder builder = new TopologyBuilder();
//先创造spout数据源PWSpout
builder.setSpout("spout", new HdtasSpout(properties,topics));
//在创建bolt组件PrintBolt,并接受分组id为“spout”的数据
builder.setBolt("print-bolt", new HdtasDataAllBolt(KAFKA_URL)).shuffleGrouping("spout");
//1 本地模式
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("top1", cfg, builder.createTopology());//提交分析
Thread.sleep(10000000);//10000毫秒后停止实时计算
cluster.killTopology("top1");
cluster.shutdown();
//2集群模式
// StormSubmitter.submitTopology("top1", cfg,builder.createTopology());
}
}
在编写完topology之后,我们就可以启动程序看看能否跑通
2017-10-19_112349.png 2017-10-19_112448.png我们可以看到,数据经过处理后已经发送到对应的主题,剩下的工作就是编写主题消费后持久化到redis
3.4KafkaCusumer主要对storm计算后的数据进行消费,storm计算后将数据根据类型分别下发到了位置主题、距离主题、速度主题、心率主题、电量主题、陀螺仪主题等等,这里我们只编写一个主题的消费,其他等同
KafkaCusumer.java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Arrays;
import java.util.Properties;
/**
* @author lvfang
* @create 2017-10-15 11:17
* @desc
**/
public class KafkaCusumer extends Thread {
private String topic;//主题
public final String SRC = "D:/testdata/testData.txt";
public KafkaCusumer(String topic){
super();
this.topic = topic;
}
//创建消费者
private Consumer<String, String> createConsumer(){
Properties properties = new Properties();
//必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
properties.put("group.id", "group1");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.90.240:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class);
return new KafkaConsumer<String, String>(properties);
}
@Override
public void run() {
//创建消费者
Consumer consumer = createConsumer();
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records) {
System.out.println("消费到的数据为:"+record.value());
//System.out.printf("接收到: ", record.offset(), record.key(), record.value());
}
}
}
public static void main(String[] args) {
new KafkaCusumer("htb_position_test").start();
}
}
编写完后我们可以启动消费,看看是否可以消费到
2017-10-19_113515.png这里消费是将数据直接打印出来了,至于持久化,我们在打印之后存储即可,这里就不在写了,完毕!!!
网友评论