美文网首页
一个基于传统IO的轻量级的通讯框架

一个基于传统IO的轻量级的通讯框架

作者: songsforjane | 来源:发表于2018-08-21 23:10 被阅读0次

平时工作中难免会需要编写一些涉及到通讯方面的代码。在通讯方面也有许多优秀的开源项目,比如大名鼎鼎的Netty,Mina等。但是有一种情况是,如果,自己的使用场景比较小,这时候使用这些大型的开源项目,会显得大材小用,造成性能浪费不说,学习成本也是一件需要考虑的事。这时候如果有一个轻量级的,健壮的通讯框架,就显得特别好用。笔者工作这一年多,基本都是在与一些简单自定义的通讯框架打交道。恰巧,最近有时间,总结下,学习的心得。

在开始本文前,先来看看几个问题。

1.一个完整的通讯框架应该有哪几部分组成?
2.怎么样保证数据的完整性(处理分包,拼包问题)?
3.怎么样自定义协议,自定义应该要遵循那些规则?

别急, 我们一个一个的来看。

对于问题1

笔者的看法是,一个完整的通信协议至少应该包含两个部分,1>数据模型部分,主要处理数据的序列化与反序列化问题。2>通信部分,主要用于发送与接受序列化过的数据。

对于问题2

在通信模块接收到序列化过的数据后,需要讲这些二进制的数据进行反序列化。难点就是这个将数据反序列化的过程。面对一大堆接受到的二进制的数据,你根本就不知道,一个完整的数据包是从那里开始,从哪里结束。要解决这个问题,至少得包含两个方面的信息,1)一个唯一标识的包头,即包开始的地方。2)整包数据的长度,即包结束的地方。这里要提醒的是,如果别人破解了你的协议,要主意防范Socket攻击。即在在一个合法的包头后面,放置一个足够大的包的长度,造成程序内存溢出,存而使程序崩溃。

一个简单的通信框架模型
对于问题3

笔者认为,一个设计良好的自定义协议至少应该包含两个接口,一个进行包数据的序列化,一个进行包数据的反序列化,即readFromBytes 与 writeToBytes,即数据对象与java对象之间的转化。

下面先看看整个工程的目录结构:

目录结构

这些基本问题解决了,写起来就会轻松很多。废话不多说,上代码。上面提到,一个轻量级的通信框架至少应该包含两个部分,数据模型部分与通信部分。本文先讲简单的,从数据模型部分开始。下面是数据模块的基类Packet。


/**
*   +--------------------------------------------------------------------------------------------+
*    | 包头(2字节)| 时间戳(8字节)| 有效数据长度(4字节)| 有效数据 |
*   +--------------------------------------------------------------------------------------------+
*/

public class Packet {
    
//包头,标识一个数据包的开始
    private short HEAD = -256;
//时间戳,标注数据包生成时的时间
    private long timeStamp = System.currentTimeMillis();
//包中有效数据的长度,因此,一个完整包的长度 = 2 + 8 + 4 + length;
    private int length;
//包中Id,用于区别Packet的类型
    private long id;
    
    public Packet(long id) {
        this.id = id;
    }
    
 // 序列化方法,将一个Java对象转换为数据对象
    public byte[] getBytes() {
        
        JsonObject jo = writeToJson();
        byte[] content = jo.toString().getBytes();
        
        length = content.length;
        
        byte []bytes = new byte[2+8+4+length];
        
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        
        buffer.putShort(HEAD);
        buffer.putLong(timeStamp);
        buffer.putInt(length);
        buffer.put(content);
        
        return buffer.array();
    }
    
    public void readFromBytes(byte[] bytes) {
        readFromBytes(bytes, 0, bytes.length);
    }
    
 // 反序列化方法,将一个数据对象转换为Java对象
    public void readFromBytes(byte[] bytes, int offset, int length) {
        String msg = new String(bytes, offset, length);
        JsonObject jo = (new JsonParser()).parse(msg).getAsJsonObject();
        readFromJson(jo);
    }
    
    public JsonObject writeToJson() {
        JsonObject jo = new JsonObject();
        jo.addProperty("id", id);
        return jo;
    }
    
    public void readFromJson(JsonObject jo) {
        this.id = jo.get("id").getAsLong();
    }
    
