目录
- 概念
- netty之UDP的相关组件
- 代码
- 服务端-广播端
- LogEvent 消处组件
- LogEventEncoder 消息封装
- LogEventBroadcaster 启动类
- 服务端-广播端
- 接收端-监控端
- ClientLogEventEncoder- 解码器
- ClientLogEventHandler- 消息处理器
- LogEventMonitor: 启动程序
概念
- 连接传输(如TCP)管理了两个端点之间的连接的建立,在连接的生命周期内的有序和可靠的消息传输及有序的终止
- UDP属于无连接协议,并无持久化连接的概念,每个消息(一个UDP数据报)都是一个单独的传输单元
- UDP也无TCP的纠错机制,每个节点都将确认他们所接收到的包,而没有被确认的包将会被发送方重新传输
- 有局限,但UDP高速于TCP;适用于那些能够处理或容忍消息丢失的应用程序(金融类的交易一定是不合适的)
- 单播:发送消息给一个由唯一地址所标识的单一的网络目的地,面向连接的协议和无连接协议都支持
- 多播: 传输到一个预定的主机组
- 广播: 传到网络(或子网)上所有的主机
- 发布与订阅:类似于syslog的应有程序将被归类为发布与订阅(一个生产者,多个接收者订阅消息)
netty中UDP广播相关接口与实现类
- interface AddressedEnvelope<M,A extends SocketAddress>: 定义一个消息,其包装了另一个消息并带有发送者和接收者地址。其中M是消息类型;A是地址类型。
- class DefaultAddressEnvelope<M,A extends SocketAddress> implements AddressedEnvelope<M,A>:提供了AddressedEnvelope默认实现
- interface DatagramChannel extends Channel: 扩展了Netty的Channel抽象以支持UDP的多播组管理
- class NioDatagramChannel extends AbstractNioMessageChannel: 定义一个能发送或接收AddressedEnvelope消息的Channel类型
- class DatagramPacket extends DefaultAddressEnvelope<ByteBuf,InetSocketAddress> implements ByteBufHolder
- 扩展了DefaultAddressEnvelope以使用ByteBuf作为消息数据容器
- DatagramPacket是一个简单的消息容器,DatagramChannel实现用它来和远程节点通信
实例
功能描述
- 广播端:读取一个文件,将文件中的每一行当成一个消息广播到指定端口(注:该程序无身份认证、验证或加密,请读者自行添加)
- 接收端:接收并处理消息
ChannelPipeline事件流
- 1、本地: ChannelPipeline处理流程
LogEvent -> LogEventEncoder -> DataGramPacket) - 2、广播多个远程节点:远程节点1,远程节点2,远程节点3..
代码
服务端
LogEvent -- 定义消息组件
public final class LogEvent {
public static final byte SEPARATOR=(byte)':';
private final InetSocketAddress source;
private final String logfile;
private final String msg;
private final long received;
public LogEvent(String logfile, String msg ){
this(null,logfile,msg,-1);
}
public LogEvent(InetSocketAddress source, String logfile, String msg, long received) {
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource() {
return source;
}
public String getLogfile() {
return logfile;
}
public String getMsg() {
return msg;
}
public long getReceivedTimestamp(){
return received;
}
}
LogEventEncoder - 消息封装
/**
* LogEvent的编解码器
* 在将logevent转为DataGramPackage之前必须先进行编码
*/
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
/**
* 创建即将被发送到指定的InetSocketAddress的DatagramPacket的消息
* @param remoteAddress
*/
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(CharsetUtil.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(CharsetUtil.UTF_8);
ByteBuf buf = ctx.alloc().buffer(file.length+msg.length+1);
buf.writeBytes(file); //将文件写入到ByteBuf中
buf.writeByte(LogEvent.SEPARATOR); //添加一个SEPARATOR
buf.writeBytes(msg); //将日志消息写入到ByteBuf中
//将一个拥有数据和目的地的新DatagramPacket添加到出站消息列表中
out.add(new DatagramPacket(buf,remoteAddress));
}
}
LogEventBroadcaster -- 启动类
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file) {
this.group = new NioEventLoopGroup();
this.file = file;
this.bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST,true) //设置SO_BROADCAST套接字选项
.handler(new LogEventEncoder(address));
}
public void run() throws Exception{
Channel ch = bootstrap.bind(0).sync().channel();
long pointer = 0;
for(;;){
//启动主动循环
long len = file.length();
if (len<pointer){
pointer = len; //将文件指针设置到该文件的最后一个字节
}else if(len>pointer){
RandomAccessFile raf = new RandomAccessFile(file,"r");
raf.seek(pointer); //设置当前的文件指针,以确保没有任何旧日志被发送
String line;
while((line=raf.readLine())!=null){
ch.writeAndFlush(new LogEvent(null,file.getAbsolutePath(),line,-1));
}
pointer = raf.getFilePointer();
raf.close();
}
try{
Thread.sleep(1000); //1秒
}catch (Exception e){
//休眠1秒被中断,则退出循环,否则重新处理它
Thread.interrupted();
break;
}
}
}
public void stop(){
group.shutdownGracefully();
}
/**
* 第1个参数为端口
* 第2个参数文件路径
* @param args
*/
public static void main(String[] args) throws Exception {
if (args.length!=2){
throw new IllegalArgumentException("请输入2个参数");
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(
new InetSocketAddress("255.255.255.255",Integer.parseInt(args[0]))
,new File(args[1]));
try{
broadcaster.run();
}finally {
broadcaster.stop();
}
}
}
客户端-监控端
ClientLogEventEncoder - LogEvent的编解码器
/**
* LogEvent的编解码器
* 在将logevent转为DataGramPackage之前必须先进行编码
*/
public class ClientLogEventEncoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket packet, List<Object> out) throws Exception {
ByteBuf data = packet.content();
int idx = data.indexOf(0,data.readableBytes(),LogEvent.SEPARATOR);
String filename = data.slice(0,idx).toString(CharsetUtil.UTF_8); //提取文件名
String logMsg = data.slice(idx+1,data.readableBytes()).toString(CharsetUtil.UTF_8);
LogEvent event = new LogEvent(packet.sender()
,filename,logMsg,System.currentTimeMillis()
);
out.add(event);
}
}
ClientLogEventHandler - 消息处理
public class ClientLogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append(event.getReceivedTimestamp());
builder.append("[");
builder.append(event.getSource());
builder.append("][");
builder.append(event.getLogfile());
builder.append("]");
builder.append(event.getMsg());
System.out.println(builder.toString()); //打印logEvent的数据
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
LogEventMonitor -- 启动程序
public class LogEventMonitor {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
public LogEventMonitor(InetSocketAddress address){
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST,true) //设置套接字SO_BROADCAST
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ClientLogEventEncoder());
pipeline.addLast(new ClientLogEventHandler());
}
})
.localAddress(address);
}
public Channel bind(){
//绑定channel
//DatagramChannel无连接
return bootstrap.bind().syncUninterruptibly().channel();
}
public void stop(){
group.shutdownGracefully();
}
public static void main(String[] args) throws InterruptedException {
if (args.length!=1){
throw new IllegalArgumentException("Usage: LogEventMonitor");
}
LogEventMonitor monitor = new LogEventMonitor(
new InetSocketAddress(Integer.parseInt(args[0]))
);
try{
Channel channel = monitor.bind();
System.out.println("LogEventMonitor running");
channel.closeFuture().sync(); //阻塞等待服务端监听端口关闭。
}finally {
monitor.stop();
}
}
}
网友评论