美文网首页
Springboot+MQTT协议接收终端上报信息并写入数据库

Springboot+MQTT协议接收终端上报信息并写入数据库

作者: 安易学车 | 来源:发表于2021-12-22 21:37 被阅读0次

    1、下载apache-apollo-1.7.1并安装,下载地址如下:

    http://archive.apache.org/dist/activemq/activemq-apollo/1.7.1/

    2、windows安装

    1)、解压,进入到D:\java\apache-apollo-1.7.1\bin 目录下,执行命令

    apollo.cmd  create  mybroker

    2)、进入刚刚创建好的mybroker/bin目录,执行

    apollo-broker.cmd run

    如下界面说明安装成功:

    3)、浏览器打开地址http://127.0.0.1:61680/,默认用户名:admin,密码:password,即可登录主页面

    在configuration中的user.properties可以修改用户名和密码

    2、代码处理

    1)、引入maven依赖

    <dependency>

                <groupId>org.eclipse.paho</groupId>

                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>

                <version>1.2.0</version>

            </dependency>

    2)、Mqtt

    package com.ghzn.bgl.mqtt.demo;

    import com.ghzn.bgl.dao.EquipmentMapper;

    import com.ghzn.bgl.dao.ReceivecordMapper;

    import com.ghzn.bgl.dao.SysUserMapper;

    import com.ghzn.bgl.entity.Equipment;

    import com.ghzn.bgl.service.EquipMentService;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.boot.CommandLineRunner;

    import org.springframework.stereotype.Component;

    /**

    * @Author particle

    * @Description

    * @Date: Created in 16:24 2021-08-05

    * @Modified by:

    */

    @Component

    public class Mqttimplements CommandLineRunner {

    @Autowired

        private EquipmentMapper   equipmentMapper;  // 数据库CRUD接口

        @Autowired

        private SysUserMapper   sysUserMapper;  // 数据库CRUD接口

        @Autowired

        private ReceivecordMapper   receivecordMapper;  // 数据库CRUD接口

        @Override

        public void run(String... args)throws Exception {

    MqttConnectionUtils.start(equipmentMapper,sysUserMapper,receivecordMapper);

        }

    }

    2)、MqttConnectionUtils

    package com.ghzn.bgl.mqtt.demo;

    import com.ghzn.bgl.dao.EquipmentMapper;

    import com.ghzn.bgl.dao.ReceivecordMapper;

    import com.ghzn.bgl.dao.SysUserMapper;

    import com.ghzn.bgl.entity.Equipment;

    import com.ghzn.bgl.service.EquipMentService;

    import org.apache.log4j.LogManager;

    import org.apache.log4j.Logger;

    import org.eclipse.paho.client.mqttv3.*;

    import java.text.SimpleDateFormat;

    import java.util.Date;

    import java.util.UUID;

    /**

    * @Author particle

    * @Description

    * @Date: Created in 16:25 2021-08-05

    * @Modified by:

    */

    public class MqttConnectionUtils {

    private static MqttClientclient;

        private static MqttConnectOptionsconnectOptions;

        private static StringTOPIC;

        private static StringclientId;

        private static final LoggerLOG = LogManager.getLogger(MqttConnectionUtils.class);

        static {

    try {

    clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");

                client =new MqttClient("tcp://ip:61613",clientId);

                connectOptions=new MqttConnectOptions();

                connectOptions.setCleanSession(false);

                connectOptions.setUserName("admin");

                connectOptions.setPassword("密码".toCharArray());//密码            connectOptions.setConnectionTimeout(10);

                client.setTimeToWait(10000);

                client.connect(connectOptions);

                TOPIC ="topic";//topic名称

            }catch (MqttException e) {

    e.printStackTrace();

            }

    }

    /**

    * 发送数据

    */

        public static void publish(String topic,String content)throws MqttException {

    MqttMessage message=new MqttMessage(content.getBytes());

            message.setQos(1);

            SimpleDateFormat df =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式

            LOG.info("发送时间========"+df.format(new Date()));

            LOG.info(topic+"主题发送成功,内容:"+message);

            client.publish(topic,message);

        }

    /**

    * 接收数据

    */

        public static void start(EquipmentMapper equipmentMapper, SysUserMapper sysUserMapper,ReceivecordMapper receivecordMapper)throws MqttException {

    MqttTopic topic =client.getTopic(TOPIC);

            // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息

            connectOptions.setWill(topic, "close".getBytes(), 2, true);

            // 订阅消息

            int[] Qos = {1 };

            String[] topic1 = {TOPIC };

            client.subscribe(topic1, Qos);

            // 设置回调

            client.setCallback(new PushCallback());

            LOG.info("启动成功=================");

        }

    /**

    * mqtt重连

    */

        public static void reConnect() {

    while (true){

    try {

    if (null !=client && !(client.isConnected())) {

    Thread.sleep(1000);

                        clientId = UUID.randomUUID().toString().trim().replaceAll("-", "");

                        client.connect(connectOptions);

                        LOG.info("=======尝试重新连接==============");

    break;

                    }

    }catch (MqttException | InterruptedException e) {

    LOG.info("=======重新连接失败:{}=============="+e.toString());

    continue;

                }

    }

    }

    }

    3)、PushCallback

    package com.ghzn.bgl.mqtt.demo;

    import com.ghzn.bgl.dao.ReceivecordMapper;

    import com.ghzn.bgl.dao.SysUserMapper;

    import com.ghzn.bgl.entity.*;

    import com.ghzn.bgl.utils.FileWriteUtil;

    import com.ghzn.bgl.utils.StringUtils;

    import com.ghzn.bgl.utils.wechat.WechatUtil;

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import com.alibaba.fastjson.JSONObject;

    import com.ghzn.bgl.dao.EquipmentMapper;

    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;

    import org.eclipse.paho.client.mqttv3.MqttCallback;

    import org.eclipse.paho.client.mqttv3.MqttMessage;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Component;

    import javax.annotation.PostConstruct;

    import java.util.List;

    /**

    * @Author particle

    * @Description

    * @Date: Created in 16:28 2021-08-05

    * @Modified by:

    */

    @Component

    public class PushCallbackimplements MqttCallback {

    private static final Loggerlogger = LoggerFactory.getLogger(PushCallback.class);

        @Autowired

        private EquipmentMapper  equipmentMapper;  // 

        @Autowired

        private SysUserMapper  sysUserMapper; 

        @Autowired

        private ReceivecordMapper receivecordMapper; 

        public  static PushCallback  PushCallback;

        @PostConstruct //通过@PostConstruct实现初始化bean之前进行的操作

        public void init() {

    System.out.println("初始化");

            PushCallback =this;

            PushCallback.equipmentMapper =this.equipmentMapper;

            PushCallback.sysUserMapper=this.sysUserMapper;

            PushCallback.receivecordMapper=this.receivecordMapper;

        }

    /**

    * 解决数据库操作mqtt断开连接,根据设备编号修改设备信息

        * @param equipment

        * @return

        */

        public int updateByEquipId(Equipment equipment){

    return equipmentMapper.updateByEquipId(equipment);

        }

    /**

    * 根据设备编号判断是否已经有其他用户绑定该设备

        * @param equipmetId

        * @return

        */

        public int countByEquipId (String equipmetId){

    return equipmentMapper.countByEquipId(equipmetId);

        }

    /**

    * 根据终端编号获取用户信息

        * @param sysUser

        * @return

        */

        public SysUsergetByEquipId(SysUser sysUser){

    return sysUserMapper.getByEquipId(sysUser);

        }

    /**

    * 根据用户id获取所有父级相关信息

        * @param userId

        * @return

        */

        public ListgetParentByUserId(String userId){

    return sysUserMapper.getParentByUserId(userId);

        }

    /**

    * 新增上报记录

        * @param receivecord

        * @return

        */

        public int saveReceivecord(Receivecord receivecord){

    return receivecordMapper.saveReceivecord(receivecord);

        }

    @Override

        public void connectionLost(Throwable throwable) {

    // 连接丢失后,一般在这里面进行重连

            logger.info("======连接断开,可以做重连");

            MqttConnectionUtils.reConnect();

        }

    @Override

        public void messageArrived(String topic, MqttMessage message)throws Exception {

    // subscribe后得到的消息会执行到这里面

            String messages =new String(message.getPayload());

            if(!messages.equals("close")){

    logger.info("接收消息主题 : " + topic);

                logger.info("接收消息Qos : " + message.getQos());

                logger.info("接收消息内容 : " +new String(message.getPayload()));

                try {

    JSONObject jsonObject = JSONObject.parseObject(new String(message.getPayload()));

                    logger.info("json:"+jsonObject);

                    String equipmentId = jsonObject.getString("ID");

                    if (!StringUtils.isEmpty(equipmentId)) {

    //判断终端编号是否存在,并且存在切符合规则则修改

                      int k=PushCallback.equipmentMapper.countByEquipId(equipmentId);

                      if (k>0){

    Equipment equipment =new Equipment();

                          equipment.setT1(jsonObject.getIntValue("T1"));

                          equipment.setT2(jsonObject.getIntValue("T2"));

                          equipment.setT3(jsonObject.getIntValue("T3"));

                          equipment.setT4(jsonObject.getIntValue("T4"));

                          equipment.setT5(jsonObject.getIntValue("T5"));

                          equipment.setT6(jsonObject.getIntValue("T6"));

                          equipment.setState(jsonObject.getString("State"));

                          equipment.setFault(jsonObject.getString("Fault"));

                          equipment.setPower(jsonObject.getIntValue("Power"));

                          equipment.setON(jsonObject.getIntValue("ON"));

                          equipment.setWS(jsonObject.getIntValue("WS"));

                          equipment.setGas(jsonObject.getString("Gas"));

                          equipment.setTime(jsonObject.getString("Time"));

                          equipment.setEquipmentId(equipmentId);

                          //上报记录保存

                          Receivecord receivecord=new Receivecord();

                          receivecord.setT1(jsonObject.getString("T1"));

                          receivecord.setT2(jsonObject.getString("T2"));

                          receivecord.setT3(jsonObject.getString("T3"));

                          receivecord.setT4(jsonObject.getString("T4"));

                          receivecord.setT5(jsonObject.getString("T5"));

                          receivecord.setT6(jsonObject.getString("T6"));

                          receivecord.setState(jsonObject.getString("State"));

                          receivecord.setFault(jsonObject.getString("Fault"));

                          receivecord.setPower(jsonObject.getString("Power"));

                          receivecord.setStatus(jsonObject.getString("status"));

                          receivecord.setWS(jsonObject.getString("WS"));

                          receivecord.setGas(jsonObject.getString("Gas"));

                          receivecord.setTime(jsonObject.getString("Time"));

                          receivecord.setEquipmentId(equipmentId);

                          PushCallback.equipmentMapper.updateByEquipId(equipment);

                          logger.info("修改成功...");

                          //解析故障fault并判断是否含有故障1,如果含有1则解析出对应的故障并发送给用户

                          String Fault= jsonObject.getString("Fault");

                          SysUser sysUser=new SysUser();

                          sysUser.setEquipmentId(equipmentId);

                          sysUser=PushCallback.sysUserMapper.getByEquipId(sysUser);

                          //上报记录添加用户信息

                          receivecord.setAddress(sysUser.getAddress());

                          receivecord.setXm(sysUser.getXm());

                          receivecord.setPhone(sysUser.getPhonenumber());

                          receivecord.setUserid(sysUser.getUserId());

                          //有故障信息

                          if (Fault.contains("1")){

    //出现故障写日志:

                              FileWriteUtil.fileChaseFW(jsonObject.toString());

                              receivecord.setType(2);//1:正常上报;2:故障上报

    //获取父级经销商信息

                              ListsysUserList=PushCallback.sysUserMapper.getParentByUserId(sysUser.getUserId());

                              if (!StringUtils.isEmpty(sysUser.getWxOpenId())){

    String openid=sysUser.getWxOpenId();//公众号openid

                                  String appid= Constant.APPID;//公众号的appid

    //根据Fault解析判断是什么预警

                                  List result=StringUtils.getFaultList(Fault);

                                  String description="";

                                  if (result.get(0).equals("1")){

    description="漏气故障";

                                  }

    if (result.get(1).equals("1")){

    description=description+",温升故障";

                                  }

    if (result.get(2).equals("1")){

    description=description+",环境探头故障";

                                  }

    if (result.get(3).equals("1")){

    description=description+",卫浴探头故障";

                                  }

    if (result.get(4).equals("1")){

    description=description+",采暖探头故障";

                                  }

    if (result.get(5).equals("1")){

    description=description+",卫浴探头超温故障";

                                  }

    if (result.get(6).equals("1")){

    description=description+",采暖探头超温故障";

                                  }

    if (result.get(7).equals("1")){

    description=description+",水压故障";

                                  }

    if (result.get(8).equals("1")){

    description=description+",机械温控故障";

                                  }

    if (result.get(9).equals("1")){

    description=description+",风机/风压故障";

                                  }

    if (result.get(10).equals("1")){

    description=description+",火焰检测故障";

                                  }

    if (result.get(11).equals("1")){

    description=description+",点火故障";

                                  }

    if (result.get(12).equals("1")){

    description=description+",结冰故障";

                                  }

    //上报记录备注信息

                                  receivecord.setBz(description);

                                  sysUser.setDescription(description);

                                  WechatUtil.sendAlarmMessage(appid,openid,sysUser);

                                  if (sysUserList.size()>0){

    SysUser sysUser1=sysUserList.get(0);

                                      sysUser1.setPhonenumber(sysUser.getPhonenumber());//报修人的电话

                                      sysUser1.setDescription(description);

                                      sysUser1.setEquipmentId(sysUser.getEquipmentId());

                                      sysUser1.setSbxh(sysUser.getSbxh());

                                      sysUser1.setXm(sysUser.getXm());

                                      sysUser1.setAddress(sysUser.getAddress());

                                      String jxsopenId=sysUser1.getWxOpenId();//经销商的公众号openid

    //将报警信息发送给经销商

                                      WechatUtil.sendAlarmMessage(appid,jxsopenId,sysUser1);

                                  }

    }

    }

    //将上报记录写入数据库

                          PushCallback.receivecordMapper.saveReceivecord(receivecord);

                      }else {

    logger.info("设备编号不存在");

                      }

    }

    }catch (Exception e){

    e.printStackTrace();

                }

    }

    }

    @Override

        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {

    logger.info("deliveryComplete---------" +iMqttDeliveryToken.isComplete());

        }

    }

    相关文章

      网友评论

          本文标题:Springboot+MQTT协议接收终端上报信息并写入数据库

          本文链接:https://www.haomeiwen.com/subject/mircqrtx.html