美文网首页
Netty学习--使用UDP广播事件

Netty学习--使用UDP广播事件

作者: 何何与呵呵呵 | 来源:发表于2019-01-25 16:45 被阅读0次
UDP 的基础知识
  • UDP 的基础知识
    TCP 连接就像打电话,其中一系列的有序消息将会在两个方向上流动。相反,UDP 则类似于往邮箱中投入一叠明信片。你无法知道它们将以何种顺序到达它们的目的地,或者它们是否所有的都能够到达它们的目的地。
  • UDP 广播
    UDP 提供了向多个接收者发送消息的额外传输模式:
    1.多播——传输到一个预定义的主机组;
    2.广播——传输到网络(或者子网)上的所有主机。
  • UDP 示例应用程序
    发布/订阅模式:一个生产者或者服务发布事件,而多个客户端进行订阅以接收它们。
广播系统概览
  • 消息POJO: LogEvent
public class LogEvent {
    public static final byte SEPARATOR = (byte) ':';
    private final InetSocketAddress source;
    private final String logfile;
    private final String msg;
    private final long received;
    public LogEvent(String logfile, String msg) { // 用于传出消息的构造函数
        this(null, -1, logfile, msg);
    }
    public LogEvent(InetSocketAddress source, long received, String logfile, String msg) { // 用于 传入消息的构造函数
        this.source = source;
        this.logfile = logfile;
        this.msg = msg;
        this.received = received;
    }
    public InetSocketAddress getSource() { // 返回发送LogEvent 的源的InetSocketAddress
        return source;
    }
    public String getLogfile() { // 返回所发送的LogEvent 的日志文件的名称
        return logfile;
    }
    public String getMsg() { // 返回消息内容
        return msg;
    }
    public long getReceivedTimestamp() { // 返回接收LogEvent的时间
        return received;
    }
}
  • 编写广播者
在广播者中使用的Netty 的UDP 相关类

Netty 的DatagramPacket 是一个简单的消息容器,DatagramChannel 实现用它来和远程节点通信。类似于在我们先前的类比中的明信片,它包含了接收者(和可选的发送者)的地址以及消息的有效负载本身。

通过DatagramPacket 发送的日志条目
LogEventBroadcaster:ChannelPipeline 和LogEvent 事件流
  • 编写监视器
    (1)接收由LogEventBroadcaster 广播的UDP DatagramPacket;
    (2)将它们解码为LogEvent 消息;
    (3)将LogEvent 消息写出到System.out。
LogEventMonitor
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
    @Override
    protected void decode(ChannelHandlerContext ctx,
                          DatagramPacket datagramPacket, List<Object> out) throws Exception {
        ByteBuf data = datagramPacket.content(); // 获取对DatagramPacket 中的数据(ByteBuf)的引用
        int idx = data.indexOf(0, data.readableBytes(),
                LogEvent.SEPARATOR); // 获取该SEPARATOR的索引
        String filename = data.slice(0, idx) // 提取日志消息
                .toString(CharsetUtil.UTF_8);
        String logMsg = data.slice(idx + 1,
                data.readableBytes()).toString(CharsetUtil.UTF_8);
        LogEvent event = new LogEvent(datagramPacket.sender(),
                System.currentTimeMillis(), filename, logMsg);
        out.add(event);
    }
}
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append(event.getReceivedTimestamp());
        builder.append(" [");
        builder.append(event.getSource().toString());
        builder.append("] [");
        builder.append(event.getLogfile());
        builder.append("] : ");
        builder.append(event.getMsg());
        System.out.println(builder.toString());
    }
}
public class LogEventMonitor {
    private final EventLoopGroup group;
    private final Bootstrap bootstrap;
    public LogEventMonitor(InetSocketAddress address) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .handler( new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel)
                            throws Exception {
                        ChannelPipeline pipeline = channel.pipeline();
                        pipeline.addLast(new LogEventDecoder());
                        pipeline.addLast(new LogEventHandler());
                    }
                } )
                .localAddress(address);
    }
    public Channel bind() {
        return bootstrap.bind().syncUninterruptibly().channel();
    }
    public void stop() {
        group.shutdownGracefully();
    }
    public static void main(String[] args) throws Exception {
//        if (args.length != 1) {
//            throw new IllegalArgumentException(
//                    "Usage: LogEventMonitor <port>");
//        }
        LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(9999));
        try {
            Channel channel = monitor.bind();
            System.out.println("LogEventMonitor running");
            channel.closeFuture().sync();
        } finally {
            monitor.stop();
        }
    }
}
效果图

这样就可以监听你运行程序的log信息,并将信息返回到你的客服端,是不是很nice.

相关文章

网友评论

      本文标题:Netty学习--使用UDP广播事件

      本文链接:https://www.haomeiwen.com/subject/txbmjqtx.html