Using SocketChannel on Android

Author: 寒江楓雨 | Published: 2019-04-18 17:16

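    I. Overview:

    SocketChannel is a selectable channel for stream-oriented connecting sockets. It transfers data over a TCP connection and is mainly used for network I/O. Because it is a selectable channel, it can be multiplexed with a Selector.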

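    II. Features:

    1 A SocketChannel cannot be created for an arbitrary, pre-existing socket.

    2 A channel created with the no-argument SocketChannel.open() is open but not yet connected; call connect() to connect it to a given address.

    3 Invoking an I/O operation on an unconnected SocketChannel throws NotYetConnectedException.

    4 SocketChannel supports two I/O modes: blocking and non-blocking.

    5 SocketChannel supports asynchronous shutdown. If one thread is blocked in read on a SocketChannel and another thread calls shutdownInput on that channel, the blocked read returns -1 without having read any data; if one thread is blocked in write and another thread calls shutdownOutput, the blocked thread receives an AsynchronousCloseException.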
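Features 3 and 5 can be observed with a small experiment. The following is a minimal sketch, not code from the original project: the class and method names are made up for illustration, and it assumes a local loopback server (bound to an ephemeral port) is acceptable as the remote peer.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.NotYetConnectedException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SocketChannelFeaturesDemo {

    // Feature 3: reading from an open but unconnected channel throws NotYetConnectedException.
    static boolean readFailsBeforeConnect() throws IOException {
        try (SocketChannel channel = SocketChannel.open()) { // open, but not connected
            channel.read(ByteBuffer.allocate(16));
            return false;
        } catch (NotYetConnectedException e) {
            return true;
        }
    }

    // Feature 5: shutting down the input side makes a read on another thread return -1.
    static int readAfterShutdownInput() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Hypothetical local peer: loopback address, ephemeral port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel accepted = server.accept()) {
                // The peer never sends anything, so this read blocks
                Callable<Integer> blockedRead = () -> client.read(ByteBuffer.allocate(16));
                Future<Integer> result = pool.submit(blockedRead);
                Thread.sleep(200); // give the reader thread time to block
                client.shutdownInput();
                return result.get(5, TimeUnit.SECONDS);
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read before connect fails: " + readFailsBeforeConnect());
        System.out.println("read after shutdownInput returns: " + readAfterShutdownInput());
    }
}
```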

    III. Using SocketChannel:

    (1) Creation

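    // Selector used to multiplex the channel
    Selector mSelector = Selector.open();
    InetSocketAddress inetSocketAddress = new InetSocketAddress(mRemoteIp, Constants.TCP_PORT);
    // open(SocketAddress) opens the channel and connects it, blocking until the connection is made
    SocketChannel socketChannel = SocketChannel.open(inetSocketAddress);
    // The channel must be in non-blocking mode before it can be registered with a selector
    socketChannel.configureBlocking(false);
    socketChannel.register(mSelector, SelectionKey.OP_READ);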
    

    (2) Checking the connection state

    socketChannel.isOpen();              // whether the channel is open
    socketChannel.isConnected();         // whether the channel's socket is connected
    socketChannel.isConnectionPending(); // whether a connection operation is in progress
    socketChannel.finishConnect();       // completes a pending non-blocking connect; returns true once connected
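These checks matter most for a non-blocking connect, where connect() may return before the connection is established. Below is a minimal sketch of that flow, assuming a local loopback server stands in for the remote host; the class name, method name, and the 5-second deadline are illustrative choices, not part of the original code.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class NonBlockingConnectDemo {

    // Connects in non-blocking mode and reports whether the channel ends up connected.
    static boolean connectNonBlocking() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Hypothetical peer: loopback address, ephemeral port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // In non-blocking mode connect() may return false: the connection is then pending
            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                client.register(selector, SelectionKey.OP_CONNECT);
                long deadline = System.currentTimeMillis() + 5000;
                while (!connected && System.currentTimeMillis() < deadline) {
                    selector.select(500);            // wait until the connect can be finished
                    selector.selectedKeys().clear();
                    connected = client.finishConnect();
                }
            }
            return client.isOpen() && client.isConnected() && !client.isConnectionPending();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected: " + connectNonBlocking());
    }
}
```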

    (3) Read/write mode

    As mentioned above, SocketChannel supports both blocking and non-blocking modes:

    socketChannel.configureBlocking(false);
    

    The call above sets the channel's read/write mode: false means non-blocking, true means blocking.
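The mode also determines whether the channel can be used with a Selector: registering a channel that is still in blocking mode throws IllegalBlockingModeException. A minimal sketch of this behavior follows; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class BlockingModeDemo {

    // Returns true if registration fails in blocking mode and succeeds after switching modes.
    static boolean registerFailsInBlockingMode() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            // A newly created channel is in blocking mode
            try {
                channel.register(selector, SelectionKey.OP_READ);
                return false; // not expected
            } catch (IllegalBlockingModeException e) {
                // Switch to non-blocking mode, then registration is allowed
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
                return !channel.isBlocking();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking-mode registration rejected: " + registerFailsInBlockingMode());
    }
}
```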

    (4) Reading and writing

    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 8080));
    ByteBuffer byteBuffer = ByteBuffer.allocate(16);
    socketChannel.read(byteBuffer);
    socketChannel.close();
    System.out.println("test end!");
    

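    The above is a blocking read: when execution reaches read, the thread blocks until data arrives or the peer closes the connection, so as long as the server sends nothing the console does not print test end!.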

    SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("www.baidu.com", 8080));
    socketChannel.configureBlocking(false);
    ByteBuffer byteBuffer = ByteBuffer.allocate(16);
    socketChannel.read(byteBuffer);
    socketChannel.close();
    System.out.println("test end!");
    

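    The above is a non-blocking read: read returns immediately (with 0 if no data is available yet), so the console prints test end!.

    Reads and writes are both buffer-oriented, in the same way as with the FileChannel covered in the previous article, so the details are not repeated here.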
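For readers who have not seen the FileChannel article, here is a minimal sketch of buffer-oriented writing and reading over a SocketChannel. It assumes a local loopback connection in place of a real server, and the class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

class BufferReadWriteDemo {

    // Sends a message through one channel and reads it back from the peer channel.
    static String roundTrip(String message) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // Hypothetical peer: loopback address, ephemeral port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                // Write: drain the buffer, since one write() may not send everything
                ByteBuffer out = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
                while (out.hasRemaining()) {
                    client.write(out);
                }
                client.shutdownOutput(); // lets the peer see end-of-stream

                // Read: fill the buffer, flip it for consumption, clear it for the next read
                ByteBuffer in = ByteBuffer.allocate(16);
                ByteArrayOutputStream received = new ByteArrayOutputStream();
                while (peer.read(in) != -1) {
                    in.flip();
                    received.write(in.array(), 0, in.limit());
                    in.clear();
                }
                return new String(received.toByteArray(), StandardCharsets.UTF_8);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello SocketChannel"));
    }
}
```

The 16-byte buffer is deliberately smaller than the message, so the read loop runs more than once and shows the flip/clear cycle.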

    (5) Setting and getting options

    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,Boolean.TRUE).setOption(StandardSocketOptions.TCP_NODELAY,Boolean.TRUE);

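    Socket options can be set with the setOption method.

    socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
    socketChannel.getOption(StandardSocketOptions.SO_RCVBUF);

    getOption returns the current value of an option. For example, the default receive buffer size reported here is 8192 bytes; the actual default depends on the platform.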
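The two calls can be combined into a small self-contained check. This is only a sketch with hypothetical class and method names; it works on an unconnected channel, and the printed buffer size will vary by platform.

```java
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.SocketChannel;

class SocketOptionsDemo {

    // Enables SO_KEEPALIVE and TCP_NODELAY, then reads SO_KEEPALIVE back.
    static boolean keepAliveAfterEnable() throws IOException {
        try (SocketChannel channel = SocketChannel.open()) {
            // setOption returns the channel itself, so calls can be chained
            channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true)
                   .setOption(StandardSocketOptions.TCP_NODELAY, true);
            return channel.getOption(StandardSocketOptions.SO_KEEPALIVE);
        }
    }

    // Reads the default receive buffer size; the value is platform-dependent.
    static int defaultReceiveBufferSize() throws IOException {
        try (SocketChannel channel = SocketChannel.open()) {
            return channel.getOption(StandardSocketOptions.SO_RCVBUF);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("SO_KEEPALIVE: " + keepAliveAfterEnable());
        System.out.println("SO_RCVBUF: " + defaultReceiveBufferSize());
    }
}
```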

    IV. A complete example

    package com.zongmu.rpa.probes;
    
    import android.text.TextUtils;
    
    import com.google.protobuf.InvalidProtocolBufferException;
    import com.google.protobuf.Message;
    import com.google.protobuf.RpcCallback;
    import com.zongmu.rpa.constant.Constants;
    import com.zongmu.rpa.model.ProtoPackage;
    import com.zongmu.rpa.probes.callback.TcpConnCallback;
    import com.zongmu.rpa.utils.ByteUtils;
    import com.zongmu.rpa.utils.LogUtil;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    public class TcpProbes implements ZmProbes {
        private static final String TAG = "TcpProbes";
        private Selector mSelector;
        private SocketChannel mSocketChannel;
        private String mRemoteIp;
        private int mRecvLength = -1;
        private TcpConnCallback mCallback;
        private ProtoPackage mProtoPackage;
    
        public void setTcpConnCallback(TcpConnCallback callback){
            mCallback = callback;
        }
    
        public Selector getSelector() {
            return mSelector;
        }
    
        public String getRemoteIp() {
            return mRemoteIp;
        }
    
        public void setRemoteIp(String ip) {
            this.mRemoteIp = ip;
        }
    
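        @Override
        public void sendData(final byte[] pkg) {
            if (mSocketChannel != null && mSocketChannel.isOpen() && mSocketChannel.isConnected()) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // In non-blocking mode a single write() may send only part of the buffer
                            ByteBuffer buffer = ByteBuffer.wrap(pkg);
                            while (buffer.hasRemaining()) {
                                mSocketChannel.write(buffer);
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        }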
    
        @Override
        public int receData(Message responsePrototype,RpcCallback<Message>done) {
            if (mSocketChannel == null || !mSocketChannel.isOpen()|| !mSocketChannel.isConnected()){
                return Constants.SOCKET_NOT_CONNECT;
            }
            int ret = -1;
            try {
                // select() returns 0 when the timeout expires and no channel is ready
                ret = mSelector.select(3000);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (ret <= 0) {
                return Constants.RECEIVE_DATA_TIMEOUT;
            }
            byte[] recvData = new byte[512];
            Iterator<SelectionKey> iterator = mSelector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                // Remove the handled key; otherwise later select() calls will not report it again
                iterator.remove();
                if (key.isValid()) {
                    if (key.isReadable()) {
                        LogUtil.e(TAG, "receive data");
                        try {
                            mRecvLength = mSocketChannel.read(ByteBuffer.wrap(recvData));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        LogUtil.e(TAG, "Received data size: " + mRecvLength);
                    }
                } else {
                    LogUtil.e(TAG, "selection key is no longer valid");
                    mRecvLength = -1;
                }
            }
            // Nothing was read (read error, end of stream, or invalid key): stop before parsing
            if (mRecvLength <= 0) {
                mRecvLength = -1;
                return Constants.RECEIVE_DATA_ERROR;
            }
            
            if(mProtoPackage == null){
                mProtoPackage = new ProtoPackage();
            }
            String raw_data = new String(recvData, 0, mRecvLength);
            LogUtil.e(TAG, "RawPackage:" + raw_data);
            LogUtil.e(TAG, "RawPackage:" + ByteUtils.bytes2HexString(ByteUtils.getSubArrays(recvData,0,mRecvLength)));
            String data = new String(recvData, 0, 13);
            LogUtil.e(TAG, "data:"+data);
            if (TextUtils.equals("ZongMuService", data)) {
                LogUtil.e(TAG, "receive server response");
                mProtoPackage.setPackageBuffer(recvData);
                mProtoPackage.parsePackage();
                
                if(mRecvLength != mProtoPackage.getPackageSize()){
                    LogUtil.e(TAG,"data length parse Error");
                    recvData = null;
                    mRecvLength = -1;
                    return -3;
                }
    
                byte[] serviceParamPackage = mProtoPackage.getParamPackage();
                try {
                    responsePrototype = responsePrototype.getParserForType().parseFrom(serviceParamPackage);
                    done.run(responsePrototype);
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
                recvData = null;
                mRecvLength = -1;
                LogUtil.d(TAG,"callback finish");
            }else {
                LogUtil.d(TAG,"receive error");
                recvData = null;
                mRecvLength = -1;
                return Constants.RECEIVE_DATA_ERROR;
            }
            return Constants.RECEIVE_DATA_SUCCESS;
        }
    
        @Override
        public void start() {
            if(TextUtils.isEmpty(mRemoteIp)){
                LogUtil.i( TAG,"remote ip is null");
                return;
            }
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        mSelector = Selector.open();
                        InetSocketAddress inetSocketAddress = new InetSocketAddress(mRemoteIp, Constants.TCP_PORT);
                        mSocketChannel = SocketChannel.open(inetSocketAddress);
                        mSocketChannel.configureBlocking(false);
                        mSocketChannel.register(mSelector, SelectionKey.OP_READ);
                        LogUtil.e(TAG, "tcp socket connect success");
                        mCallback.onTcpConnect(Constants.SOCKET_CONN_SUCCESS);
                    } catch (IOException e) {
                        e.printStackTrace();
                        LogUtil.e(TAG, "tcp socket connect failed");
                        mCallback.onTcpConnect(Constants.SOCKET_CONN_FAILED);
                    }
                }
            }).start();
        }
    
        @Override
        public void stop() {
            try {
                if (mSelector != null) {
                    mSelector.close();
                }
                // finishConnect() is only meaningful for a pending connect, so it is not called here
                if (mSocketChannel != null && mSocketChannel.isOpen()) {
                    mSocketChannel.close();
                    LogUtil.e(TAG, "tcp socket closed");
                }
            } catch (IOException e) {
                e.printStackTrace();
                LogUtil.e(TAG, "tcp socket close failed: " + e.toString());
            }
        }
    
    }
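The receive path of the class above depends on project-specific types (ProtoPackage, LogUtil, Constants), so it cannot be run on its own. As a rough illustration of just the selector part, here is a minimal standalone sketch: wait with a timeout, remove each handled key, then read. The class name, method names, and return-value convention (0 for timeout) are assumptions made for this example, and a loopback peer stands in for the real server.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

class SelectorReceiveDemo {

    // Returns the number of bytes read, -1 at end of stream, or 0 if the timeout expired.
    static int receiveWithTimeout(Selector selector, SocketChannel channel,
                                  ByteBuffer buffer, long timeoutMillis) throws IOException {
        if (selector.select(timeoutMillis) == 0) {
            return 0; // timeout: no channel became ready
        }
        int read = 0;
        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            iterator.remove(); // handled keys must be removed from the selected set
            if (key.isValid() && key.isReadable()) {
                read = channel.read(buffer);
            }
        }
        return read;
    }

    // Returns { result while the peer is silent, result after the peer sent 4 bytes }.
    static int[] demo() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {
                client.configureBlocking(false);
                client.register(selector, SelectionKey.OP_READ);
                ByteBuffer buffer = ByteBuffer.allocate(512);

                int idle = receiveWithTimeout(selector, client, buffer, 200);
                peer.write(ByteBuffer.wrap("ping".getBytes(StandardCharsets.US_ASCII)));
                int received = receiveWithTimeout(selector, client, buffer, 3000);
                return new int[] {idle, received};
            }
        }
    }

    public static void main(String[] args) throws IOException {
        int[] result = demo();
        System.out.println("idle=" + result[0] + ", received=" + result[1]);
    }
}
```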
    
    