    public Long getId() {
        return id;
    }
    
}

Packet 的有效数据部分是一个Json字符串,这里引用了Google的Gson。为啥数据的有效部分要用Json字符串呢?这里是为了可读性。子类只需要复写writeToJson,如下:

@Override
    public JsonObject writeToJson() {
        JsonObject jo = super.writeToJson();
        jo.addProperty("heart_beat", "yes");
        return jo;
    }

就能将要传输的任何数据放进包中。

与此同时,还需要改写readFromJson方法,获取Json中传输的数据,组装一个Java对象,如下:

@Override
    public void readFromJson(JsonObject jo) {
        super.readFromJson(jo);
        heartbeat = jo.get("heart_beat").getAsString();
    }

数据模型部分就说到这里,理解了原理,其实还是非常简单的。

下面讲述本篇博文的重中之重,数据通讯的部分。先别急看代码,画个图简单的说明下设计思想:

通信模块

Session:主要面向业务,比如发送数据包,接收数据包,分发数据包

Connection:主要面向连接,是发送数据,接受数据的具体实现类

这样设计的好处是,将业务与底层连接分离开。Session 只需要处理各种业务,比如登陆,注册,维护心跳等。Session并不需要关心Connection是否损坏,也不需要自己维护Connection。Connection就专注于发送数据,至于发送的是啥,并不需要关心。

下面,看Connection的实现:


public class Connection {
    
    private Socket mSocket;
    private SocketAddress mAddress;
    private STATUS mStatus;
    
//接受数据的缓存队列
    private BlockingQueue<Packet> mReceiverQueue = new LinkedBlockingQueue<>();
//发送数据的缓存队列
    private BlockingQueue<Packet> mSenderQueue = new LinkedBlockingQueue<>();
//Socket 读数据流
    private InputStream inputStream;
//Socket 写数据流
    private OutputStream outputStream;
//分发接收到的数据包的监听器
    private OnPacketReceivedListener mListener;
    
    private boolean mStop;
    
//
    public enum STATUS{
        CONNECTED,
        DISCONNECTED,
        CONNECTING;
    }
    
    public boolean connect(SocketAddress address, int timeout) {
        
        if(address == null) {
            return false;
        }
        
        if(mSocket != null) {
            close();
            setStatus(STATUS.DISCONNECTED);
        }
        
        Socket socket = new Socket();
        boolean success = true;
        try {
            socket.setSoTimeout(10000);
            setStatus(STATUS.CONNECTING);
            socket.setReuseAddress(true);
            socket.connect(address, timeout);
            setStatus(STATUS.CONNECTED);
            setSocket(socket);
        } catch (IOException e) {
            System.out.println("connect failed, message: "+e.getMessage());
            success = false;
        }finally {
            try {
                if(socket!=null) {
                    socket.close();
                }
            } catch (IOException e) {
            }
        }
        return success;
        
    }
    
    public boolean connect(SocketAddress address) {
        return connect(address, Integer.MAX_VALUE);
    }
    
    public boolean setSocket(Socket socket) {
        
        if(mSocket!=null) {
            close();
            setStatus(STATUS.DISCONNECTED);
        }
        
        if(!socket.isConnected()) {
            return false;
        }
        
        try {
            mSocket = socket;
            socket.setSoTimeout(30000);
            inputStream = mSocket.getInputStream();
            outputStream = mSocket.getOutputStream();
            
            mStop = false;
            
            setStatus(STATUS.CONNECTED);
            startSender();
            startReader();
        } catch (IOException e) {
            setStatus(STATUS.DISCONNECTED);
            return false;
        }
        
        return true;
    }
    
    public void setOnPacketListener(OnPacketReceivedListener l) {
        this.mListener = l;
    }
    
    private void startReader() {
        
        new Thread(new Runnable() {
            
            @Override
            public void run() {
                
                byte buffer[] = new byte[512*1024];
                int dataLen = 0;
                
                while(!mStop) {
                    
                    try {
                        if(inputStream.available() <= 0) {
                            sleep(100);
                            continue;
                        }
                        
                        int read = inputStream.read(buffer, dataLen, buffer.length-dataLen);
                        
                        dataLen += read;
                        
                        int handled = parsePackets(buffer, dataLen);
                        
                        dataLen = dataLen - handled;
                        System.arraycopy(buffer, handled, buffer, 0, dataLen);
                        
                        dispatchPackets();
                        
                        if(dataLen == buffer.length) {
                            dataLen = 0;
                        }
                        
                        
                    } catch (IOException e) {
                    } catch (InterruptedException e) {
                    }
                    
                }
                
            }

        }).start();
        
    }
    
