美文网首页
Netty学习

Netty学习

作者: bboymonk | 来源:发表于2018-01-12 11:18 被阅读0次

Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
Netty是一个NIO客户端、服务端框架。允许快速简单的开发网络应用程序。例如:服务端和客户端之间的协议。它最牛逼的地方在于简化了网络编程规范。例如:TCP和UDP的Socket服务。
“快速和容易”并不意味着结果应用程序将遇到可维护性或性能问题。 Netty已经仔细设计了从许多协议,如FTP,SMTP,HTTP和各种二进制和基于文本的遗留协议的实现获得的经验。 因此,Netty成功地找到了一种方法来实现易于开发,性能,稳定性和灵活性的应用程序。

NIO介绍:

Buffer:
Channel:
Selector:

Buffer对象使用demo:

package com.wjb.demo.nio;

import java.nio.IntBuffer;

/**
 * Created by wjb on 2018/1/12.
 * Buffer对象使用,有很多个Buffer抽象类,实际上是一个数组,对应JAVA的基本数据类型,除了boolean。
 * Buffer对象有几个属性 position,limit,capacity
 */
public class BufferDemo {
    public static void main(String[] args) {
        //分配10个大小的容量,此时buffer是 [pos=0 lim=10 cap=10]
        IntBuffer buffer = IntBuffer.allocate(10);
        buffer.put(2);
        buffer.put(3);
        buffer.put(4);
        //put了三个元素,此时buffer是 [pos=3 lim=10 cap=10]
        System.out.println(buffer);
        /**
         * get()无参数方法获取position位置元素,此时position为3,无元素,所以返回0,并且position加1。
         * 有参数就获取指定下标的元素。
         * 此时buffer是 [pos=4 lim=10 cap=10]
         */
        System.out.println(buffer.get());
        buffer.flip();
        /**
         *
         * flip()是复位的意思,此时buffer是 [pos=0 lim=4 cap=10]
         *一般需要读取buffer中的元素时,必须调用flip()方法,否则异常。
         * filp()方法后,position变成0,limit变为4(limit可以理解成buffer中实际存了多少元素的容器)
         */
        System.out.println(buffer);
        for (int i = 0;i<buffer.limit();i++){
            System.out.println(buffer.get());
        }
//        System.out.println(buffer.capacity());
//        System.out.println(buffer.limit());

        System.out.println(buffer);
        System.out.println(buffer.get());


    }

      /**
     * 其它的一些方法使用
     */
    public void wrapDemo(){
        int[] array = new int[]{1,3,5};
        //wrap()方法可以包裹一个数组成来Buffer对象
        IntBuffer wrap = IntBuffer.wrap(array);
        //只要1和3
        IntBuffer.wrap(array,0,2);
        IntBuffer buffer = IntBuffer.allocate(10);
        //复制一份buffer
        IntBuffer buffer2 = buffer.duplicate();
    }
}

模拟客户端服务器通信

Client

package com.wjb.demo.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SocketChannel;

/**
 * Created by wjb on 2018/1/12.
 */
