1.介绍
ChannelGroup是一个线程安全的集合,它提供了打开一个Channel和不同批量的方法。可以使用ChannelGroup来讲Channel分类到一个有特别意义的组中。当组中的channel关闭时会自动从组中移除,因此我们不需要担心添加进去的channel的生命周期。一个channel可以属于多个ChannelGroup。
当一个ChannelGroup中即存在ServerChanel又存在non-ServerChannel时,执行IO操作时会优先执行ServerChannel中的方法。
2.方法说明
//返回ChannelGroup的名称
String name();
//通过channelId查找相应的Channel
Channel find(ChannelId id);
//异步地将消息写到所有的Channel中
ChannelGroupFuture write(Object message);
ChannelGroupFuture write(Object message, ChannelMatcher matcher);
ChannelGroupFuture write(Object message, ChannelMatcher matcher, boolean voidPromise);
ChannelGroup flush();
ChannelGroup flush(ChannelMatcher matcher);
ChannelGroupFuture writeAndFlush(Object message);
@Deprecated
ChannelGroupFuture flushAndWrite(Object message);
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher);
ChannelGroupFuture writeAndFlush(Object message, ChannelMatcher matcher, boolean voidPromise);
@Deprecated
ChannelGroupFuture flushAndWrite(Object message, ChannelMatcher matcher);
//将所有通道与远程端断开连接
ChannelGroupFuture disconnect();
ChannelGroupFuture disconnect(ChannelMatcher matcher);
//关闭所有的通道
ChannelGroupFuture close();
ChannelGroupFuture close(ChannelMatcher matcher);
@Deprecated
ChannelGroupFuture deregister();
@Deprecated
ChannelGroupFuture deregister(ChannelMatcher matcher);
//返回一个ChannelGroupFuture,当所有的channel关闭时,会得到相应的通知
ChannelGroupFuture newCloseFuture();
ChannelGroupFuture newCloseFuture(ChannelMatcher matcher);
3.示例
public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
//channel对象数组
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("channelRead0执行了");
Channel channel = ctx.channel();
channelGroup.forEach(ch -> {
if(channel != ch){
ch.writeAndFlush(new TextWebSocketFrame(channel.remoteAddress() + " 发送的消息 :"+msg.text()));
}else{
ch.writeAndFlush(new TextWebSocketFrame("[自己] "+msg.text()));
}
});
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded执行了");
Channel channel = ctx.channel();
//广播消息
channelGroup.writeAndFlush(new TextWebSocketFrame("[服务器] - "+channel.remoteAddress()+"加入"));
channelGroup.add(channel);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush(new TextWebSocketFrame("[服务器] -"+ channel.remoteAddress() +"离开"));
}
//该方法发出的消息,自身channel还未形成所以不能收到消息
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush(new TextWebSocketFrame(channel.remoteAddress() + "上线了"));
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush(new TextWebSocketFrame(channel.remoteAddress() + "下线了"));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生");
ctx.close();
}
}
网友评论