美文网首页
Netty集成串口RXTX编程,为什么过时了?

Netty集成串口RXTX编程,为什么过时了?

作者: 程就人生 | 来源:发表于2022-06-18 11:05 被阅读0次

    用过Netty的人明显表现对它的偏爱,有没有?!

    为什么要用netty再实现一遍?

    上一篇已经实现了串口通信。当然,简单实现还是远远不够的。即使是串口通信,也需要对收发数据进行编解码处理吧?也需要保证数据的完整性吧?也需要协议吧?也需要把业务逻辑的部分单独处理吧?

    下面上代码:

    1.编解码类;

    import java.util.List;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    
    /**
     * 对收到的数据进行解码
     * @author 程就人生
     * @Date
     */
    public class ByteArrayDecoder extends ByteToMessageDecoder{
    
      @Override
      protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 标记一下当前的readIndex的位置
        in.markReaderIndex();
        int dataLength = in.readableBytes();
        byte[] array = new byte[dataLength];
        in.readBytes(array, 0, dataLength);
        if(array.length > 0){
          out.add(array);  
        }
      }
    }
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import lombok.extern.slf4j.Slf4j;
    /**
     * 对发出的数据进行编码
     * @author 程就人生
     * @Date
     */
    @Slf4j
    public class ByteArrayEncoder extends MessageToByteEncoder<byte[]>{
      
      @Override
      protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
        log.info(".....经过ByteArrayEncoder编码.....");
        //消息体,包含我们要发送的数据
        out.writeBytes(msg);
      }
    }
    

    2.数据接收处理类;

    import org.springframework.stereotype.Service;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.ReferenceCountUtil;
    import io.netty.channel.ChannelHandler;
    
    /**
     * 串口接收数据处理器
     * @author 程就人生
     * @Date
     */
    @Service("rxtxHandler")
    @ChannelHandler.Sharable
    public class RxtxHandler extends SimpleChannelInboundHandler<byte[]>{
    
      @Override
      protected void channelRead0(ChannelHandlerContext ctx, byte[] msg) throws Exception {
        //文本方式编解码,String
            //System.out.println("接收到:"+msg);
          
          // 十六进制发送编解码
          int dataLength = msg.length;
            ByteBuf buf = Unpooled.buffer(dataLength);
            buf.writeBytes(msg);
            System.out.println("接收到:");
            while(buf.isReadable()){
               System.out.print(" " + buf.readByte());
            }
            System.out.println("");
            // 释放资源
            ReferenceCountUtil.release(msg);
      }
    }
    

    3.串口参数配置类;

    import io.netty.channel.rxtx.RxtxChannelConfig;
    import io.netty.channel.rxtx.RxtxChannelConfig.Databits;
    import io.netty.channel.rxtx.RxtxChannelConfig.Paritybit;
    import io.netty.channel.rxtx.RxtxChannelConfig.Stopbits;
    import lombok.Data;
    
    /**
     * 串口参数配置类
     * @author 
     * @date 2022年4月26日
     *
     */
    @Data
    public final class SerialPortParam {
    
        /**
         * 串口名称,以COM开头(COM0、COM1、COM2等等)
         */
        private String serialPortName;
        /**
         * 波特率, 默认:115200
         */
        private int baudRate = 115200;
        /**
         * 数据位 默认8位
         * 可以设置的值:SerialPort.DATABITS_5、SerialPort.DATABITS_6、SerialPort.DATABITS_7、SerialPort.DATABITS_8
         */
        private Databits dataBits = RxtxChannelConfig.Databits.DATABITS_8;
        /**
         * 停止位
         * 可以设置的值:SerialPort.STOPBITS_1、SerialPort.STOPBITS_2、SerialPort.STOPBITS_1_5
         */
        private Stopbits stopBits = RxtxChannelConfig.Stopbits.STOPBITS_1;
        /**
         * 校验位
         * 可以设置的值:SerialPort.PARITY_NONE、SerialPort.PARITY_ODD、SerialPort.PARITY_EVEN、SerialPort.PARITY_MARK、SerialPort.PARITY_SPACE
         */
        private Paritybit parity = RxtxChannelConfig.Paritybit.NONE;
    }
    

    4.netty整合串口的启动类;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoop;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.oio.OioEventLoopGroup;
    import io.netty.channel.rxtx.RxtxChannel;
    import io.netty.channel.rxtx.RxtxDeviceAddress;
    import io.netty.util.concurrent.GenericFutureListener;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * 串口接收数据的服务端
     * @author 程就人生
     * @Date
     */
    @Slf4j
    @Data
    @Component
    public class RxtxServer {
    
      private RxtxChannel channel;
      
      private SerialPortParam serialPortParam;
      
        @Autowired
      private RxtxHandler handler;
    
        public void createRxtx() throws Exception {
            // 串口使用阻塞io
            EventLoopGroup group = new OioEventLoopGroup();
            try {
                Bootstrap bootstrap  = new Bootstrap();
                bootstrap.group(group)
                        .channelFactory(() -> {
                            RxtxChannel rxtxChannel = new RxtxChannel();
                            rxtxChannel.config()
                                    .setBaudrate(serialPortParam.getBaudRate()) // 波特率
                                    .setDatabits(serialPortParam.getDataBits()) // 数据位
                                    .setParitybit(serialPortParam.getParity())      // 校验位
                                    .setStopbits(serialPortParam.getStopBits()); // 停止位
                            return rxtxChannel ;
                        })
                        .handler(new ChannelInitializer<RxtxChannel>() {
                            @Override
                            protected void initChannel(RxtxChannel rxtxChannel) {
                                rxtxChannel.pipeline().addLast(
    //                                    new LineBasedFrameDecoder(60000),
                                      // 文本形式发送编解码
    //                                    new StringEncoder(StandardCharsets.UTF_8),
    //                                    new StringDecoder(StandardCharsets.UTF_8),
                                    // 十六进制形式发送编解码
                                    new ByteArrayDecoder(),
                                    new ByteArrayEncoder(),
                                        handler
                                );
                            }
                        });
    
                ChannelFuture f = bootstrap.connect(new RxtxDeviceAddress(serialPortParam.getSerialPortName())).sync();
                f.addListener(connectedListener);
                    
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
    
        // 连接监听
        GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> {
          final EventLoop eventLoop = f.channel().eventLoop();
          if (!f.isSuccess()) {
            log.info("连接失败");
          }else{
            channel = (RxtxChannel) f.channel();
            log.info("连接成功");
            sendData();
          }
        };
        
        /**
         * 发送数据
         */
        public void sendData(){
          // 十六机制形式发送
          ByteBuf buf = Unpooled.buffer(2);
          buf.writeByte(3);
          buf.writeByte(2);
          channel.writeAndFlush(buf.array());
          
          // 文本形式发送
          //channel.writeAndFlush("2");
        }
        
        public void start(){
            CompletableFuture.runAsync(()->{
                try {
                  // 阻塞的函数
                    createRxtx();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, Executors.newSingleThreadExecutor());//不传默认使用ForkJoinPool,都是守护线程
        }
    
    }
    

    串口连接成功后,发送一次数据。

    5.把串口启动类放在入口程序里,实际业务中根据实际情况调整;

    
    @SpringBootApplication
    public class SpringBootSqliteApplication {
    
      public static void main(String[] args) {
        //获取application的上下文
            ApplicationContext applicationContext = SpringApplication.run(SpringBootSqliteApplication.class, args);
        
            // 串口连接服务类
        RxtxServer rxtxServer = applicationContext.getBean(RxtxServer.class);
        SerialPortParam serialPort = new SerialPortParam();
        // 连接串口com1
        serialPort.setSerialPortName("COM1");
        rxtxServer.setSerialPortParam(serialPort);  
        rxtxServer.start();
        try {
          // 连接串口需要一点时间,这里稍微等待一下
          Thread.currentThread().sleep(5000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        // 发送数据
        rxtxServer.sendData();
      }
    
    }
    

    这里又发送了一次数据,所以一共发送了两次数据。

    6.运行项目;

    在运行项目前,把串口调试工具设置一下,发送数据采用Hex,接收数据也是Hex;如果是String类型,则可以去掉这项勾选;并且设置数据5s中发送一次;

    image.png

    在控制台,可以看到接收到的数据;


    image.png

    在串口调试工具这里,又接收到了两组数据。

    image.png

    最后总结

    以上便是netty关于串口采用字节数组的形式通讯的编码。整个编码流程和TCP协议通讯的流程一样。但是,Netty关于串口的编程,相关类相关方法已经过时了。为什么过时了,还有没有什么更好的架包来替代,这是在生产中需要持续关注的。参考资料:

    串口调试工具下载:

    http://www.zlmcu.com/document/com_debug_tools.html

    http://www.zlmcu.com/download.htm

    虚拟串口工具下载:

    https://www.iotplat.top/#/iot/soft/604ed56de1457928489bf826

    相关文档:

    Java串口编程RXTX,距离硬件又近了一点

    SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 服务端

    SpringBoot 2 整合 Netty 实现基于 DTU 的 TCP 服务器 之 客户端

    Netty中的编解码,至少知道这两种

    Netty 之 IdleStateHandler 心跳检测(部分源码分析),实现超时断开连接

    Netty 之 ByteBuf 几种分配方案 及 内存溢出相关Bug

    Netty整合HTTP协议,实现文件目录服务器

    Netty整合JBoss Marshalling编解码
    Netty整合Protobuf编解码,并解决半包问题

    Netty整合MessagePack、LengthFieldBasedFrameDecoder解决粘包/拆包问题

    Netty中粘包拆包,小试牛刀

    SpringBoot整合Netty与websocket客户端进行对话

    相关文章

      网友评论

          本文标题:Netty集成串口RXTX编程,为什么过时了?

          本文链接:https://www.haomeiwen.com/subject/iepkvrtx.html