美文网首页
Netty学习--使用UDP广播事件

Netty学习--使用UDP广播事件

作者: 何何与呵呵呵 | 来源:发表于2019-01-25 16:45 被阅读0次
    UDP 的基础知识
    • UDP 的基础知识
      TCP 连接就像打电话,其中一系列的有序消息将会在两个方向上流动。相反,UDP 则类似于往邮箱中投入一叠明信片。你无法知道它们将以何种顺序到达它们的目的地,或者它们是否所有的都能够到达它们的目的地。
    • UDP 广播
      UDP 提供了向多个接收者发送消息的额外传输模式:
      1.多播——传输到一个预定义的主机组;
      2.广播——传输到网络(或者子网)上的所有主机。
    • UDP 示例应用程序
      发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。
    广播系统概览
    • 消息POJO: LogEvent
    public class LogEvent {
        public static final byte SEPARATOR = (byte) ':';
        private final InetSocketAddress source;
        private final String logfile;
        private final String msg;
        private final long received;
        public LogEvent(String logfile, String msg) { // 用于传出消息的构造函数
            this(null, -1, logfile, msg);
        }
        public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { // 用于 传入消息的构造函数
            this.source = source;
            this.logfile = logfile;
            this.msg = msg;
            this.received = received;
        }
        public InetSocketAddress getSource() { // 返回发送LogEvent 的源的InetSocketAddress
            return source;
        }
        public String getLogfile() { // 返回所发送的LogEvent 的日志文件的名称
            return logfile;
        }
        public String getMsg() { // 返回消息内容
            return msg;
        }
        public long getReceivedTimestamp() { // 返回接收LogEvent的时间
            return received;
        }
    }
    
    • 编写广播者
    在广播者中使用的Netty 的UDP 相关类

    Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。

    通过DatagramPacket 发送的日志条目
    LogEventBroadcaster:ChannelPipeline 和LogEvent 事件流
    • 编写监视器
      (1)接收由LogEventBroadcaster 广播的UDP DatagramPacket;
      (2)将它们解码为LogEvent 消息;
      (3)将LogEvent 消息写出到System.out。
    LogEventMonitor
    public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
        @Override
        protected void decode(ChannelHandlerContext ctx,
                              DatagramPacket datagramPacket, List<Object> out) throws Exception {
            ByteBuf data = datagramPacket.content(); // 获取对DatagramPacket 中的数据(ByteBuf)的引用
            int idx = data.indexOf(0, data.readableBytes(),
                    LogEvent.SEPARATOR); // 获取该SEPARATOR的索引
            String filename = data.slice(0, idx) // 提取日志消息
                    .toString(CharsetUtil.UTF_8);
            String logMsg = data.slice(idx + 1,
                    data.readableBytes()).toString(CharsetUtil.UTF_8);
            LogEvent event = new LogEvent(datagramPacket.sender(),
                    System.currentTimeMillis(), filename, logMsg);
            out.add(event);
        }
    }
    
    public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx,
                                    Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
        @Override
        public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
            StringBuilder builder = new StringBuilder();
            builder.append(event.getReceivedTimestamp());
            builder.append(" [");
            builder.append(event.getSource().toString());
            builder.append("] [");
            builder.append(event.getLogfile());
            builder.append("] : ");
            builder.append(event.getMsg());
            System.out.println(builder.toString());
        }
    }
    
    public class LogEventMonitor {
        private final EventLoopGroup group;
        private final Bootstrap bootstrap;
        public LogEventMonitor(InetSocketAddress address) {
            group = new NioEventLoopGroup();
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioDatagramChannel.class)
                    .option(ChannelOption.SO_BROADCAST, true)
                    .handler( new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel)
                                throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            pipeline.addLast(new LogEventDecoder());
                            pipeline.addLast(new LogEventHandler());
                        }
                    } )
                    .localAddress(address);
        }
        public Channel bind() {
            return bootstrap.bind().syncUninterruptibly().channel();
        }
        public void stop() {
            group.shutdownGracefully();
        }
        public static void main(String[] args) throws Exception {
    //        if (args.length != 1) {
    //            throw new IllegalArgumentException(
    //                    "Usage: LogEventMonitor <port>");
    //        }
            LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(9999));
            try {
                Channel channel = monitor.bind();
                System.out.println("LogEventMonitor running");
                channel.closeFuture().sync();
            } finally {
                monitor.stop();
            }
        }
    }
    
    效果图

    这样就可以监听你运行程序的log信息,并将信息返回到你的客服端,是不是很nice.

    相关文章

      网友评论

          本文标题:Netty学习--使用UDP广播事件

          本文链接:https://www.haomeiwen.com/subject/txbmjqtx.html