public class Client {
    public static void main(String[] args) {
        SocketChannel channel =null;
        InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            channel = SocketChannel.open();
            channel.connect(address);
            while (true){
                byte[] bytes = new byte[1024];
                System.in.read(bytes);
                buffer.put(bytes);
                buffer.flip();
                channel.write(buffer);
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (channel != null){
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Server

package com.wjb.demo.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * Created by wjb on 2018/1/12.
 * Selector和Channel这两个概念和关系,Selector里有很多Channel,
 * Selector会根据key来轮询通道
 *
 */
public class Server implements Runnable {
    private Selector selector;
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    public Server(int port){
        try {
            //打开多路复用器
            this.selector = Selector.open();
            //打开服务器通道
            ServerSocketChannel channel = ServerSocketChannel.open();
            //设置为非阻塞模式
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(port));
            //把通道注册到Selector上,并监听阻塞事件
            channel.register(this.selector, SelectionKey.OP_ACCEPT);
            System.out.println("server start,port is :"+port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while (true){
            try {
                //Selector开始监听
                this.selector.select();
                //返回Selector选择的结果集
                Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
                while (keys.hasNext()){
                    //key其实就是通道的key,意思就是选择了一个通道
                    SelectionKey key = keys.next();
                    //把选择的这个key从所有的keys中移除,避免重复执行
                    keys.remove();
                    //如果key是有效的
                    if (key.isValid()){
                        //如果为阻塞状态
                        if (key.isAcceptable()){
                            this.accept(key);
                        }
                        //如果是可读状态
                        if(key.isReadable()){
                            this.read(key);
                        }
                        //如果是可写状态
                        if (key.isWritable()){

                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void read(SelectionKey key){
        //清空旧数据
        this.readBuf.clear();
        //获取注册的通道channel
        SocketChannel channel = (SocketChannel) key.channel();
        try {
            int count = channel.read(this.readBuf);
            if (count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //如果有数据就读取,别忘了复位
            this.readBuf.flip();
            //remainint()方法可获取缓冲取可读数据
            byte[] bytes = new byte[this.readBuf.remaining()];
            //接收缓冲区数据
            this.readBuf.get(bytes);

            System.out.println(new String(bytes));
        } catch (IOException e) {
            e.printStackTrace();
        }


    }

    private void accept(SelectionKey key){
        //根据key获取当前通道
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        try {
            SocketChannel channel = ssc.accept();
            channel.configureBlocking(false);
            channel.register(this.selector,SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Thread(new Server(8888)).start();
    }

}

AIO介绍:

示例demo:

Client
package com.wjb.demo.nio.aio;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;

/**
 * Created by wjb on 2018/1/12.
 */
public class Client implements Runnable {
    private AsynchronousSocketChannel channel;

    public Client() throws IOException {
        channel = AsynchronousSocketChannel.open();
    }

    public void connect(){
        channel.connect(new InetSocketAddress("127.0.0.1",8765));
    }
    public void write(String response){
        try {
            channel.write(ByteBuffer.wrap(response.getBytes())).get();
            read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void read(){
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            channel.read(buffer).get();
            buffer.flip();
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            System.out.println(new String(bytes,"utf-8"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }

    //真实开发环境一般是放在一个web容器中的,不会像这样一直死循环的。
    @Override
    public void run() {
        while (true){

        }
    }

    public static void main(String[] args)throws Exception {
        Client c1 = new Client();
        c1.connect();
        Client c2 = new Client();
        c2.connect();
        Client c3 = new Client();
        c3.connect();
        new Thread(c1).start();
        new Thread(c2).start();
        new Thread(c3).start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        c1.write("aaaaa");
        c2.write("bbbbb");
        c3.write("ccccc");

    }
}

Server

package com.wjb.demo.nio.aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by wjb on 2018/1/12.
 */
public class Server {
    private ExecutorService executorService;
    private AsynchronousChannelGroup group;
    public AsynchronousServerSocketChannel channel;

    public Server(int port) {
        try {
            //创建线程池
            executorService = Executors.newCachedThreadPool();
            //通道组的概念,配合线程池使用
            group = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);
            //创建服务器通道
            channel = AsynchronousServerSocketChannel.open(group);
            channel.bind(new InetSocketAddress(port));
            System.out.println("server start,port is:"+port);
            channel.accept(this,new ServerCompletionHandler());
            //一直阻塞,不让服务器停止。实际不应如此,这里只是为了让线程不终止。
            Thread.sleep(Integer.MAX_VALUE);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new Server(8765);
    }
}

ServerCompletionHandler

package com.wjb.demo.nio.aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;

/**
 * Created by wjb on 2018/1/12.
 */
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {
    /**
     * 类似于递归,或者叫拉力,反复调用Server里的accept()方法,保证多个客户端都能连上
     * 因为AIO不像NIO是一直轮询的。
     *
     * @param channel
     * @param server
     */
    @Override
    public void completed(AsynchronousSocketChannel channel, Server server) {
        server.channel.accept(server, this);
        read(channel);
    }

    @Override
    public void failed(Throwable exc, Server server) {
        exc.printStackTrace();
    }

    public void read(final AsynchronousSocketChannel channel) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                attachment.flip();
                String s = new String(attachment.array());
                System.out.println("server response data is :" + s);
                write(channel, s);
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                exc.printStackTrace();
            }
        });
    }

    public void write(AsynchronousSocketChannel channel, String data) {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put(data.getBytes());
            buffer.flip();
            channel.write(buffer).get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

netty使用示例:

Client:

package com.wjb.demo.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

/**
 * Created by wjb on 2018/1/12.
 */
public class Client {
    public static void main(String[] args) throws InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        //具体处理交给ClientHandler
                        sc.pipeline().addLast(new ClientHandler());
                    }
                });
        ChannelFuture connect = b.connect("127.0.0.1", 8888);
        System.out.println("client connected");
        connect.channel().writeAndFlush(Unpooled.copiedBuffer("hello server 1".getBytes()));
        Thread.sleep(1000);
        connect.channel().writeAndFlush(Unpooled.copiedBuffer("hello server 2".getBytes()));
        Thread.sleep(1000);
        connect.channel().writeAndFlush(Unpooled.copiedBuffer("hello server 3".getBytes()));

        connect.channel().closeFuture().sync();
        group.shutdownGracefully();
    }

}

ClientHandler:

package com.wjb.demo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;

/**
 * Created by wjb on 2018/1/12.
 */
public class ClientHandler extends ChannelHandlerAdapter{
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //ByteBuf是个引用计数对象
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] bytes = new byte[buf.readableBytes()];
            buf.readBytes(bytes);
            System.out.println(new String(bytes,"utf-8"));
        } finally {
            //释放缓冲,以防ByteBuf溢出
            ReferenceCountUtil.release(msg);
        }
    }
}

Server:

package com.wjb.demo.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by wjb on 2018/1/12.
 */
public class Server {
    public static void main(String[] args) throws Exception{
        //处理客户端连接
        NioEventLoopGroup pGroup = new NioEventLoopGroup();
        //处理网络读写
        NioEventLoopGroup cGroup = new NioEventLoopGroup();
        //辅助工具类,用于配置
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(pGroup,cGroup)
                //指定NIO模式
                .channel(NioServerSocketChannel.class)
                //tcp缓冲区大小
                .option(ChannelOption.SO_BACKLOG,1024)
                //发送缓冲大小
                .option(ChannelOption.SO_SNDBUF,32*1024)
                //接收缓冲大小
                .option(ChannelOption.SO_RCVBUF,32*1024)
                //保持连接
                .option(ChannelOption.SO_KEEPALIVE,true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        //具体处理交给ServerHandler
                        sc.pipeline().addLast(new ServerHandler());
                    }
                });
        ChannelFuture future = bootstrap.bind(8888).sync();
        System.out.println("server connected");
        //阻塞等待,类似Thread.sleep(Integer.MAX_VALUE),实际开发不这样
        future.channel().closeFuture().sync();
        pGroup.shutdownGracefully();
        cGroup.shutdownGracefully();

    }

}

ServerHandler:

package com.wjb.demo.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by wjb on 2018/1/12.
 */
public class ServerHandler extends ChannelHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    /**
     * 为什么客户端有 ReferenceCountUtil.release(msg)释放,而服务器没有呢。
     * 因为服务器端只要有写操作就会自动释放,如果只有读没有写的话也要手动释放。
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //ByteBuf缓冲区引用计数对象,无须像NIO一样flip,因为它内部有两个指针,分别处理读和写。
        ByteBuf buf = (ByteBuf) msg;
        byte[] bytes = new byte[buf.readableBytes()];
        //从缓冲区把数据读到新建的数组里
        buf.readBytes(bytes);
        System.out.println(new String(bytes,"utf-8"));
        String response = "hello client";
        //通过工具类把数据写到缓冲区
        ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes()));

    }
}

TCP拆包粘包:


server:

@Override
protected void initChannel(SocketChannel sc) throws Exception {
//TCP拆包粘包,可以设定特殊字符或都固定字节两种方式
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));

//采用固定字节的方式,如何字节不够,必须用空格补全。
// sc.pipeline().addLast(new FixedLengthFrameDecoder(5));

//直接转码成符串形式,如果这里解码成字符串了,那ServerHandler里就不用强制转换ByteBuf再处理了。
//sc.pipeline().addLast(new StringDecoder());
//具体处理交给ServerHandler
sc.pipeline().addLast(new ServerHandler());
  }

client:

@Override
 protected void initChannel(SocketChannel sc) throws Exception {
 //TCP拆包粘包,可以设定特殊字符或都固定字节两种方式
ByteBuf buf = Unpooled.copiedBuffer("$_".getBytes());
 sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));

 //采用固定字节的方式,如何字节不够,必须用空格补全。
 // sc.pipeline().addLast(new FixedLengthFrameDecoder(5));

 //直接转码成符串形式,如果这里解码成字符串了,那ServerHandler里就不用强制转换ByteBuf再处理了。
 //sc.pipeline().addLast(new StringDecoder());
 //具体处理交给ClientHandler
sc.pipeline().addLast(new ClientHandler());
 }

编解码:

kryo效率最快,Marshalling可以包装成JAVA对象来处理数据,并且客户端服务器端必须都是JAVA语言。其它两个可以跨语言处理数据。

相关文章

  • Netty学习之Netty介绍

    Netty学习之Netty介绍 前言 本周开始学习Netty,主要的参考资料是《Netty In Action》原...

  • java-netty

    netty常用API学习 netty简介 Netty是基于Java NIO的网络应用框架. Netty是一个NIO...

  • Netty | 第2章 Netty 简介《Netty In Ac

    前言 参考资料: 《Netty In Action》; 本系列为 Netty 学习笔记,本篇介绍总结Netty 简...

  • Netty 编码解码

    参考来源 Netty实践 Netty 4.x学习笔记 - Channel和Pipeline Netty 编码器和解...

  • netty

    最近有个项目要用到netty,对于netty进行了研究,简单的总结一下。 学习netty的意义 学习网络编程,怎样...

  • Netty学习之数据传输

    Netty学习之数据传输 前言 在前面的小节中,我们简略地学习了Netty及Netty的核心组件,在本小节中,我们...

  • NIO-01

    最近开始学习Netty,在学习Netty之前,先学习下NIO 相对于传统的Socket。Nio提供了SocketC...

  • netty性能优化

    关于netty的学习和介绍,可以去github看官方文档,这里良心推荐《netty实战》和《netty权威指南》两...

  • Netty异步回调模式-Future和Promise剖析

    学习目标 为什么了解Netty异步监听? Netty如何实现异步监听的? Future简介 我们知道Netty的I...

  • Netty学习之EventLoop&Threading

    Netty学习之EventLoop&Threading Model 前言 在前面我们学习了Netty的众多组件,如...

网友评论

      本文标题:Netty学习

      本文链接:https://www.haomeiwen.com/subject/syvxoxtx.html