美文网首页首页推荐程序员
Google Protobuf 协议+Socket实现异步登录

Google Protobuf 协议+Socket实现异步登录

作者: 传达室马大爷 | 来源:发表于2017-04-06 11:35 被阅读1492次

    背景:公司登录基础服务是C开发,业务是Java开发,需要Java调用C的服务验证登录、获取登录信息
    实现方式:

    1. 交互通过发送Socket通讯,Google Protobuf 协议,发送心跳保持TCP连接
    2. 自定义ExecutorCompletionService,实现一个包含Map<Integer, BlockingQueue<Future<V>>>
      https://github.com/shawntime/shawn-common-utils/tree/master/src/main/java/com/shawntime/common/socket
    Google Protobuf 协议定义
    • 定义.proto文件:头文件(head_outer_class.proto)、登录(login_logic.proto)、心跳(login_proxy.proto)
      1)字段类型:bool,int32,float,double,string
      2)支持消息嵌套
      3)支持enum
      4)索引号要按顺序指定
      5)字段前缀:required:必须的;optional:可选的;repeated:可以重复的
    package qiqi;
    
    enum ENUM_HEAD_TYEP
    {
        enum_cs_head_type_cs = 0x1;         //0x1表示CSHead
        enum_cs_head_type_sc = 0x2;         //0x2表示SCHead
        enum_cs_head_type_ss = 0x3;         //0x3表示SSHead
    };
    
    enum ENUM_CS_HEAD_COMMAND
    {
        enum_cs_head_cmd_medal = 0x1;           //勋章系统
        enum_cs_head_cmd_login = 0x2;           //登录系统
        enum_cs_head_cmd_guild = 0x3;           //公会推荐
        enum_cs_head_cmd_user_info = 0x4;       //用户信息
    };
    
    enum ENUM_SC_HEAD_COMMAND
    {
        enum_sc_head_cmd_medal = 0x1;           //勋章系统
        enum_sc_head_cmd_login = 0x2;           //登录系统
        enum_sc_head_cmd_lottery = 0x3;     //博彩系统
        enum_sc_head_cmd_guild=0x4; // 公会推荐系统
    };
    enum ENUM_SS_HEAD_COMMAND
    {
        enum_ss_head_cmd_medal  = 0x1;          //勋章系统
        enum_ss_head_cmd_TVwall = 0x2;          //电视墙系统
        enum_ss_head_cmd_login  = 0x3;          //登录系统
        enum_ss_head_cmd_guild  = 0x4;          //公会推荐系统
        enum_ss_head_cmd_user_info  = 0x5;      //用户信息
        enum_ss_head_cmd_keep_alive  = 0x1001;      //心跳
    };
    message Head
    {
        optional uint32 uint32_head_type = 1;       // Head类型, 见ENUM_HEAD_TYEP定义
        optional LoginSig msg_login_sig = 2;        // 登录态
        optional uint32 uint32_paint_flag = 3;      // 染色字段
    
        optional CSHead msg_cs_head = 4;            // CS协议头部
        optional SCHead msg_sc_head = 5;
        optional SSHead msg_ss_head = 6;
    };
    
    message LoginSig
    {
        optional uint32 uint32_type = 1;        // 登录态type
        optional bytes bytes_sig = 2;           // key
    }
    
    message CSHead          //Client request --> Server, Server response --> Client
    {
        optional uint64 uint64_uid = 1;                     // openid
        optional uint32 uint32_command = 2;                 // 消息协议族编号,区分业务系统,见定义(ENUM_CS_HEAD_COMMAND)
        optional uint32 uint32_seq = 3;                     // 对单个用户的seq
        optional uint32 uint32_version = 4;                 // 客户端版本号
        optional uint32 uint32_retry_times = 5;             // 填正确的重试次数
        optional uint32 uint32_client_type = 6;             // 登录时要带上客户端类型,复用此字段即可
        optional uint32 uint32_pubno = 7;                   // 等felix分配
        optional uint32 uint32_localid = 8;                 // 地区ID
        optional uint32 uint32_timezone = 9;                // 时区
        optional fixed32 uint32_client_ip = 10;             // 客户端IP
        optional uint32 uint32_client_port = 11;            // 客户端Port
        optional fixed32 uint32_conn_ip = 12;               // ConnIP
        optional uint32 uint32_conn_port = 13;              // ConnPort
        optional fixed32 uint32_interface_ip = 14;          // 接口机器IP
        optional uint32 uint32_interface_port = 15;         // 接口机Port
        optional fixed32 uint32_actual_ip = 16;             // 客户端真实IP
        optional uint32 uint32_flag = 17;
    
        optional fixed32 uint32_timestamp = 18;             // 发包的当前时间戳,用于延时时间上报统计,Server需要原封不动带回Conn
        optional uint32 uint32_subcmd = 19;                 // 子命令号
        optional uint32 uint32_result = 20;                 // 回包给客户端的返回值
        // 成功:0x0-0x3f(优先用0x0),conn失败:0x40-0x7f,
        // 系统失败:0x80-0xbf,业务失败:0xc0-0xff,其他取值未定义
        optional uint64 uint64_session_id = 21;             // 如果存在,后端Server需要原样返回!!
        optional string str_client_ip = 22;                 //client 公网IP ,由代理填写
    }
    
    message SCHead              //Push broadcast
    {
        optional uint64 uint64_from_uid = 1;                // 来源用户,
        optional uint64 uint64_to_uid = 2;                  // 目的用户,
        optional uint32 uint32_type = 3;                    // 预留push类型, 0表示点对点push,1表示对uint64_from_uid所在房间广播
        optional uint32 uint32_command = 4;                 // 消息协议族编号,区分业务系统,见定义(ENUM_SC_HEAD_COMMAND)
        optional uint32 uint32_timestamp = 5;
        optional uint32 uint32_seq = 6;
        optional uint32 uint32_source_type = 7;                         //来源服务器类型(为兼容windows服务器)
        optional uint64 uint64_from_roomid = 8;             // 来源用户所在房间
        optional uint64 uint64_to_roomid = 9;               // 目的用户所在房间,
    
    }
    
    message SSHead              //Server <--> Server
    {
        optional uint64 uint64_jobid = 1;
        optional uint64 uint64_seq = 2;
        optional uint32 uint32_command = 3;                 // 消息协议族编号,区分业务系统,见定义(ENUM_SS_HEAD_COMMAND)
        optional uint32 uint32_server_id = 4;               // 标记消息来源 
    }
    
    package qiqi.login_logic;
    
    enum ENUM_LOGIN_LOGIC_CMD
    {
        emum_login_logic_tencent_verify = 0x1;
    };
    
    message ReqBody{
        optional uint32     uint32_cmd                            = 1;  //  ENUM_LOGIN_LOGIC_CMD
        optional Reqlogic_Tencent_Verify msg_subcmd0x01_req_tencent_verify = 2; 
    };
    
    message RspBody{
        optional uint32 uint32_cmd                      = 1;                //  ENUM_LOGIN_LOGIC_CMD
        optional int32  int32_result                    = 2;                //  0 success
        optional bytes  bytes_error                     = 3;
        optional Rsplogic_Tencent_Verify msg_subcmd0x1_rsp_tencent_verify = 4;
    };
    
    message Reqlogic_Tencent_Verify{
        optional string str_openid = 1;
        optional string str_openkey = 2;
        optional uint32 uint32_logmode = 3;  //登陆模式 0正常登陆  1 KEY 2 续期
        optional string str_pf = 4;
        optional string str_userip = 5;
        optional string bytes_PcID = 6;
        optional uint32 uint32_version = 7; 
    
        optional uint32 uint32_StartSource = 8; 
        optional int64  int64_sourcetype = 9;
    };
    
    message Rsplogic_Tencent_Verify{
        optional string str_openid = 1;
        optional bytes  bytes_userkey = 2;
        optional int64  int64_qiqi_id = 3;
        optional string str_username = 4;
        optional int32  int32_user_state = 5; // 0:old user , 1:new user
    };
    package qiqi.login_porxy;
    
    enum ENUM_LOGIN_PROXY_CMD
    {
        emum_login_proxy_keep_alive = 0x1001;
    };
    
    message ReqBody{
        optional uint32     uint32_cmd                            = 1;  //  ENUM_LOGIN_LOGIC_CMD
        optional Req_keep_alive msg_subcmd0x1001_req_keep_alive = 0x1001; 
    };
    
    message RspBody{
        optional Rsp_keep_alive msg_subcmd0x1001_rsp_keep_alive = 0x1001; 
    };
    message Req_keep_alive{
        optional uint32 uint32_ts = 1;
    }
    message Rsp_keep_alive{
        optional uint32 uint32_ts = 1;
    }
    
    • 用 protoc.exe 生成head_outer_class.proto和login_logic.proto的协议文件
    D:\protoc-2.6.1-win32\protoc-2.6.1-win32>protoc.exe ./head_outer_class.proto --java_out=./
    D:\protoc-2.6.1-win32\protoc-2.6.1-win32>protoc.exe ./login_logic.proto --java_out=./
    D:\protoc-2.6.1-win32\protoc-2.6.1-win32>protoc.exe ./login_proxy.proto --java_out=./
    
    • 实现代码
    package com.shawntime.common.socket;
    
    import java.io.OutputStream;
    
    /**
     * protobuf请求封装抽象类
     * @author admin
     *
     */
    public abstract class RequestHandle {
        private int seqId;
        public final void send(OutputStream outputStream) throws Exception {
            
            byte[] data = pack();
            
            outputStream.write(data);
            outputStream.flush();
        }
        
        /**
         * 将输入的数据打包成TCP通讯包,返回值: 打包好的TCP数据
         * @throws Exception 
         */
        private final byte[] pack() throws Exception {
            
            byte[] headBuf = packHead();
            byte[] bodyBuf = packBody();
            
            //生成sendBuf
            byte[] sendBuf = new byte[headBuf.length + bodyBuf.length + 10];
            sendBuf[0] = (byte) '(';
            writeInt(sendBuf, headBuf.length, 1);
            writeInt(sendBuf, bodyBuf.length, 5);
            System.arraycopy(headBuf, 0, sendBuf, 9, headBuf.length);
            System.arraycopy(bodyBuf, 0, sendBuf, headBuf.length + 9, bodyBuf.length);
            sendBuf[sendBuf.length - 1] = (byte) ')';
            return sendBuf;
        }
        
        //按小端模式写入int
        private final void writeInt(byte[] writeBuffer, int v, int pos) {
            writeBuffer[pos] = (byte) (v >>> 24);
            writeBuffer[pos + 1] = (byte) (v >>> 16);
            writeBuffer[pos + 2] = (byte) (v >>> 8);
            writeBuffer[pos + 3] = (byte) (v >>> 0);
        }
        
        protected abstract byte[] packBody();
        
        /**
         * 请求头内容
         * @return
         */
        protected abstract byte[] packHead();
    
        public int getSeqId() {
            return seqId;
        }
    
        public final void setSeqId(int seqId) {
            this.seqId = seqId;
        }
        
    }
    
    
    package com.shawntime.common.socket;
    
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;
    
    import com.shawntime.common.socket.pool.ProtoPack;
    
    /**
     * protobuf响应处理封装抽象类
     * 
     * @author admin
     *
     */
    public abstract class ResponseHandle<T> {
        
        private String ip;
        
        private int port;
        
        private int seqId;
        
        private final byte[] readIn(InputStream inputStream) throws IOException {
            int readBytes = 0;  
            byte[] buffer = new byte[1024];//1024可改成任何需要的值  
            int len = buffer.length;
            while (readBytes < len) {
                
                int read = inputStream.read(buffer, readBytes, len - readBytes);  
                
                //判断是不是读到了数据流的末尾 ,防止出现死循环。  
                if (read == -1 || read < (len - readBytes)) {
                    readBytes += read; 
                    break;
                }
                
                if(read == (len - readBytes)) {
                    byte[] tmpBuffer = new byte[len * 2];
                    System.arraycopy(buffer, 0, tmpBuffer, 0, buffer.length);
                    buffer = tmpBuffer;
                    len = buffer.length;
                }
              
                readBytes += read;
            }
            
            byte[] endodedData = new byte[readBytes];
            System.arraycopy(buffer, 0, endodedData, 0, readBytes);
            
            return endodedData;
        }
    
        public final ProtoPack unpack(InputStream inputStream) throws IOException {
            byte[] data = readIn(inputStream);
            byte[] cache = new byte[1024 * 16];
            int end = 0;
            
            System.arraycopy(data, 0, cache, end, data.length);
            end += data.length;
            while (end > 0) {
                if (end < 9) {
                    return null;
                }
                try {
                    int headLen = readInt(cache, 1);
                    int bodyLen = readInt(cache, 5);
                    if (end < 10 + headLen + bodyLen) {
                        return null;
                    }
    
                    byte[] headBuf = new byte[headLen];
                    byte[] bodyBuf = new byte[bodyLen];
                    System.arraycopy(cache, 9, headBuf, 0, headLen);
                    System.arraycopy(cache, 9 + headLen, bodyBuf, 0, bodyLen);
    
                    //数据前移
                    int frameLen = 10 + headLen + bodyLen;
                    int newEnd = end - frameLen;
                    if (newEnd > 0) {
                        System.arraycopy(cache, frameLen, cache, 0, newEnd);
                    }
                    end = newEnd;
                    
                    seqId = encodeHeaderReqId(headBuf);
                    if(seqId > 0) {
                        ProtoPack protoPack = new ProtoPack();
                        protoPack.setHeader(headBuf);
                        protoPack.setBody(bodyBuf);
                        protoPack.setSeqId(seqId);
                        return protoPack;
                    }
                }
                catch (IOException e) {
                    e.printStackTrace();
                    return null;
                }
            }
            return null;
        }
        
        //按小端模式读取一个int
        private final int readInt(byte[] readBuffer, int pos) throws IOException {
            if (readBuffer.length < pos + 4) {
                throw new EOFException();
            }
            return (((int) (readBuffer[pos] & 255) << 24) + ((readBuffer[pos + 1] & 255) << 16) + ((readBuffer[pos + 2] & 255) << 8) + ((readBuffer[pos + 3] & 255) << 0));
        }
        
        public int encodeHeaderReqId(byte[] headBuf) {
            
    //      try {
    //          HeadOuterClass.Head header = HeadOuterClass.Head.parseFrom(headBuf);
    //          int seqId = header.getMsgCsHead().getUint32Seq();
    //          return seqId;
    //      } catch (InvalidProtocolBufferException e) {
    //          e.printStackTrace();
    //      }
    //
            return -1;
            
        }
        
        public abstract void encode(byte[] headBuf, byte[] bodyBuf);
        
        public abstract T get();
    
        public String getIp() {
            return ip;
        }
    
        public void setIp(String ip) {
            this.ip = ip;
        }
    
        public int getPort() {
            return port;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        public int getSeqId() {
            return seqId;
        }
    
        public void setSeqId(int seqId) {
            this.seqId = seqId;
        }
        
    }
    
    package com.shawntime.common.socket.pool;
    
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.apache.log4j.Logger;
    
    /**
     * Socket连接创建工厂
     * @author shma
     *
     */
    public class SocketConnectionFactory extends BasePooledObjectFactory<Socket> {
    
        private static final Logger logger = Logger.getLogger(SocketConnectionFactory.class);
        
        private List<InetSocketAddress> socketAddress = null;
        
        private final AtomicInteger atomicIntCount;
        
        //自增,遍历标记位
        private final AtomicLong atomicLongTail;
        
        public SocketConnectionFactory(String hosts) {
            
            socketAddress = new ArrayList<InetSocketAddress>();
            
            String[] hostsAdd = hosts.split(";");
            if(hostsAdd.length > 0) {
                for(String tmpHost : hostsAdd) {
                    String[] dataStrings = tmpHost.split(":");
                    InetSocketAddress address = new InetSocketAddress(dataStrings[0], Integer.parseInt(dataStrings[1]));
                    socketAddress.add(address);
                }
            }
            
            atomicIntCount = new AtomicInteger(0);
            atomicLongTail = new AtomicLong(0);
        }
        
        private InetSocketAddress getSocketAddress() {
            int index = (int) (atomicLongTail.getAndIncrement() % socketAddress.size());
            logger.info("创建Socket>>>address:" + socketAddress.get(index).getHostName() + ", counter:" + atomicIntCount.incrementAndGet());
            
            return socketAddress.get(index);
        }
    
        @Override
        public void destroyObject(PooledObject<Socket> p) throws Exception {
            Socket socket = p.getObject();
            logger.info("销毁Socket>>>socket:" + socket + ", counter:" + atomicIntCount.decrementAndGet());
            if(socket != null) {
                socket.close();
            }
        }
    
        @Override
        public boolean validateObject(PooledObject<Socket> p) {
            
            Socket socket = p.getObject();
            if(socket != null) {
                if(!socket.isConnected()) { 
                    return false;
                }
                if(socket.isClosed()) {
                    return false;
                }
                
    //            LoginProxyRequest request = new LoginProxyRequest();
    //            LoginProxyResponse response = new LoginProxyResponse();
    //            SocketSession socketSession = PassportCommunication.getSocketSession();
    //            Optional<Boolean> optional = socketSession.send(request, response, socket);
                
                boolean state = false;
                
    //            if(optional.isPresent()) {
    //              state = optional.get();
    //            }
                
                logger.info("验证socket心跳>>>socket:" + socket + ", state:" + state);
                
                return state;
            }
            
            return false; 
        }
    
        @Override
        public Socket create() throws Exception {
            Socket socket = new Socket();
            socket.connect(getSocketAddress(), 30000);
            socket.setSoTimeout(10000);
            return socket;
        }
    
        @Override
        public PooledObject<Socket> wrap(Socket obj) {
            return new DefaultPooledObject<Socket>(obj);
        }   
        
    }
    
    
    package com.shawntime.common.socket.pool;
    
    
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import com.shawntime.common.socket.RequestHandle;
    import com.shawntime.common.socket.ResponseHandle;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.apache.log4j.Logger;
    
    import com.google.common.base.Optional;
    
    /**
     * 连接会话
     * @author shma
     *
     */
    public class SocketSession {
    
        private final GenericObjectPool<Socket> pool;
        private static final Logger logger = Logger.getLogger(SocketSession.class);
        
        private static final ExecutorCompletionService<ProtoPack> COMPLETION_SERVICE;
        
        static {
            ExecutorService executorService = Executors.newCachedThreadPool();
            COMPLETION_SERVICE = new ExecutorCompletionService<ProtoPack>(executorService);
        }
        
        public SocketSession(GenericObjectPoolConfig config, String hosts) {
            SocketConnectionFactory factory = new SocketConnectionFactory(hosts); 
            pool = new GenericObjectPool<Socket>(factory, config);
        }
        
        public Socket getConnection() throws Exception {
            Socket socket = pool.borrowObject();
            return socket;
        }
        
        /**
         * 
         * 回收连接socket
         *
         * @author shma
         * @param socket
         * @since JDK 1.6
         */
        public void releaseConnection(Socket socket){  
            try {
                pool.returnObject(socket);
            } catch(Throwable e) {  
                if(socket != null){  
                    try{  
                        socket.close();
                    }catch(Exception ex){  
                        e.printStackTrace();
                    }  
                }  
            }  
        }
        
        public <T> Optional<T> send(final RequestHandle request, final ResponseHandle<T> response) {
             
            //获取socket
            Socket socket = null;
            try {
                socket = getConnection();
                return send(request, response, socket);
            } catch (Exception e) {
                logger.error("Get socket error, msg : " + e.getMessage());
                e.printStackTrace();
            } finally {
                if(socket != null) {
                    releaseConnection(socket);
                }
            }
            
            return Optional.fromNullable(null);
        }
        
        public <T> Optional<T> send(final RequestHandle request, final ResponseHandle<T> response, Socket socket) {
    
            final InputStream inputStream;
            final OutputStream outputStream;
            int incrementId = getDid();
            try {
                request.setSeqId(incrementId);
                //增加ip地址 端口信息
                response.setIp(socket.getInetAddress().getHostAddress());
                response.setPort(socket.getPort());
                inputStream = socket.getInputStream();
                outputStream = socket.getOutputStream();
                
                COMPLETION_SERVICE.submit(new Callable<ProtoPack>() {
                    
                    @Override
                    public ProtoPack call() throws Exception {
                        request.send(outputStream);
                        ProtoPack protoPack = response.unpack(inputStream);
                        System.out.println("callback seqid : " + protoPack.getSeqId());
                        return protoPack;
                    }
                }, incrementId);
                
                Future<ProtoPack> future = COMPLETION_SERVICE.poll(3, TimeUnit.SECONDS, incrementId);
                if(future != null) {
                    ProtoPack pack = future.get();
                    if(pack != null) {
                        response.encode(pack.getHeader(), pack.getBody());
                        return Optional.fromNullable(response.get());
                    }
                }
                
            } catch(RuntimeException e) {
                throw e;
            } catch (Exception e) {
                logger.error("SocketSession error, msg : " + e.getMessage());
                e.printStackTrace();
            } finally {
                COMPLETION_SERVICE.remove(incrementId);
            }
            
            return Optional.fromNullable(null);
        }
        
        private final AtomicInteger counter = new AtomicInteger(1);
        
        private final Lock lock = new ReentrantLock();
        
        private int getDid() {
    
            if(counter.get() == Integer.MAX_VALUE) {
                lock.lock();
                try {
                    if(counter.get() == Integer.MAX_VALUE) {
                        counter.set(1);
                    }
                } finally {
                    lock.unlock();
                }
            }
            
            int did = counter.getAndIncrement();
            
            return did;
            
        }
    }
    
    
    package com.shawntime.common.socket.pool;
    
    public class ProtoPack {
    private int seqId;
    private byte[] header;
    private byte[] body;
    public int getSeqId() {
        return seqId;
    }
    public void setSeqId(int seqId) {
        this.seqId = seqId;
    }
    public byte[] getHeader() {
        return header;
    }
    public void setHeader(byte[] header) {
        this.header = header;
    }
    public byte[] getBody() {
        return body;
    }
    public void setBody(byte[] body) {
        this.body = body;
    }
    
    }
    
    
    package com.shawntime.common.socket.pool;
    
    import java.util.concurrent.ExecutionException;
    
    
    /**
     * Exception thrown when attempting to retrieve the result of a task
     * that aborted by throwing an exception. This exception can be
     * inspected using the {@link #getCause()} method.
     *
     * @see Future
     * @since 1.5
     * @author Doug Lea
     */
    public class MyExecutionException extends ExecutionException {
        
    }
    
    package com.shawntime.common.socket.pool;
    
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.TimeUnit;
    
    import com.google.common.collect.Maps;
    
    public class ExecutorCompletionService<V> implements CompletionService<V> {
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final Map<Integer, BlockingQueue<Future<V>>> completionQueueMap;
    
        private class QueueingFuture extends FutureTask<V> {
            QueueingFuture(RunnableFuture<V> task, int did) {
                super(task, null);
                this.task = task;
                this.did = did;
            }
            protected void done() { 
                BlockingQueue<Future<V>> blockingQueue = ExecutorCompletionService.this.completionQueueMap.get(did);
                blockingQueue.add(task);
            }
            private final Future<V> task;
            private final int did;
        }
    
        private RunnableFuture<V> newTaskFor(Callable<V> task) {
            if (aes == null)
                return new FutureTask<V>(task);
            else
                return aes.newTaskFor(task);
        }
    
        private RunnableFuture<V> newTaskFor(Runnable task, V result) {
            if (aes == null)
                return new FutureTask<V>(task, result);
            else
                return aes.newTaskFor(task, result);
        }
    
        /**
         * Creates an ExecutorCompletionService using the supplied
         * executor for base task execution and a
         * {@link LinkedBlockingQueue} as a completion queue.
         *
         * @param executor the executor to use
         * @throws NullPointerException if executor is {@code null}
         */
        public ExecutorCompletionService(Executor executor) {
            if (executor == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
            this.completionQueueMap = Maps.newConcurrentMap();
        }
    
        /**
         * Creates an ExecutorCompletionService using the supplied
         * executor for base task execution and the supplied queue as its
         * completion queue.
         *
         * @param executor the executor to use
         * @param completionQueue the queue to use as the completion queue
         *        normally one dedicated for use by this service. This
         *        queue is treated as unbounded -- failed attempted
         *        {@code Queue.add} operations for completed taskes cause
         *        them not to be retrievable.
         * @throws NullPointerException if executor or completionQueue are {@code null}
         */
        public ExecutorCompletionService(Executor executor,
                                         BlockingQueue<Future<V>> completionQueue) {
            if (executor == null || completionQueue == null)
                throw new NullPointerException();
            this.executor = executor;
            this.aes = (executor instanceof AbstractExecutorService) ?
                (AbstractExecutorService) executor : null;
                this.completionQueueMap = Maps.newConcurrentMap();
        }
    
        public Future<V> submit(Callable<V> task, int did) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            BlockingQueue<Future<V>> blockingQueue = new LinkedBlockingQueue<Future<V>>(1); 
            completionQueueMap.put(did, blockingQueue);
            executor.execute(new QueueingFuture(f, did));
            return f;
        }
    
        public Future<V> submit(Runnable task, V result, int did) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            BlockingQueue<Future<V>> blockingQueue = new LinkedBlockingQueue<Future<V>>(1); 
            completionQueueMap.put(did, blockingQueue);
            executor.execute(new QueueingFuture(f, did));
            return f;
        }
    
        public Future<V> take(int did) throws InterruptedException {
            return completionQueueMap.get(did).take();
        }
    
        public Future<V> poll(int did) {
            return completionQueueMap.get(did).poll();
        }
    
        public Future<V> poll(long timeout, TimeUnit unit, int did)
                throws InterruptedException {
            return completionQueueMap.get(did).poll(timeout, unit);
        }
        
        public void remove(int did) {
            completionQueueMap.remove(did);
        }
    
    }
    
    package com.shawntime.common.socket.pool;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * A service that decouples the production of new asynchronous tasks
     * from the consumption of the results of completed tasks.  Producers
     * <tt>submit</tt> tasks for execution. Consumers <tt>take</tt>
     * completed tasks and process their results in the order they
     * complete.  A <tt>CompletionService</tt> can for example be used to
     * manage asynchronous IO, in which tasks that perform reads are
     * submitted in one part of a program or system, and then acted upon
     * in a different part of the program when the reads complete,
     * possibly in a different order than they were requested.
     *
     * <p>Typically, a <tt>CompletionService</tt> relies on a separate
     * {@link Executor} to actually execute the tasks, in which case the
     * <tt>CompletionService</tt> only manages an internal completion
     * queue. The {@link ExecutorCompletionService} class provides an
     * implementation of this approach.
     *
     * <p>Memory consistency effects: Actions in a thread prior to
     * submitting a task to a {@code CompletionService}
     * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
     * actions taken by that task, which in turn <i>happen-before</i>
     * actions following a successful return from the corresponding {@code take()}.
     *
     */
    public interface CompletionService<V> {
        /**
         * Submits a value-returning task for execution and returns a Future
         * representing the pending results of the task.  Upon completion,
         * this task may be taken or polled.
         *
         * @param task the task to submit
         * @return a Future representing pending completion of the task
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        Future<V> submit(Callable<V> task, int did);
    
        /**
         * Submits a Runnable task for execution and returns a Future
         * representing that task.  Upon completion, this task may be
         * taken or polled.
         *
         * @param task the task to submit
         * @param result the result to return upon successful completion
         * @return a Future representing pending completion of the task,
         *         and whose <tt>get()</tt> method will return the given
         *         result value upon completion
         * @throws RejectedExecutionException if the task cannot be
         *         scheduled for execution
         * @throws NullPointerException if the task is null
         */
        Future<V> submit(Runnable task, V result, int did);
    
        /**
         * Retrieves and removes the Future representing the next
         * completed task, waiting if none are yet present.
         *
         * @return the Future representing the next completed task
         * @throws InterruptedException if interrupted while waiting
         */
        Future<V> take(int did) throws InterruptedException;
    
    
        /**
         * Retrieves and removes the Future representing the next
         * completed task or <tt>null</tt> if none are present.
         *
         * @return the Future representing the next completed task, or
         *         <tt>null</tt> if none are present
         */
        Future<V> poll(int did);
    
        /**
         * Retrieves and removes the Future representing the next
         * completed task, waiting if necessary up to the specified wait
         * time if none are yet present.
         *
         * @param timeout how long to wait before giving up, in units of
         *        <tt>unit</tt>
         * @param unit a <tt>TimeUnit</tt> determining how to interpret the
         *        <tt>timeout</tt> parameter
         * @return the Future representing the next completed task or
         *         <tt>null</tt> if the specified waiting time elapses
         *         before one is present
         * @throws InterruptedException if interrupted while waiting
         */
        Future<V> poll(long timeout, TimeUnit unit, int did) throws InterruptedException;
    }
    
    package com.shawntime.common.socket.pool;
    
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RunnableFuture;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Provides default implementations of {@link ExecutorService}
     * execution methods. This class implements the <tt>submit</tt>,
     * <tt>invokeAny</tt> and <tt>invokeAll</tt> methods using a
     * {@link RunnableFuture} returned by <tt>newTaskFor</tt>, which defaults
     * to the {@link FutureTask} class provided in this package.  For example,
     * the implementation of <tt>submit(Runnable)</tt> creates an
     * associated <tt>RunnableFuture</tt> that is executed and
     * returned. Subclasses may override the <tt>newTaskFor</tt> methods
     * to return <tt>RunnableFuture</tt> implementations other than
     * <tt>FutureTask</tt>.
     *
     * <p> <b>Extension example</b>. Here is a sketch of a class
     * that customizes {@link ThreadPoolExecutor} to use
     * a <tt>CustomTask</tt> class instead of the default <tt>FutureTask</tt>:
     *  <pre> {@code
     * public class CustomThreadPoolExecutor extends ThreadPoolExecutor {
     *
     *   static class CustomTask<V> implements RunnableFuture<V> {...}
     *
     *   protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
     *       return new CustomTask<V>(c);
     *   }
     *   protected <V> RunnableFuture<V> newTaskFor(Runnable r, V v) {
     *       return new CustomTask<V>(r, v);
     *   }
     *   // ... add constructors, etc.
     * }}</pre>
     *
     * @since 1.5
     * @author Doug Lea
     */
    public abstract class AbstractExecutorService implements ExecutorService {
    
        /**
         * Returns a <tt>RunnableFuture</tt> for the given runnable and default
         * value.
         *
         * @param runnable the runnable task being wrapped
         * @param value the default value for the returned future
         * @return a <tt>RunnableFuture</tt> which when run will run the
         * underlying runnable and which, as a <tt>Future</tt>, will yield
         * the given value as its result and provide for cancellation of
         * the underlying task.
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        /**
         * Returns a <tt>RunnableFuture</tt> for the given callable task.
         *
         * @param callable the callable task being wrapped
         * @return a <tt>RunnableFuture</tt> which when run will call the
         * underlying callable and which, as a <tt>Future</tt>, will yield
         * the callable's result as its result and provide for
         * cancellation of the underlying task.
         * @since 1.6
         */
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    
        /**
         * the main mechanics of invokeAny.
         */
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                                boolean timed, long nanos, int did)
            throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
            ExecutorCompletionService<T> ecs =
                new ExecutorCompletionService<T>(this);
    
            // For efficiency, especially in executors with limited
            // parallelism, check to see if previously submitted tasks are
            // done before submitting more of them. This interleaving
            // plus the exception mechanics account for messiness of main
            // loop.
    
            try {
                // Record exceptions so that if we fail to obtain any
                // result, we can throw the last exception we got.
                ExecutionException ee = null;
                long lastTime = timed ? System.nanoTime() : 0;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // Start one task for sure; the rest incrementally
                futures.add(ecs.submit(it.next(), did));
                --ntasks;
                int active = 1;
    
                for (;;) {
                    Future<T> f = ecs.poll(did);
                    if (f == null) {
                        if (ntasks > 0) {
                            --ntasks;
                            futures.add(ecs.submit(it.next(), did));
                            ++active;
                        }
                        else if (active == 0)
                            break;
                        else if (timed) {
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS, did);
                            if (f == null)
                                throw new TimeoutException();
                            long now = System.nanoTime();
                            nanos -= now - lastTime;
                            lastTime = now;
                        }
                        else
                            f = ecs.take(did);
                    }
                    if (f != null) {
                        --active;
                        try {
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                if (ee == null)
                    ee = new MyExecutionException();
                throw ee;
    
            } finally {
                for (Future<T> f : futures)
                    f.cancel(true);
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, int did)
            throws InterruptedException, ExecutionException {
            try {
                return doInvokeAny(tasks, false, 0, did);
            } catch (TimeoutException cannotHappen) {
                assert false;
                return null;
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit, int did)
            throws InterruptedException, ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout), did);
        }
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    futures.add(f);
                    execute(f);
                }
                for (Future<T> f : futures) {
                    if (!f.isDone()) {
                        try {
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }
    
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
            throws InterruptedException {
            if (tasks == null || unit == null)
                throw new NullPointerException();
            long nanos = unit.toNanos(timeout);
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));
    
                long lastTime = System.nanoTime();
    
                // Interleave time checks and calls to execute in case
                // executor doesn't have any/much parallelism.
                Iterator<Future<T>> it = futures.iterator();
                while (it.hasNext()) {
                    execute((Runnable)(it.next()));
                    long now = System.nanoTime();
                    nanos -= now - lastTime;
                    lastTime = now;
                    if (nanos <= 0)
                        return futures;
                }
    
                for (Future<T> f : futures) {
                    if (!f.isDone()) {
                        if (nanos <= 0)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }
    
    }
    
    package com.shawntime.common.socket.pool.communication;
    
    import com.shawntime.common.config.PropertyConfigurer;
    import com.shawntime.common.socket.pool.SocketSession;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    /**
     * 普通模式
     * 2015-10-19 16:46:19
     * @author admin
     * @version 
     * @since JDK 1.6
     */
    public class PassportCommunication {
    
        private static final SocketSession SOCKET_SESSION;
        
        static {
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
            config.setMaxIdle(Integer.parseInt(PropertyConfigurer.getString("passport_maxIdle")));
            config.setMaxWaitMillis(Integer.parseInt(PropertyConfigurer.getString("passport_maxWait")));
            config.setMinEvictableIdleTimeMillis(Integer.parseInt(PropertyConfigurer.getString("passport_minEvictableIdleTimeMillis")));
            config.setMinIdle(Integer.parseInt(PropertyConfigurer.getString("passport_minIdle")));
            config.setTestOnBorrow(Boolean.valueOf(PropertyConfigurer.getString("passport_testOnBorrow")));
            config.setTestOnCreate(Boolean.valueOf(PropertyConfigurer.getString("passport_testOnCreate")));
            config.setTestOnReturn(Boolean.valueOf(PropertyConfigurer.getString("passport_testOnReturn")));
            config.setTestWhileIdle(Boolean.valueOf(PropertyConfigurer.getString("passport_testWhileIdle")));
            config.setTimeBetweenEvictionRunsMillis(Integer.parseInt(PropertyConfigurer.getString("passport_timeBetweenEvictionRunsMillis")));
            config.setMaxTotal(Integer.parseInt(PropertyConfigurer.getString("passport_maxTotal")));
            config.setNumTestsPerEvictionRun(Integer.parseInt(PropertyConfigurer.getString("passport_numTestsPerEvictionRun")));
            config.setLifo(Boolean.valueOf(PropertyConfigurer.getString("passport_lifo")));
            
            String normalHosts = PropertyConfigurer.getString("passport_server_info");
            SOCKET_SESSION = new SocketSession(config, normalHosts);
        }
    
        public static SocketSession getSocketSession() {
            return SOCKET_SESSION;
        }
    
    }
    

    相关文章

      网友评论

        本文标题:Google Protobuf 协议+Socket实现异步登录

        本文链接:https://www.haomeiwen.com/subject/rxkrnttx.html