1、pom文件
<!-- Netty dependency: the netty-all artifact, pinned to version 4.1.25.Final -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
2、netty服务配置
2.1 启动服务
/**
 * Spring-managed Netty TCP server.
 *
 * <p>The server is bound when the bean is initialised ({@link #start()}) and
 * both event-loop groups are shut down when the bean is destroyed
 * ({@link #destory()}).
 */
@Component
@Slf4j
public class NettyServer {
    /**
     * Boss event-loop group: accepts incoming connections.
     */
    private final EventLoopGroup boss = new NioEventLoopGroup();
    /**
     * Worker event-loop group: performs I/O on the accepted connections.
     */
    private final EventLoopGroup work = new NioEventLoopGroup();
    /**
     * Listening port, injected from the {@code netty.port} property.
     */
    @Value("${netty.port}")
    private Integer port;

    /**
     * Configures and binds the Netty server on {@code port}.
     *
     * <p>Blocks until the bind operation has completed; {@code sync()} rethrows
     * the cause if the bind failed.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the bind to complete
     */
    @PostConstruct
    public void start() throws InterruptedException {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, work)
                // NIO-based server socket channel
                .channel(NioServerSocketChannel.class)
                // Listen on the configured port on all local interfaces
                .localAddress(new InetSocketAddress(port))
                // Pending-connection queue length (the backlog argument of listen())
                .option(ChannelOption.SO_BACKLOG, 1024)
                // Enable TCP keep-alive probes on accepted connections
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                // Disable Nagle's algorithm: small writes are sent immediately
                // instead of being coalesced into larger segments
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childHandler(new NettyServerHandlerInitializer());
        ChannelFuture future = bootstrap.bind().sync();
        if (future.isSuccess()) {
            log.info("启动 Netty Server");
        }
    }

    /**
     * Gracefully shuts down both event-loop groups and waits for each to
     * terminate.
     *
     * <p>The boss group is stopped first so that no new connections are
     * accepted, then the worker group is stopped.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for a group to terminate
     */
    @PreDestroy
    public void destory() throws InterruptedException {
        boss.shutdownGracefully().sync();
        // The worker group must be shut down as well, otherwise its threads leak.
        work.shutdownGracefully().sync();
        log.info("关闭Netty");
    }
}
2.2 通道初始化器
/**
 * Channel initializer that installs a {@link NettyServerHandler} at the end of
 * each new channel's pipeline.
 */
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * Appends a fresh {@link NettyServerHandler} to the channel's pipeline.
     *
     * @param ch the channel being initialised
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(Channel ch) throws Exception {
        NettyServerHandler inboundHandler = new NettyServerHandler();
        ch.pipeline().addLast(inboundHandler);
    }
}
2.3 处理类
/**
 * Inbound handler that prints received bytes as UTF-8 text and keeps the
 * connection's {@link ChannelHandlerContext} in {@link ChannelMap} under the
 * key {@code "ctx"} while the channel is active.
 *
 * <p>The handler is typed to {@link ByteBuf}, so the message no longer needs an
 * unchecked cast; messages of any other type are passed along the pipeline by
 * the base class instead of failing with a {@code ClassCastException}.
 */
@Slf4j
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /**
     * Prints the received payload decoded as UTF-8.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg received bytes; released by the base class after this method returns
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("server received: " + msg.toString(CharsetUtil.UTF_8));
        // Responses to previously sent commands could be captured here.
    }

    /**
     * Called when a client connection becomes active: caches the context so
     * other components can write to this client.
     *
     * @param ctx context of the newly active channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ChannelMap.ctx.put("ctx", ctx);
        // Keep propagating the event to any handler added after this one.
        super.channelActive(ctx);
    }

    /**
     * Called when the connection is closed: drops the cached context so that
     * nobody writes to a dead channel.
     *
     * <p>The entry is removed only if it still refers to this channel's
     * context; a newer connection may already have replaced it.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (ChannelMap.ctx.get("ctx") == ctx) {
            ChannelMap.ctx.remove("ctx");
        }
        super.channelInactive(ctx);
    }
}
3、缓存处理
/**
 * Process-wide cache of Netty channel contexts, keyed by name.
 *
 * <p>The map is written from Netty event-loop threads and read from service
 * threads, so a concurrent map is used instead of a plain {@code HashMap}.
 * Note that a concurrent map rejects {@code null} keys and values.
 */
@Component
public class ChannelMap {
    /**
     * Cached channel contexts; lookups for an absent key return {@code null}.
     */
    public static Map<String, ChannelHandlerContext> ctx =
            new java.util.concurrent.ConcurrentHashMap<>();
}
4、接口调用
4.1 service 使用
/**
 * Service that pushes a fixed test message to the client whose channel context
 * is cached in {@link ChannelMap} under the key {@code "ctx"}.
 */
@Service
public class TestServiceImpl implements TestService {

    /**
     * Sends the UTF-8 encoded test message to the cached client connection.
     *
     * @throws IllegalStateException if no client context is cached or the
     *                               cached channel is no longer active
     */
    @Override
    public void sendMessage() {
        ChannelHandlerContext ctx = ChannelMap.ctx.get("ctx");
        // Fail with a clear message instead of a bare NullPointerException or a
        // silently dropped write on a closed channel.
        if (ctx == null || !ctx.channel().isActive()) {
            throw new IllegalStateException("No active Netty client connection to send to");
        }
        String msg = "A message111";
        byte[] bytes = msg.getBytes(CharsetUtil.UTF_8);
        ByteBuf buf = Unpooled.wrappedBuffer(bytes);
        // Single call replaces the separate write() + flush().
        ctx.writeAndFlush(buf);
    }
}
网友评论