    protected void dispatchPackets() throws InterruptedException {
        
        if(mSenderQueue.isEmpty()) {
            return;
        }
        //dispatch message
        Packet p = null;
        try {
            p = mReceiverQueue.poll(100, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
        }
        
        if(mListener != null 
                && p != null) {
            mListener.onPacketReceived(p);
        }
        
    }
//这个方法是分包的重点
    private int parsePackets(byte[] buffer, int length) {
        int handled = 0;
        int i = 0;
        
        ByteBuffer buf = ByteBuffer.wrap(buffer, 0, length);
        while( i < length) {
//总长度小于14,即有效数据前面部分,直接pass掉                  
            if(i + 14 < length) {
                return handled;
            }
//不是以包头标识符开始的,移动指针到下一位
            if(buf.getShort(i) != -256) {
                i++;
                continue;
            }
//获取有效数据部分的长度
            int len = buf.getInt(i+10);
//2+8+4+length > 剩余字节,则说明,这一包数据不完整,直接pass
            if(i + 14 + len < length) {
                return handled;
            }
//通过包id,可以知道包的类型,进行包的反序列化,并且扔进接收的缓存队列
            byte[] packet = new byte[14+len];
            System.arraycopy(buffer, i, packet, 0, packet.length);
            String joStr = new String(packet, 14, packet.length-14);
            JsonObject jo = (new JsonParser()).parse(joStr).getAsJsonObject();
            long packetId = jo.get("id").getAsLong();
            if(packetId > 0) {
                Packet p = PacketFactory.createPacket(packetId);
                if(p != null) {
                    p.readFromJson(jo);
                    mReceiverQueue.offer(p);
                }
            }
                    handled = (i+14+len);
            i = handled;
        }
        return handled;
    }

    private void startSender() {
        
        new Thread(new Runnable() {
            
            @Override
            public void run() {
                
                while(!mStop) {
                    
                    if(mSenderQueue.isEmpty()) {
                        sleep(100);
                        continue;
                    }
                    
                    try {
                        Packet packet = mSenderQueue.take();
                        byte[] msg = packet.getBytes();
                        outputStream.write(msg);
                    } catch (InterruptedException e) {
                    } catch (IOException e) {
                    }
                    
                }
                
            }
        }).start();
        
    }
    
    public boolean delivery(Packet packet) {
        
        return mSenderQueue.offer(packet);
    }

    public boolean isConnected() {
        return getStatus() == STATUS.CONNECTED
                  && mSocket != null
                    && mSocket.isConnected();
    }
    
    public static void closeQuietly(Socket socket) {
        if(socket!=null) {
            try {
                socket.close();
            } catch (IOException e) {
                System.out.println("close socket failed, message: "+e.getMessage());
            }
        }
    }
    
    public void close() {
        closeQuietly(mSocket);
        mSocket = null;
        
        mReceiverQueue.clear();
        mSenderQueue.clear();
        mStop = true;
    }
    
    public SocketAddress getAddress() {
        return mAddress;
    }
    
    public void setStatus(STATUS status) {
        this.mStatus = status;
    }
    
    public STATUS getStatus() {
        return mStatus;
    }
    
    public static void sleep(long timeInMills) {
        try {
            Thread.sleep(timeInMills);
        } catch (InterruptedException e) {
        }
    }
    
    public interface OnPacketReceivedListener{
        void onPacketReceived(Packet packet);
    }

}

Connection中开启了两个线程进行数据的读/写。最复杂的部分就是分包部分。Read the fucking code!这里我也不多讲了。没什么技巧,纯看编程的基本功!

Session的实现:

public class Session {
    
    private long clientPacketCount = 0;
    private long serverPacketCount = 0;
    private long startDate = System.currentTimeMillis();
    private long lastActiveDate;
    
    private String mSessionId;
    
    private Connection mConnection;
    
    private PacketDispatcher mDispatcher = new PacketDispatcher();
    
