美文网首页
Netty读书笔记:基于Netty-UDP的消息广播与监控

Netty读书笔记:基于Netty-UDP的消息广播与监控

作者: 卡门001 | 来源:发表于2020-09-13 20:02 被阅读0次

    目录

    • 概念
    • netty之UDP的相关组件
    • 代码
      • 服务端-广播端
        • LogEvent 消处组件
        • LogEventEncoder 消息封装
        • LogEventBroadcaster 启动类
    • 接收端-监控端
      • ClientLogEventEncoder- 解码器
      • ClientLogEventHandler- 消息处理器
      • LogEventMonitor: 启动程序

    概念

    • 连接传输(如TCP)管理了两个端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输及有序的终止
    • UDP属于无连接协议,并无持久化连接的概念,每个消息(一个UDP数据报)都是一个单独的传输单元
    • UDP也无TCP的纠错机制,每个节点都将确认他们所接收到的包,而没有被确认的包将会被发送方重新传输
    • 有局限,但UDP高速于TCP;适用于那些能够处理或容忍消息丢失的应用程序(金融类的交易一定是不合适的)
    • 单播:发送消息给一个由唯一地址所标识的单一的网络目的地,面向连接的协议和无连接协议都支持
    • 多播: 传输到一个预定的主机组
    • 广播: 传到网络(或子网)上所有的主机
    • 发布与订阅:类似于syslog的应有程序将被归类为发布与订阅(一个生产者,多个接收者订阅消息)

    netty中UDP广播相关接口与实现类

    • interface AddressedEnvelope<M,A extends SocketAddress>: 定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中M是消息类型;A是地址类型。
    • class DefaultAddressEnvelope<M,A extends SocketAddress> implements AddressedEnvelope<M,A>:提供了AddressedEnvelope默认实现
    • interface DatagramChannel extends Channel: 扩展了Netty的Channel抽象以支持UDP的多播组管理
    • class NioDatagramChannel extends AbstractNioMessageChannel: 定义一个能发送或接收AddressedEnvelope消息的Channel类型
    • class DatagramPacket extends DefaultAddressEnvelope<ByteBuf,InetSocketAddress> implements ByteBufHolder
      • 扩展了DefaultAddressEnvelope以使用ByteBuf作为消息数据容器
      • DatagramPacket是一个简单的消息容器,DatagramChannel实现用它来和远程节点通信

    实例

    功能描述

    • 广播端:读取一个文件,将文件中的每一行当成一个消息广播到指定端口(注:该程序无身份认证、验证或加密,请读者自行添加)
    • 接收端:接收并处理消息

    ChannelPipeline事件流

    • 1、本地: ChannelPipeline处理流程
      LogEvent -> LogEventEncoder -> DataGramPacket)
    • 2、广播多个远程节点:远程节点1,远程节点2,远程节点3..

    代码

    服务端

    LogEvent -- 定义消息组件
    public final class LogEvent {
    
        public static final byte SEPARATOR=(byte)':';
        private final InetSocketAddress source;
        private final String logfile;
        private final String msg;
        private final long received;
    
        public LogEvent(String logfile, String msg ){
          this(null,logfile,msg,-1);
        }
    
        public LogEvent(InetSocketAddress source, String logfile, String msg, long received) {
            this.source = source;
            this.logfile = logfile;
            this.msg = msg;
            this.received = received;
        }
    
        public InetSocketAddress getSource() {
            return source;
        }
    
        public String getLogfile() {
            return logfile;
        }
    
        public String getMsg() {
            return msg;
        }
    
          public long getReceivedTimestamp(){
                return received;
          }
    }
    
    LogEventEncoder - 消息封装
    /**
     * LogEvent的编解码器
     * 在将logevent转为DataGramPackage之前必须先进行编码
     */
    public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
        private final InetSocketAddress remoteAddress;
    
        /**
         * 创建即将被发送到指定的InetSocketAddress的DatagramPacket的消息
         * @param remoteAddress
         */
        public LogEventEncoder(InetSocketAddress remoteAddress) {
            this.remoteAddress = remoteAddress;
        }
    
    
        @Override
        protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
            byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
            byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
    
            ByteBuf buf = ctx.alloc().buffer(file.length+msg.length+1);
            buf.writeBytes(file); //将文件写入到ByteBuf中
            buf.writeByte(LogEvent.SEPARATOR); //添加一个SEPARATOR
            buf.writeBytes(msg); //将日志消息写入到ByteBuf中
    
            //将一个拥有数据和目的地的新DatagramPacket添加到出站消息列表中
            out.add(new DatagramPacket(buf,remoteAddress));
        }
    }
    
    LogEventBroadcaster -- 启动类
    
    public class LogEventBroadcaster {
        private final EventLoopGroup group;
        private final Bootstrap bootstrap;
        private final File file;
    
    
        public LogEventBroadcaster(InetSocketAddress address, File file) {
            this.group = new NioEventLoopGroup();
            this.file = file;
            this.bootstrap = new Bootstrap();
            bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST,true) //设置SO_BROADCAST套接字选项
                .handler(new LogEventEncoder(address));
        }
    
        public void run() throws Exception{
            Channel ch = bootstrap.bind(0).sync().channel();
            long pointer = 0;
            for(;;){
                //启动主动循环
                long len = file.length();
                if (len<pointer){
                    pointer = len; //将文件指针设置到该文件的最后一个字节
                }else if(len>pointer){
                    RandomAccessFile raf = new RandomAccessFile(file,"r");
                    raf.seek(pointer); //设置当前的文件指针,以确保没有任何旧日志被发送
    
                    String line;
                    while((line=raf.readLine())!=null){
                        ch.writeAndFlush(new LogEvent(null,file.getAbsolutePath(),line,-1));
                    }
                    pointer = raf.getFilePointer();
                    raf.close();
                }
                try{
                    Thread.sleep(1000); //1秒
                }catch (Exception e){
                    //休眠1秒被中断,则退出循环,否则重新处理它
                    Thread.interrupted();
                    break;
                }
            }
        }
    
    
        public void stop(){
            group.shutdownGracefully();
        }
    
        /**
         * 第1个参数为端口
         * 第2个参数文件路径
         * @param args
         */
        public static void main(String[] args) throws Exception {
            if (args.length!=2){
                throw new IllegalArgumentException("请输入2个参数");
            }
    
            LogEventBroadcaster broadcaster = new LogEventBroadcaster(
                    new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0]))
                    ,new File(args[1]));
            try{
                broadcaster.run();
            }finally {
                broadcaster.stop();
            }
        }
    
    }
    
    

    客户端-监控端

    ClientLogEventEncoder - LogEvent的编解码器
    
    /**
     * LogEvent的编解码器
     * 在将logevent转为DataGramPackage之前必须先进行编码
     */
    public class ClientLogEventEncoder extends MessageToMessageDecoder<DatagramPacket> {
        @Override
        protected void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out) throws Exception {
            ByteBuf data = packet.content();
            int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
            String filename = data.slice(0,idx).toString(CharsetUtil.UTF_8); //提取文件名
            String logMsg = data.slice(idx+1,data.readableBytes()).toString(CharsetUtil.UTF_8);
            LogEvent event = new LogEvent(packet.sender()
                ,filename,logMsg,System.currentTimeMillis()
            );
            out.add(event);
        }
    }
    
    ClientLogEventHandler - 消息处理
    
    public class ClientLogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
            StringBuilder builder = new StringBuilder();
            builder.append(event.getReceivedTimestamp());
            builder.append("[");
            builder.append(event.getSource());
            builder.append("][");
            builder.append(event.getLogfile());
            builder.append("]");
            builder.append(event.getMsg());
            System.out.println(builder.toString()); //打印logEvent的数据
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    
    LogEventMonitor -- 启动程序
    
    public class LogEventMonitor {
        private final EventLoopGroup group;
        private final Bootstrap bootstrap;
    
        public LogEventMonitor(InetSocketAddress address){
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST,true) //设置套接字SO_BROADCAST
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ClientLogEventEncoder());
                            pipeline.addLast(new ClientLogEventHandler());
                        }
                    })
                    .localAddress(address);
        }
    
        public Channel bind(){
            //绑定channel
            //DatagramChannel无连接
            return bootstrap.bind().syncUninterruptibly().channel();
        }
    
        public void stop(){
            group.shutdownGracefully();
        }
    
        public static void main(String[] args) throws InterruptedException {
            if (args.length!=1){
                throw  new IllegalArgumentException("Usage: LogEventMonitor");
            }
    
            LogEventMonitor monitor = new LogEventMonitor(
                    new InetSocketAddress(Integer.parseInt(args[0]))
            );
            try{
                Channel channel = monitor.bind();
                System.out.println("LogEventMonitor running");
                channel.closeFuture().sync(); //阻塞等待服务端监听端口关闭。
            }finally {
                monitor.stop();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Netty读书笔记:基于Netty-UDP的消息广播与监控

          本文链接:https://www.haomeiwen.com/subject/weloektx.html