美文网首页
Android最小局域网单对单TCP实现

Android最小局域网单对单TCP实现

作者: Xerrard | 来源:发表于2017-06-29 15:36 被阅读54次

    1. 背景

    今日需要实现一个局域网的wifi数据传输的功能。不可避免的,要进行TCP socket的操作。由于逻辑比较简单,就使用最小化的Socket通信即可。

    2. 目标

    实现一个TCP模块,供业务层调用。业务层把要发送的数据和发送的目标给到TCP模块,TCP模块完成传输,并将传输状态和传输结果反馈给业务层

    3. 需求分析

    需要一个类来封装所有的TCP操作,我们定义为ChannelTransport

    接口:

    1. 启动网络连接
      startTcpService()
    2. 关闭网络连接
      stopTcpService()
    3. 网络联通
      onConnected()
    4. 网络连接失败
      onConnectFail()
    5. 网络联通的情况下,一端close,另外一端收到-1
      onConnectEnd()
    6. 断网
      onConnectException()
    7. 发送数据
      sendByte(Byte[] datas)
    8. 收到数据
      onRead(Byte[] datas)

    4. TCP模块封装

    4.1 接口层 IfSocket

    接口层主要就是一接口定义:如打开socket,关闭socket,发送数据,接收数据,连接状态监听,数据监听

    public interface IfSocket {
        
        public void start();
        public void sendTo(byte[] var1);
        public void receive();
        public void stop();
        public void setConnectEventListener(SocketConnectEventListener connectEventListener);
        public void setReadStreamListener(OnStreamListener onReadStreamListener);
    
        public static interface SocketConnectEventListener {
            /**
             * 用于Socket主线程,socket连接成功
             */
            public void onConnected();
            /**
             * 用于Socket主线程,socket连接失败
             */
            public void onConnectFail();
            
            /**
             * 用于IOReadThread,socket 传输过程中收到-1结束符,标志对方socket close或者关闭输入
             */
            public void onConnectEnd(); 
            
            /**
             *  用于IOReadThread和IOWriteThread,socket 传输过程中的Io exception
             */
            public void onConnectException();
        }
    
        
    /**
     * 用于IO Thread ,一次socket传输接收到的数据
     * @author xuqiang
     *
     */
        public static interface OnStreamListener {
            public void onRead(byte[] var1);
            public void onSent();
        }
    
    
    }
    

    4.2 Socket端的具体实现

    几个注意点

    1. start要分server和client两种情况
    2. IO线程用线程池实现TcpWriteIORunnable TcpReadIORunnable
    3. 设计一个心跳包线程TcpWriteAliveRunable,在当前没有send数据的情况下,循环send心跳包
    public class TcpSocket implements IfSocket {
        boolean isServer = true; //是不是Server
        String ipAddress;                  //Server的IP,给client用于connect的
        protected ExecutorService mThreadPool; //线程池,用于新建receive和send线程
        protected ScheduledExecutorService mScheduledThreadpool; //Timer线程池,用于发送心跳包
        protected int mState; //当前的状态
        protected Socket mSocket; 
        protected ServerSocket mServerSocket;
        protected SocketConnectEventListener mConnectEventListener;
        protected OnStreamListener mOnStreamListener;
        private InputStream mInStream;
        private OutputStream mOutStream;
        public static final byte[] SEND_TAG = new byte[] { -5, -17, -13, -19 }; //数据头部,用于数据校验
        public static final byte[] SEND_ALIVE_TAG = new byte[] { -25, -31, -37, -43 }; //心跳包
        protected TcpWriteAliveRunable mTcpWriteAliveRunable;  //心跳包的task
    
        public TcpSocket(boolean isServer, String ipAddress) {
            super();
            this.isServer = isServer;
            this.ipAddress = ipAddress;
        }
    
        @Override
        public void setConnectEventListener(
                SocketConnectEventListener connectEventListener) {
            this.mConnectEventListener = connectEventListener;
        }
    
        @Override
        public void setReadStreamListener(OnStreamListener onReadStreamListener) {
            this.mOnStreamListener = onReadStreamListener;
        }
    
        @Override
        public void start() {
            this.mThreadPool = Executors.newCachedThreadPool();
            this.mScheduledThreadpool = Executors.newScheduledThreadPool(1);
            this.mTcpWriteAliveRunable = new TcpWriteAliveRunable(
                    mOutStream, mConnectEventListener);
            try {
                if (isServer) {
                    mServerSocket = new ServerSocket(TcpVar.PORT);
                    this.mSocket = this.mServerSocket.accept();
                } else {
                    this.mSocket = new Socket(ipAddress, TcpVar.PORT);
                }
                mState = TcpVar.STATE_CONNECTED;
                mConnectEventListener.onConnected();
                Dbg.i(TcpVar.TAG, " create socket sucess");
                mSocket.setSoTimeout(20000); // 加入超时
                mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
            } catch (Exception e) {
                mState = TcpVar.STATE_CONNECT_FAIL;
                mConnectEventListener.onConnectFail();
                Dbg.w(TcpVar.TAG, " create socket failed", e);
            }
    
        }
    
        @Override
        public void receive() {
            if (mState != TcpVar.STATE_CONNECTED) {
                return;
            }
            try {
                mInStream = new BufferedInputStream(this.mSocket.getInputStream());
            } catch (IOException e) {
                mInStream = null;
            }
            mThreadPool.execute(new TcpReadIORunnable(mInStream,
                    mConnectEventListener, mOnStreamListener));
        }
    
        @Override
        public void sendTo(byte[] var1) {
            if (mState != TcpVar.STATE_CONNECTED) {
                return;
            }
            try {
                mOutStream = new BufferedOutputStream(
                        this.mSocket.getOutputStream());
            } catch (IOException e) {
                mOutStream = null;
            }
    
            try {
                //发送时阻塞当前线程,心跳包暂停发送,发送完毕后,心跳包重新发送
                mScheduledThreadpool.shutdownNow();
                mThreadPool.submit(new TcpWriteIORunnable(mOutStream,
                        mConnectEventListener, mOnStreamListener,var1)).get();
                mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            };
    
        }
    
        @Override
        public void stop() {
            mThreadPool.shutdownNow();
            mScheduledThreadpool.shutdownNow();
            try {
                mSocket.close();
                mServerSocket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            
    
        }
    
    }
    

    4.3 TcpWriteIoRunable的实现

    数据格式很简单

    1. SEND_TAG
    2. data_length
    3. data
    public class TcpWriteIORunnable implements Runnable {
        OutputStream mOutStream;
        SocketConnectEventListener mConnectEventListener;
        OnStreamListener mOnStreamListener;
        byte[] data;
    
        public TcpWriteIORunnable(OutputStream mOutStream,
                SocketConnectEventListener mConnectEventListener,
                OnStreamListener mOnStreamListener, byte[] datas) {
            this.mOutStream = mOutStream;
            this.mConnectEventListener = mConnectEventListener;
            this.mOnStreamListener = mOnStreamListener;
            this.data = data;
        }
    
        @Override
        public void run() {
            try {
                mOutStream.write(TcpSocket.SEND_TAG);
                mOutStream.write(Util.int2bytes(this.data.length));
                mOutStream.write(this.data);
                mOutStream.flush();
                mOnStreamListener.onSent();
            } catch (Exception e) {
                mConnectEventListener.onConnectException();
            }
    
        }
    }
    

    4.4 TcpWriteAliveRunable的实现

    心跳包的设计非常简单,就是循环发送SEND_ALIVE_TAG

    mScheduledThreadpool.scheduleAtFixedRate(mTcpWriteAliveRunable, 4, 4, TimeUnit.SECONDS);
    
    public class TcpWriteAliveRunable implements Runnable {
        OutputStream mOutStream;
        SocketConnectEventListener mConnectEventListener;
        
        public TcpWriteAliveRunable(OutputStream mOutStream,
                SocketConnectEventListener mConnectEventListener) {
            super();
            this.mOutStream = mOutStream;
            this.mConnectEventListener = mConnectEventListener;
        }
    
        @Override
        public void run() {
            try{
                mOutStream.write(TcpSocket.SEND_ALIVE_TAG);
            }
             catch (Exception e) {
                    mConnectEventListener.onConnectException();
                }
        }
    
    }
    

    4.5 TcpReadIORunnable的实现

    Read线程的流程主要分三步:

    1. 校验SEND_TAG。校验的过程中我们是一个字节一个字节的校验
    2. 第二步还是在读取数据长度
    3. 第三步就是读取真正的数据了。有三种策略读数据:
      1. 一个byte一个byte的读,这样效率较低
      2. mmInStream.read(len)。但是InputStream.read(len)有个问题就是,他可能实际读取的长度是小于len的。这个len是数据读取的最大值,所以也不能直接使用;
      3. 我的算法是:mmInStream.read(len),每次记录已经read的数据量,然后通过len-readBytes得到还剩下的数据长度,然后依次循环读取,直到数据量读满len或者read==-1(断网)为止。
    public class TcpReadIORunnable implements Runnable {
    
        private boolean isStoped = false;
        InputStream mInStream;
        SocketConnectEventListener mConnectEventListener;
        OnStreamListener mOnReadStreamListener;
    
        public TcpReadIORunnable(InputStream mInStream,
                SocketConnectEventListener mConnectEventListener,
                OnStreamListener mOnReadStreamListener) {
            this.mInStream = mInStream;
            this.mConnectEventListener = mConnectEventListener;
            this.mOnReadStreamListener = mOnReadStreamListener;
        }
    
        @Override
        public void run() {
            int i = 0;
            ByteBuffer errorByteBuffer = ByteBuffer.allocate(1024 * 16);
            while (!this.isStoped) {
                try {
                    // 1.判断起始标记 start
                    int t = this.mInStream.read();
                    if (t == -1) {
                        Dbg.e(TcpVar.TAG, "read stream is -1!!!!!!!"); // 网络一旦断了,或者一端关闭,则出循环,结束io线程
                        mConnectEventListener.onConnectEnd();
                        break;
                    }
                    Dbg.d(TcpVar.TAG, "mmInStream.read() one sucess ");
                    byte b = (byte) (t & 0xFF);
                    if (TcpSocket.SEND_TAG[i] != b) {
                        errorByteBuffer.put(b);
                        Dbg.e(TcpVar.TAG,
                                "!read byte error i:"
                                        + i
                                        + "  b:"
                                        + EncrypUtil
                                                .byteArrayToHexStr(new byte[] { b })
                                        + "  tag:"
                                        + EncrypUtil
                                                .byteArrayToHexStr(new byte[] { TcpSocket.SEND_TAG[i] }));
                        i = 0;
                        continue;
                    }
                    i++;
                    if (i != TcpSocket.SEND_TAG.length) {
                        continue;//继续读下一个数据,直到SEND_TAG读完
                    }
                    i = 0;//到此处全部SEND_TAG全部读完
                    //下面是数据的打印,用于调试
                    if (errorByteBuffer.position() != 0) {
                        byte[] dst = new byte[errorByteBuffer.position()];
                        errorByteBuffer.position(0);
                        errorByteBuffer.get(dst, 0, dst.length);
                        errorByteBuffer.clear();
                        Dbg.e(TcpVar.TAG,
                                "!read byte error data:"
                                        + EncrypUtil.byteArrayToHexStr(dst));
                    }
    
                    errorByteBuffer.clear();
                    // 2.读取包长度
                    byte[] len = new byte[4];
                    for (int j = 0; j < len.length; j++) {
                        len[j] = (byte) (this.mInStream.read() & 0xFF);
                    }
    
                    // mmInStream.read(len);
                    int length = Util.bytes2int(len);
                    // Dbg.d("read length:"+length);
                    byte[] data = new byte[length];
                    Dbg.e(TcpVar.TAG, "start read data,length =  " + length);
                    // 3. 读取数据
    
                    int readBytes = 0;
                    while (readBytes < data.length) {
                        int read = mInStream.read(data, readBytes, data.length
                                - readBytes);
                        if (read == -1) {
                            break;
                        }
                        readBytes += read;
                    }
    
                    mOnReadStreamListener.onRead(data);
                    Dbg.d("read byte end!!!!!!!");
                } catch (Exception e) {
                    Dbg.e("WifiTransferService",
                            "Fail to read bytes from input stream of Wifiiothread "
                                    + e.getMessage(), e.getMessage());
                    mConnectEventListener.onConnectException();
                    return;
                }
    
            }
        }
    
    
    }
    

    5.5 Android业务层调用的注意事项。

    1. 将ChannelTransport中使用TcpSocket做相应的操作,并且实现OnStreamListener和SocketConnectEventListener,即可。
    2. Socket的开启/关闭/发送/接收,以及OnStreamListener和SocketConnectEventListener的回调都是在不同的线程中工作的。为了保证线程同步问题,我们需要使用一个HandlerThread,并将所有的callback让HandlerThread去处理;然后使用ChannelTransport去extends Handler或者再新建一个Handler与这个HandlerThread对应起来。

    相关文章

      网友评论

          本文标题:Android最小局域网单对单TCP实现

          本文链接:https://www.haomeiwen.com/subject/aoyqattx.html