    private OnPacketReceivedListener mPacketListenner = new OnPacketReceivedListener() {
        
        @Override
        public void onPacketReceived(Packet packet) {
            mDispatcher.notifyPacketReceived(packet);
            incrementServerPacketCount();
        }
    };
    
    public Session(String sessionId, Connection conn) {
        this.mSessionId = sessionId;
        this.mConnection = conn;
        this.mConnection.setOnPacketListener(mPacketListenner);
    }
    
    public void updateConnection(Connection conn) {
        
        if(mConnection != null) {
            mConnection.close();
        }
        
        mConnection = conn;
        mConnection.setOnPacketListener(mPacketListenner);
        
    }
    
    public String getSessionId() {
        return mSessionId;
    }
    
    public boolean isConnected() {
        return mConnection!=null && mConnection.isConnected();
    }
    
    public STATUS getStatus() {
        return mConnection == null 
                ? STATUS.DISCONNECTED : mConnection.getStatus();
    }
    
    public Connection getConnection() {
        return mConnection;
    }
    
    public long getNumClientPackets() {
        return clientPacketCount;
    }

    public long getNumServerPackets() {
        return serverPacketCount;
    }
    
    public void deliver(Packet packet) {
        if (mConnection != null) {
            mConnection.delivery(packet);
            incrementClientPacketCount();
        }
    }
    
    public void close() {
        if (mConnection != null) {
            mConnection.close();
        }
    }
    
    public boolean isClosed() {
        return mConnection.isConnected();
    }
    
    public Date getCreationDate() {
        return new Date(startDate);
    }
    
    public Date getLastActiveDate() {
        return new Date(lastActiveDate);
    }
    
    public void incrementClientPacketCount() {
        clientPacketCount++;
        lastActiveDate = System.currentTimeMillis();
    }

    public void incrementServerPacketCount() {
        serverPacketCount++;
        lastActiveDate = System.currentTimeMillis();
    }
    
    public void registerObserver(long packetId, Observer ob) {
        mDispatcher.registerObserver(packetId, ob);
    }
    
    public void unregisterObserver(long packetId) {
        mDispatcher.unregisterObserver(packetId);
    }
    
    public void registerAll() {
        mDispatcher.unregisterAll();
    }
    
    private class PacketDispatcher{
        
        private HashMap<Long, Observable> mObservables = new HashMap<>();
        
        public void notifyPacketReceived(Packet packet) {
            
            if(packet == null) {
                return;
            }
            
            Observable observable = mObservables.get(packet.getId());
            
            if(observable==null) {
                System.out.println("Havn't registered this type observable.");
            }
            
            if(observable != null) {
                
                try {
                    Field changed = observable.getClass().getDeclaredField("changed");
                    if(!changed.isAccessible()) {
                        changed.setAccessible(true);
                    }
                    changed.set(observable, true);
                } catch (NoSuchFieldException e) {
                } catch (SecurityException e) {
                } catch (IllegalArgumentException e) {
                } catch (IllegalAccessException e) {
                }
                
                observable.notifyObservers(packet);
            }
            
        }
        
        public void registerObserver(Long packetId, Observer ob) {
            
            Observable observable = mObservables.get(packetId);
            
            if(observable==null) {
                observable = new Observable();
                mObservables.put(packetId, observable);
            }
            
            observable.addObserver(ob);
            
        }
        
        public void unregisterObserver(Long packetId) {
            if(!mObservables.containsKey(packetId)) {
                return;
            }
            
            Observable observable = mObservables.get(packetId);
            observable.deleteObservers();
            mObservables.remove(packetId);
            
        }
        
        public void unregisterAll() {
            
            if(!mObservables.isEmpty()) {
                
                for(Observable o : mObservables.values()) {
                    o.deleteObservers();
                }
                mObservables.clear();
            }
        }
        
    }

}

Session中的实现就比较简单了,主要是处理包的分发。这里通过自定义的PacketDispatcher分发数据包,写得非常简单。

至此,一个基本的数据通信框架就搭成了。时间不早了,今天就写道这里,明天继续。

相关文章

网友评论

      本文标题:一个基于传统IO的轻量级的通讯框架

      本文链接:https://www.haomeiwen.com/subject/hduriftx.html