美文网首页
一个基于传统IO的轻量级的通讯框架

一个基于传统IO的轻量级的通讯框架

作者: songsforjane | 来源:发表于2018-08-21 23:10 被阅读0次

    平时工作中难免会需要编写一些涉及到通讯方面的代码。在通讯方面也有许多优秀的开源项目,比如大名鼎鼎的Netty,Mina等。但是有一种情况是,如果,自己的使用场景比较小,这时候使用这些大型的开源项目,会显得大材小用,造成性能浪费不说,学习成本也是一件需要考虑的事。这时候如果有一个轻量级的,健壮的通讯框架,就显得特别好用。笔者工作这一年多,基本都是在与一些简单自定义的通讯框架打交道。恰巧,最近有时间,总结下,学习的心得。

    在开始本文前,先来看看几个问题。

    1.一个完整的通讯框架应该有哪几部分组成?
    2.怎么样保证数据的完整性(处理分包,拼包问题)?
    3.怎么样自定义协议,自定义应该要遵循那些规则?

    别急, 我们一个一个的来看。

    对于问题1

    笔者的看法是,一个完整的通信协议至少应该包含两个部分,1>数据模型部分,主要处理数据的序列化与反序列化问题。2>通信部分,主要用于发送与接受序列化过的数据。

    对于问题2

    在通信模块接收到序列化过的数据后,需要讲这些二进制的数据进行反序列化。难点就是这个将数据反序列化的过程。面对一大堆接受到的二进制的数据,你根本就不知道,一个完整的数据包是从那里开始,从哪里结束。要解决这个问题,至少得包含两个方面的信息,1)一个唯一标识的包头,即包开始的地方。2)整包数据的长度,即包结束的地方。这里要提醒的是,如果别人破解了你的协议,要主意防范Socket攻击。即在在一个合法的包头后面,放置一个足够大的包的长度,造成程序内存溢出,存而使程序崩溃。

    一个简单的通信框架模型
    对于问题3

    笔者认为,一个设计良好的自定义协议至少应该包含两个接口,一个进行包数据的序列化,一个进行包数据的反序列化,即readFromBytes 与 writeToBytes,即数据对象与java对象之间的转化。

    下面先看看整个工程的目录结构:

    目录结构

    这些基本问题解决了,写起来就会轻松很多。废话不多说,上代码。上面提到,一个轻量级的通信框架至少应该包含两个部分,数据模型部分与通信部分。本文先讲简单的,从数据模型部分开始。下面是数据模块的基类Packet。

    
    /**
    *   +--------------------------------------------------------------------------------------------+
    *    | 包头(2字节)| 时间戳(8字节)| 有效数据长度(4字节)| 有效数据 |
    *   +--------------------------------------------------------------------------------------------+
    */
    
    public class Packet {
        
    //包头,标识一个数据包的开始
        private short HEAD = -256;
    //时间戳,标注数据包生成时的时间
        private long timeStamp = System.currentTimeMillis();
    //包中有效数据的长度,因此,一个完整包的长度 = 2 + 8 + 4 + length;
        private int length;
    //包中Id,用于区别Packet的类型
        private long id;
        
        public Packet(long id) {
            this.id = id;
        }
        
     // 序列化方法,将一个Java对象转换为数据对象
        public byte[] getBytes() {
            
            JsonObject jo = writeToJson();
            byte[] content = jo.toString().getBytes();
            
            length = content.length;
            
            byte []bytes = new byte[2+8+4+length];
            
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            
            buffer.putShort(HEAD);
            buffer.putLong(timeStamp);
            buffer.putInt(length);
            buffer.put(content);
            
            return buffer.array();
        }
        
        public void readFromBytes(byte[] bytes) {
            readFromBytes(bytes, 0, bytes.length);
        }
        
     // 反序列化方法,将一个数据对象转换为Java对象
        public void readFromBytes(byte[] bytes, int offset, int length) {
            String msg = new String(bytes, offset, length);
            JsonObject jo = (new JsonParser()).parse(msg).getAsJsonObject();
            readFromJson(jo);
        }
        
        public JsonObject writeToJson() {
            JsonObject jo = new JsonObject();
            jo.addProperty("id", id);
            return jo;
        }
        
        public void readFromJson(JsonObject jo) {
            this.id = jo.get("id").getAsLong();
        }
        
        public Long getId() {
            return id;
        }
        
    }
    
    

    Packet 的有效数据部分是一个Json字符串,这里引用了Google的Gson。为啥数据的有效部分要用Json字符串呢?这里是为了可读性。子类只需要复写writeToJson,如下:

    @Override
        public JsonObject writeToJson() {
            JsonObject jo = super.writeToJson();
            jo.addProperty("heart_beat", "yes");
            return jo;
        }
    
    

    就能将要传输的任何数据放进包中。

    与此同时,还需要改写readFromJson方法,获取Json中传输的数据,组装一个Java对象,如下:

    @Override
        public void readFromJson(JsonObject jo) {
            super.readFromJson(jo);
            heartbeat = jo.get("heart_beat").getAsString();
        }
    

    数据模型部分就说到这里,理解了原理,其实还是非常简单的。

    下面讲述本篇博文的重中之重,数据通讯的部分。先别急看代码,画个图简单的说明下设计思想:

    通信模块

    Session:主要面向业务,比如发送数据包,接收数据包,分发数据包

    Connection:主要面向连接,是发送数据,接受数据的具体实现类

    这样设计的好处是,将业务与底层连接分离开。Session 只需要处理各种业务,比如登陆,注册,维护心跳等。Session并不需要关心Connection是否损坏,也不需要自己维护Connection。Connection就专注于发送数据,至于发送的是啥,并不需要关心。

    下面,看Connection的实现:

    
    public class Connection {
        
        private Socket mSocket;
        private SocketAddress mAddress;
        private STATUS mStatus;
        
    //接受数据的缓存队列
        private BlockingQueue<Packet> mReceiverQueue = new LinkedBlockingQueue<>();
    //发送数据的缓存队列
        private BlockingQueue<Packet> mSenderQueue = new LinkedBlockingQueue<>();
    //Socket 读数据流
        private InputStream inputStream;
    //Socket 写数据流
        private OutputStream outputStream;
    //分发接收到的数据包的监听器
        private OnPacketReceivedListener mListener;
        
        private boolean mStop;
        
    //
        public enum STATUS{
            CONNECTED,
            DISCONNECTED,
            CONNECTING;
        }
        
        public boolean connect(SocketAddress address, int timeout) {
            
            if(address == null) {
                return false;
            }
            
            if(mSocket != null) {
                close();
                setStatus(STATUS.DISCONNECTED);
            }
            
            Socket socket = new Socket();
            boolean success = true;
            try {
                socket.setSoTimeout(10000);
                setStatus(STATUS.CONNECTING);
                socket.setReuseAddress(true);
                socket.connect(address, timeout);
                setStatus(STATUS.CONNECTED);
                setSocket(socket);
            } catch (IOException e) {
                System.out.println("connect failed, message: "+e.getMessage());
                success = false;
            }finally {
                try {
                    if(socket!=null) {
                        socket.close();
                    }
                } catch (IOException e) {
                }
            }
            return success;
            
        }
        
        public boolean connect(SocketAddress address) {
            return connect(address, Integer.MAX_VALUE);
        }
        
        public boolean setSocket(Socket socket) {
            
            if(mSocket!=null) {
                close();
                setStatus(STATUS.DISCONNECTED);
            }
            
            if(!socket.isConnected()) {
                return false;
            }
            
            try {
                mSocket = socket;
                socket.setSoTimeout(30000);
                inputStream = mSocket.getInputStream();
                outputStream = mSocket.getOutputStream();
                
                mStop = false;
                
                setStatus(STATUS.CONNECTED);
                startSender();
                startReader();
            } catch (IOException e) {
                setStatus(STATUS.DISCONNECTED);
                return false;
            }
            
            return true;
        }
        
        public void setOnPacketListener(OnPacketReceivedListener l) {
            this.mListener = l;
        }
        
        private void startReader() {
            
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    
                    byte buffer[] = new byte[512*1024];
                    int dataLen = 0;
                    
                    while(!mStop) {
                        
                        try {
                            if(inputStream.available() <= 0) {
                                sleep(100);
                                continue;
                            }
                            
                            int read = inputStream.read(buffer, dataLen, buffer.length-dataLen);
                            
                            dataLen += read;
                            
                            int handled = parsePackets(buffer, dataLen);
                            
                            dataLen = dataLen - handled;
                            System.arraycopy(buffer, handled, buffer, 0, dataLen);
                            
                            dispatchPackets();
                            
                            if(dataLen == buffer.length) {
                                dataLen = 0;
                            }
                            
                            
                        } catch (IOException e) {
                        } catch (InterruptedException e) {
                        }
                        
                    }
                    
                }
    
            }).start();
            
        }
        
        protected void dispatchPackets() throws InterruptedException {
            
            if(mSenderQueue.isEmpty()) {
                return;
            }
            //dispatch message
            Packet p = null;
            try {
                p = mReceiverQueue.poll(100, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
            }
            
            if(mListener != null 
                    && p != null) {
                mListener.onPacketReceived(p);
            }
            
        }
    //这个方法是分包的重点
        private int parsePackets(byte[] buffer, int length) {
            int handled = 0;
            int i = 0;
            
            ByteBuffer buf = ByteBuffer.wrap(buffer, 0, length);
            while( i < length) {
    //总长度小于14,即有效数据前面部分,直接pass掉                  
                if(i + 14 < length) {
                    return handled;
                }
    //不是以包头标识符开始的,移动指针到下一位
                if(buf.getShort(i) != -256) {
                    i++;
                    continue;
                }
    //获取有效数据部分的长度
                int len = buf.getInt(i+10);
    //2+8+4+length > 剩余字节,则说明,这一包数据不完整,直接pass
                if(i + 14 + len < length) {
                    return handled;
                }
    //通过包id,可以知道包的类型,进行包的反序列化,并且扔进接收的缓存队列
                byte[] packet = new byte[14+len];
                System.arraycopy(buffer, i, packet, 0, packet.length);
                String joStr = new String(packet, 14, packet.length-14);
                JsonObject jo = (new JsonParser()).parse(joStr).getAsJsonObject();
                long packetId = jo.get("id").getAsLong();
                if(packetId > 0) {
                    Packet p = PacketFactory.createPacket(packetId);
                    if(p != null) {
                        p.readFromJson(jo);
                        mReceiverQueue.offer(p);
                    }
                }
                        handled = (i+14+len);
                i = handled;
            }
            return handled;
        }
    
        private void startSender() {
            
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    
                    while(!mStop) {
                        
                        if(mSenderQueue.isEmpty()) {
                            sleep(100);
                            continue;
                        }
                        
                        try {
                            Packet packet = mSenderQueue.take();
                            byte[] msg = packet.getBytes();
                            outputStream.write(msg);
                        } catch (InterruptedException e) {
                        } catch (IOException e) {
                        }
                        
                    }
                    
                }
            }).start();
            
        }
        
        public boolean delivery(Packet packet) {
            
            return mSenderQueue.offer(packet);
        }
    
        public boolean isConnected() {
            return getStatus() == STATUS.CONNECTED
                      && mSocket != null
                        && mSocket.isConnected();
        }
        
        public static void closeQuietly(Socket socket) {
            if(socket!=null) {
                try {
                    socket.close();
                } catch (IOException e) {
                    System.out.println("close socket failed, message: "+e.getMessage());
                }
            }
        }
        
        public void close() {
            closeQuietly(mSocket);
            mSocket = null;
            
            mReceiverQueue.clear();
            mSenderQueue.clear();
            mStop = true;
        }
        
        public SocketAddress getAddress() {
            return mAddress;
        }
        
        public void setStatus(STATUS status) {
            this.mStatus = status;
        }
        
        public STATUS getStatus() {
            return mStatus;
        }
        
        public static void sleep(long timeInMills) {
            try {
                Thread.sleep(timeInMills);
            } catch (InterruptedException e) {
            }
        }
        
        public interface OnPacketReceivedListener{
            void onPacketReceived(Packet packet);
        }
    
    }
    
    

    Connection中开启了两个线程进行数据的读/写。最复杂的部分就是分包部分。Read the fucking code!这里我也不多讲了。没什么技巧,纯看编程的基本功!

    Session的实现:

    public class Session {
        
        private long clientPacketCount = 0;
        private long serverPacketCount = 0;
        private long startDate = System.currentTimeMillis();
        private long lastActiveDate;
        
        private String mSessionId;
        
        private Connection mConnection;
        
        private PacketDispatcher mDispatcher = new PacketDispatcher();
        
        private OnPacketReceivedListener mPacketListenner = new OnPacketReceivedListener() {
            
            @Override
            public void onPacketReceived(Packet packet) {
                mDispatcher.notifyPacketReceived(packet);
                incrementServerPacketCount();
            }
        };
        
        public Session(String sessionId, Connection conn) {
            this.mSessionId = sessionId;
            this.mConnection = conn;
            this.mConnection.setOnPacketListener(mPacketListenner);
        }
        
        public void updateConnection(Connection conn) {
            
            if(mConnection != null) {
                mConnection.close();
            }
            
            mConnection = conn;
            mConnection.setOnPacketListener(mPacketListenner);
            
        }
        
        public String getSessionId() {
            return mSessionId;
        }
        
        public boolean isConnected() {
            return mConnection!=null && mConnection.isConnected();
        }
        
        public STATUS getStatus() {
            return mConnection == null 
                    ? STATUS.DISCONNECTED : mConnection.getStatus();
        }
        
        public Connection getConnection() {
            return mConnection;
        }
        
        public long getNumClientPackets() {
            return clientPacketCount;
        }
    
        public long getNumServerPackets() {
            return serverPacketCount;
        }
        
        public void deliver(Packet packet) {
            if (mConnection != null) {
                mConnection.delivery(packet);
                incrementClientPacketCount();
            }
        }
        
        public void close() {
            if (mConnection != null) {
                mConnection.close();
            }
        }
        
        public boolean isClosed() {
            return mConnection.isConnected();
        }
        
        public Date getCreationDate() {
            return new Date(startDate);
        }
        
        public Date getLastActiveDate() {
            return new Date(lastActiveDate);
        }
        
        public void incrementClientPacketCount() {
            clientPacketCount++;
            lastActiveDate = System.currentTimeMillis();
        }
    
        public void incrementServerPacketCount() {
            serverPacketCount++;
            lastActiveDate = System.currentTimeMillis();
        }
        
        public void registerObserver(long packetId, Observer ob) {
            mDispatcher.registerObserver(packetId, ob);
        }
        
        public void unregisterObserver(long packetId) {
            mDispatcher.unregisterObserver(packetId);
        }
        
        public void registerAll() {
            mDispatcher.unregisterAll();
        }
        
        private class PacketDispatcher{
            
            private HashMap<Long, Observable> mObservables = new HashMap<>();
            
            public void notifyPacketReceived(Packet packet) {
                
                if(packet == null) {
                    return;
                }
                
                Observable observable = mObservables.get(packet.getId());
                
                if(observable==null) {
                    System.out.println("Havn't registered this type observable.");
                }
                
                if(observable != null) {
                    
                    try {
                        Field changed = observable.getClass().getDeclaredField("changed");
                        if(!changed.isAccessible()) {
                            changed.setAccessible(true);
                        }
                        changed.set(observable, true);
                    } catch (NoSuchFieldException e) {
                    } catch (SecurityException e) {
                    } catch (IllegalArgumentException e) {
                    } catch (IllegalAccessException e) {
                    }
                    
                    observable.notifyObservers(packet);
                }
                
            }
            
            public void registerObserver(Long packetId, Observer ob) {
                
                Observable observable = mObservables.get(packetId);
                
                if(observable==null) {
                    observable = new Observable();
                    mObservables.put(packetId, observable);
                }
                
                observable.addObserver(ob);
                
            }
            
            public void unregisterObserver(Long packetId) {
                if(!mObservables.containsKey(packetId)) {
                    return;
                }
                
                Observable observable = mObservables.get(packetId);
                observable.deleteObservers();
                mObservables.remove(packetId);
                
            }
            
            public void unregisterAll() {
                
                if(!mObservables.isEmpty()) {
                    
                    for(Observable o : mObservables.values()) {
                        o.deleteObservers();
                    }
                    mObservables.clear();
                }
            }
            
        }
    
    }
    
    

    Session中的实现就比较简单了,主要是处理包的分发。这里通过自定义的PacketDispatcher分发数据包,写得非常简单。

    至此,一个基本的数据通信框架就搭成了。时间不早了,今天就写道这里,明天继续。

    相关文章

      网友评论

          本文标题:一个基于传统IO的轻量级的通讯框架

          本文链接:https://www.haomeiwen.com/subject/hduriftx.html