美文网首页
Mina粘包,断包问题处理(附完整实例,客户端,服务端)

Mina粘包,断包问题处理(附完整实例,客户端,服务端)

作者: 顺应自然y | 来源:发表于2018-03-28 23:25 被阅读801次

    1.什么是断包,粘包?

    在讲断包,粘包之前,先说下消息保护边界和无消息保护边界。
    1.保护消息边界,就是指传输协议把数据当作一条独立的消息在网上传输,接收端只能接收独立的消息.也就是说存在保护消息边界,接收端一次只能接收发送端发出的一个数据包.
    2.而面向流则是无消息保护边界的,如果发送端连续发送数据, 接收端有可能在一次接收动作中,会接收两个或者更多的数据包。

    而tcp是面向流的,需要在消息接收端处理消息边界问题。

    接收端在接受数据时有可能会遇到下面四种情况

    A.先接收到dataA然后接收到dataB.
    B.先接收到dataA的部分数据,然后接收到dataA余下的部分以及dataB的全部.
    C.先接收到了dataA的全部数据和dataB的部分数据,然后接收到了dataB的余下的数据.
    D.一次性接收到了dataA和dataB的全部数据.

    A为正常情况,无粘包或断包。
    B为断包+粘包。
    C为粘包+断包。
    D为粘包。

    2.如何处理Mina中遇到的粘包和断包问题

    在Mina框架中有个CumulativeProtocolDecoder 累积性的协议解码器,专门用来处理粘包和断包问题。doDecode()的返回值有重要作用。

    A.你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证过通过,那么CumulativeProtocolDecoder 会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。

    B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区(让父类进行接收下一个包)。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。也就是说返回true,那么CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来;(意思就是会把剩余数据给doDecode()处理,剩余数据就是remaining()的数据),返回false就不处理剩余的,(不把剩余数据给doDecode()处理)当有新数据包来的时候就把剩余的数据和新的数据拼接在一起,然后再调用decoder。

    下面附上一个完整的实例
    1.消息的格式
    包头+消息长度(int)+消息内容(json字符串)+包尾,包头包尾是十六进制字符串00 aa bb cc,转化成字节数组0, -86, -69, -52四个字节,下面的完整实例有客户端,服务端,将会解析数据,获取其中的消息内容(Json字符串)并且打印处理,消息以字节数组的方式在服务端,客户端之间传递。

    服务端代码

    package com.my.mina;
    
    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    import java.util.Date;
    
    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    
    /**
     * mina的Service端
     * 
     * @author linbin
     *
     */
    public class MinaService {
    
        public static void main(String[] args) {
    
            // 创建一个非阻塞的server端的Socket
            IoAcceptor acceptor = new NioSocketAcceptor();
            // 添加日志过滤器
            acceptor.getFilterChain().addLast("logger", new LoggingFilter());
            acceptor.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));// 自定义解编码器
    
            // 设置Handler
            acceptor.setHandler(new DemoServerHandler());
            // 设置读取数据的缓存区大小
            acceptor.getSessionConfig().setReadBufferSize(2048);
            // 读写通道10秒内无操作进入空闲状态
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
            try {
                // 绑定端口
                acceptor.bind(new InetSocketAddress(20000));
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("启动服务");
        }
    
        /**
         * @ClassName: DemoServerHandler
         * @Description: 负责session对象的创建和监听以及消息的创建和接收监听
         * @author chenzheng
         * @date 2016-12-9 下午3:57:11
         */
        private static class DemoServerHandler extends IoHandlerAdapter {
    
            // 服务器与客户端创建连接
            @Override
            public void sessionCreated(IoSession session) throws Exception {
                System.out.println("服务器与客户端创建连接...");
                super.sessionCreated(session);
            }
    
            @Override
            public void sessionOpened(IoSession session) throws Exception {
                System.out.println("服务器与客户端连接打开...");
                super.sessionOpened(session);
            }
    
            // 消息的接收处理
            @Override
            public void messageReceived(IoSession session, Object message) throws Exception {
                // TODO Auto-generated method stub
                super.messageReceived(session, message);// 消息的接受
    
                // 传递自定义解编码器传递数组和解析数组丢包断包的
                String a = (String) message;
                System.out.println("接收到的数据:" + a);
                session.write(a);
    
            }
    
            // 消息发送后调用
            @Override
            public void messageSent(IoSession session, Object message) throws Exception {
                // TODO Auto-generated method stub
                super.messageSent(session, message);
                System.out.println("服务器发送消息成功...");
            }
    
            // session关闭
            @Override
            public void sessionClosed(IoSession session) throws Exception {
                // TODO Auto-generated method stub
                super.sessionClosed(session);
                System.out.println("断开连接:");
            }
        }
    
    }
    

    编码器

    
    package com.my.mina;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
    import org.apache.mina.filter.codec.ProtocolEncoderOutput;
    
    import java.nio.charset.Charset;
    
    /**
     * 编码器
     * 
     */
    public class ByteArrayEncoder extends ProtocolEncoderAdapter {
    
        private final Charset charset;
    
        public ByteArrayEncoder(Charset charset) {
            this.charset = charset;
    
        }
    
        /**
         * 直接将数据发出去,数据格式,包头+消息长度(int)+消息内容(json字符串)+包尾 包头包尾是十六进制字符串00 aa bb cc,转化成字节数组0,
         * -86, -69, -52四个字节
         *
         * @param session
         * @param message
         * @param out
         * @throws Exception
         */
        @Override
        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
            // 仿项目,解决断包,粘包问题
            String value = (message == null ? "" : message.toString());// 消息值
            byte[] content = value.getBytes(charset);// 消息内容,字节数组
            IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);// 缓冲区容量大小38字节加上字符长度
            buf.put(new byte[] { 0, -86, -69, -52 });// 输入包开头固定值十六进制00 aa bb cc,转化成字节数组
            buf.putUnsignedInt(content.length);// int为4字节,一个字节等于2个16进制字符,所以有八位 00 00 00 0c,内容长度。
            buf.put(content);// 消息内容
            buf.put(new byte[] { 0, -86, -69, -52 });// 包尾
            buf.flip();
            out.write(buf);// 写入
        }
    
    }
    
    

    解码器,重点,解决Mina断包,丢包问题

    package com.my.mina;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    
    import java.nio.charset.Charset;
    
    /**
     * 自定义解码器,确保能读到完整的包
     */
    public class ByteArrayDecoder extends CumulativeProtocolDecoder {
    
        private final Charset charset;
    
        public ByteArrayDecoder(Charset charset) {
            this.charset = charset;
    
        }
    
        @Override
        protected boolean doDecode(IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput)
                throws Exception {
            // 丢包,断包处理
            if (ioBuffer.remaining() > 4)// 有包头,包头足够
            {
                ioBuffer.mark();// 标记当前position的快照标记mark,以便后继的reset操作能恢复position位置,开始是0
                byte[] l = new byte[4];
                ioBuffer.get(l);// 读取包头,占4个字节
                if (ioBuffer.remaining() < 4)// 内容长度的4个字节不够,断包
                {
                    ioBuffer.reset();
                    return false;//
                } else {// 内容长度的4个字节数组足够
                    byte[] bytesLegth = new byte[4];// 内容长度
                    ioBuffer.get(bytesLegth);// 读取内容长度,int类型,占四个字节
                    int len = MinaUtil.byteArrayToInt(bytesLegth);// 内容长度有多少
                    if (ioBuffer.remaining() < len)// 内容不够,断包
                    {
                        ioBuffer.reset();
                        return false;//
    
                    } else { // 消息内容足够
    
                        byte[] bytes = new byte[len];
                        ioBuffer.get(bytes, 0, len);
                        protocolDecoderOutput.write(new String(bytes, charset));// 读取内容,并且发送
    
                        if (ioBuffer.remaining() < 4) {// 包尾不够
                            ioBuffer.reset();
                            return false;//
    
                        } else {// 包尾足够
                            byte[] tails = new byte[4];
                            ioBuffer.get(tails);// 读取包尾
                            if (ioBuffer.remaining() > 0)// 最后如果粘了包,会再次调用doDeocde()方法,把剩余数据给doDeocde()方法处理
                            {
                                return true;
                            }
    
                        }
                    }
    
                }
    
            }
            return false;// 断包,或者执行完,
    
        }
    }
    

    解编码工厂

    package com.my.mina;
    
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFactory;
    import org.apache.mina.filter.codec.ProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolEncoder;
    
    import java.nio.charset.Charset;
    
    /**
     * 自定义解编码器工厂
     *
     */
    
    public class ByteArrayCodecFactory implements ProtocolCodecFactory {
    
        private ByteArrayDecoder decoder;
        private ByteArrayEncoder encoder;
    
        public ByteArrayCodecFactory() {
            this(Charset.defaultCharset());
        }
    
        public ByteArrayCodecFactory(Charset charSet) {
            encoder = new ByteArrayEncoder(charSet);
            decoder = new ByteArrayDecoder(charSet);
        }
    
        @Override
        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            return decoder;
        }
    
        @Override
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            return encoder;
        }
    
    }
    

    注意:客户端,服务端需要和服务端有同样的解码器,编码器,解编码工厂这三个类。

    客户端核心代码

    
    package com.example.mina.minaapplication.view;
    
    import android.app.Activity;
    import android.os.Bundle;
    import android.os.Handler;
    import android.os.Message;
    import android.util.Log;
    import android.view.View;
    import android.widget.TextView;
    import android.widget.Toast;
    
    import com.example.mina.minaapplication.R;
    import com.example.mina.minaapplication.mina.ByteArrayCodecFactory;
    
    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.SocketSessionConfig;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    
    import java.net.InetSocketAddress;
    import java.nio.charset.Charset;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    
    /**
     * Mina客户端
     */
    public class MainActivity extends Activity {
        /**
         * 线程池,避免阻塞主线程,与服务器建立连接使用,创建一个只有单线程的线程池,尽快执行线程的线程池
         */
        private static ExecutorService executorService = Executors.newSingleThreadExecutor();
    
    
        /**
         * 连接对象
         */
        private NioSocketConnector mConnection;
        /**
         * session对象
         */
        private IoSession mSession;
        /**
         * 连接服务器的地址
         */
        private InetSocketAddress mAddress;
    
        private ConnectFuture mConnectFuture;
    
    
        public static final int UPADTE_TEXT = 1;
        /**
         * 服务端返回的信息
         */
        private TextView tvShow;
    
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            tvShow = findViewById(R.id.tv_show);
            initConfig();
            connect();
            findViewById(R.id.send).setOnClickListener(new View.OnClickListener() {//发送消息数据
    
    
                @Override
                public void onClick(View view) {
                    if (mConnectFuture != null && mConnectFuture.isConnected()) {//与服务器连接上
                        mConnectFuture.getSession().write("{\"id\":11,\"name\":\"ccc\"}");//发送json字符串
                    }
    
                }
            });
        }
    
        /**
         * 初始化Mina配置信息
         */
        private void initConfig() {
            mAddress = new InetSocketAddress("192.168.0.1", 20000);//连接地址,此数据可改成自己要连接的IP和端口号
            mConnection = new NioSocketConnector();// 创建连接
            // 设置读取数据的缓存区大小
            SocketSessionConfig socketSessionConfig = mConnection.getSessionConfig();
            socketSessionConfig.setReadBufferSize(2048);
            socketSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE, 4);//设置4秒没有读写操作进入空闲状态
            mConnection.getFilterChain().addLast("logging", new LoggingFilter());//logging过滤器
            mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8"))));//自定义解编码器
            mConnection.setHandler(new DefaultHandler());//设置handler
            mConnection.setDefaultRemoteAddress(mAddress);//设置地址
    
    
        }
    
        /**
         * 创建连接
         */
    
        private void connect() {
    
            FutureTask<Void> futureTask = new FutureTask<>(new Callable<Void>() {
                @Override
                public Void call() {//
    
                    try {
                        while (true) {
                            mConnectFuture = mConnection.connect();
                            mConnectFuture.awaitUninterruptibly();//一直等到他连接为止
                            mSession = mConnectFuture.getSession();//获取session对象
                            if (mSession != null && mSession.isConnected()) {
                                Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();
                                break;
                            }
                            Thread.sleep(3000);//每隔三秒循环一次
                        }
    
                    } catch (Exception e) {//连接异常
    
    
                    }
                    return null;
                }
            });
            executorService.execute(futureTask);//执行连接线程
        }
    
    
        /**
         * Mina处理消息的handler,从服务端返回的消息一般在这里处理
         */
        private class DefaultHandler extends IoHandlerAdapter {
    
    
            @Override
            public void sessionOpened(IoSession session) throws Exception {
                super.sessionOpened(session);
    
            }
    
            /**
             * 接收到服务器端消息
             *
             * @param session
             * @param message
             * @throws Exception
             */
            @Override
            public void messageReceived(IoSession session, Object message) throws Exception {
                Log.e("tag", "接收到服务器端消息:" + message.toString());
    
                Message message1 = new Message();
                message1.what = UPADTE_TEXT;
                message1.obj = message;
                handler.sendMessage(message1);
            }
    
    
            @Override
            public void sessionIdle(IoSession session, IdleStatus status) throws Exception {//客户端进入空闲状态.
                super.sessionIdle(session, status);
    
            }
        }
    
        /**
         * 更新UI
         */
        private Handler handler = new Handler() {
            @Override
            public void handleMessage(Message msg) {
                super.handleMessage(msg);
                switch (msg.what) {
                    case UPADTE_TEXT:
                        String message = (String) msg.obj;
                        tvShow.setText(message);
                        break;
                }
            }
        };
    }
    

    客户端截图:

    客户端截图

    服务端截图:

    服务端截图

    本文完整项目代码地址(欢迎来star):
    https://github.com/lb1207087645/Android-Mina-master

    参考资源:

    浅谈TCP粘包、断包问题与解决方案

    mina自定义编解码器接收处理byte数组(同时解决数据传输中的粘包、缺包问题)

    相关文章

      网友评论

          本文标题:Mina粘包,断包问题处理(附完整实例,客户端,服务端)

          本文链接:https://www.haomeiwen.com/subject/rmkpcftx.html