美文网首页Android 开发进阶
在Android项目中原生写游戏APP,使用Java-WebSo

在Android项目中原生写游戏APP,使用Java-WebSo

作者: JeffreyWorld | 来源:发表于2021-02-01 18:47 被阅读0次

    基于自己参与所开发的游戏项目用到的 Java-WebSocket,做一些经验总结。
    本文相关基础代码依赖于:
    https://github.com/TooTallNate/Java-WebSocket
    https://github.com/0xZhangKe/WebSocketDemo
    项目里使用 Java-WebSocket + Protocol Buffers 做了基础实现。游戏场景里,当前用户同一时间只会处于一个游戏房间;不像 IM 那样,在最新会话列表里会同时存在多个聊天会话(群聊、单聊),需要维护多个聊天室或房间。

    1.关于心跳的维护

    客户端每隔 15s(相关参数以自己项目里的设置为准,本文后面的数据也一样)向服务器发送一次心跳类型请求。服务器依靠这些心跳请求来确认客户端的连接一直没有断开;如果服务器连续 30s 收不到心跳,就认为用户离线。
    比如,在房间内游戏里的玩家,就会在UI展现上看到对方离线的标记。

    /**
     * Singleton helper that periodically asks a {@link HeartTimerListener} to send a
     * heartbeat.
     *
     * <p>The callback runs on the background thread owned by {@link Timer}, not on the
     * thread that called {@link #startHeartData()}.
     */
    public class HeartTimerUtils {
        /** Default delay before the first heartbeat, in milliseconds. */
        private static final long DEFAULT_DELAY_MS = 1000L;
        /** Default interval between heartbeats, in milliseconds. */
        private static final long DEFAULT_PERIOD_MS = 15000L;

        private static final HeartTimerUtils ourInstance = new HeartTimerUtils();
        private Timer timer;
        private TimerTask timerTask;
        // Written by callers and read on the Timer thread, hence volatile.
        private volatile HeartTimerListener listener;

        /**
         * @return the process-wide instance
         */
        public static HeartTimerUtils getInstance() {
            return ourInstance;
        }

        private HeartTimerUtils() {
        }

        /**
         * Starts (or restarts) the heartbeat with the default timing: first beat after
         * 1 s, then one beat every 15 s.
         */
        public void startHeartData() {
            startHeartData(DEFAULT_DELAY_MS, DEFAULT_PERIOD_MS);
        }

        /**
         * Starts (or restarts) the heartbeat with custom timing. Any timer that is
         * already running is cancelled first.
         *
         * @param delayMs  delay before the first heartbeat, in milliseconds (must be >= 0)
         * @param periodMs interval between heartbeats, in milliseconds (must be > 0)
         * @throws IllegalArgumentException if {@link Timer#schedule(TimerTask, long, long)}
         *                                  rejects the arguments
         */
        public synchronized void startHeartData(long delayMs, long periodMs) {
            stopTimer();
            timer = new Timer();
            timerTask = new TimerTask() {
                @Override
                public void run() {
                    // Snapshot the field: closeTimer() may null it between the check
                    // and the call when invoked from another thread.
                    HeartTimerListener current = listener;
                    if (current != null) {
                        current.sendHeartData();
                    }
                }
            };
            timer.schedule(timerTask, delayMs, periodMs);
        }

        /**
         * Cancels the running timer and its task, if any. Safe to call repeatedly.
         * The listener is kept.
         */
        public synchronized void stopTimer() {
            if (timer != null) {
                timer.cancel();
                timer = null;
            }
            if (timerTask != null) {
                timerTask.cancel();
                timerTask = null;
            }
        }

        /**
         * Sets the callback invoked on every heartbeat tick.
         *
         * @param listener the callback, or {@code null} to make ticks no-ops
         */
        public void setListener(HeartTimerListener listener) {
            this.listener = listener;
        }

        /**
         * Callback that performs the actual heartbeat send.
         */
        public interface HeartTimerListener {
            /** Called once per heartbeat tick, on the Timer thread. */
            void sendHeartData();
        }

        /**
         * Stops the timer and drops the listener reference.
         */
        public void closeTimer() {
            stopTimer();
            listener = null;
        }

    }
    
    /**
     * Base Activity for screens that use the shared WebSocket connection
     * (described by the author as an Activity already bound to the WebSocketService).
     *
     * <p>It registers itself as the heartbeat listener, sends the login request once
     * the socket is connected, and exposes {@link #sendData(byte[])} for subclasses.
     * Parts of the class are elided in this excerpt.
     *
     * NOTE(review): onCreate stores this Activity in the HeartTimerUtils singleton and
     * nothing visible here clears it in onDestroy — confirm the elided code does, or
     * the Activity stays reachable after it is destroyed.
     */
    public abstract class WebSocketActivity extends BaseActivity implements HeartTimerUtils.HeartTimerListener {
        ...省略
        /**
         * Ensures a socket exists, (re)sends the login request if it is already
         * connected, and registers the heartbeat and socket listeners.
         */
        @Override
        protected void onCreate(@Nullable Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            if (!MainApplication.getInstance().isWebSocketManagerConnected()) {
                MainApplication.getInstance().initWebSocket();
            } else {
                // Already connected: stop the running heartbeat and log in again for this screen.
                HeartTimerUtils.getInstance().stopTimer();
                onConnectedEvent();
            }
            HeartTimerUtils.getInstance().setListener(this);
            try {
                if (WebSocketHandler.getDefault() != null) {
                    WebSocketHandler.getDefault().addListener(socketListener);
                }
            } catch (Exception e) {
                // NOTE(review): the exception is silently swallowed; a failure to register
                // the listener would go unnoticed — consider logging it.
            }
        }

        /**
         * Sends a custom protobuf payload as binary data. Does nothing when no default
         * WebSocket manager exists.
         *
         * @param text the encoded bytes to send (binary despite the parameter name)
         */
        public void sendData(byte[] text) {
            if (WebSocketHandler.getDefault() != null) {
                WebSocketHandler.getDefault().send(text);
            }
        }

        /**
         * Called once the WebSocket is connected: builds and sends the login request
         * (MD5 of the access token, user id, room id).
         */
        public void onConnectedEvent() {
            LoginProtocalRequest request = new LoginProtocalRequest();
            int userId;
            if (!TextUtils.isEmpty(CommonUtil.getUid())) {
                // NOTE(review): parseInt throws NumberFormatException for a non-numeric uid.
                userId = Integer.parseInt(CommonUtil.getUid());
            } else {
                // No uid available: log in with user id 0.
                userId = 0;
            }
            LoginRequest req = LoginRequest.newBuilder().setToken(Md5Utils.encrypt(CommonUtil.getAccessToken())).setUserId(userId).setRoomId(roomId).build();
            request.op = ProtocolMap.LOGIN_OP_REQ;
            request.data = req.toByteArray();
            sendData(request.getByteData());
        }

        /**
         * Unregisters this Activity's socket listener from the default manager.
         */
        @Override
        protected void onDestroy() {
            super.onDestroy();
            if (WebSocketHandler.getDefault() != null) {
                WebSocketHandler.getDefault().removeListener(socketListener);
            }
        }

        /**
         * Stops the heartbeat, then disconnects and destroys the default WebSocket
         * manager if one exists.
         */
        public void destoryWebSocket() {
            HeartTimerUtils.getInstance().stopTimer();
            if (WebSocketHandler.getDefault() != null) {
                WebSocketHandler.getDefault().disConnect();
                WebSocketHandler.getDefault().destroy();
            }
        }

        /**
         * Heartbeat callback: sends a protocol frame whose op is
         * {@code ProtocolMap.SOCKET_HEART}.
         */
        @Override
        public void sendHeartData() {
            BaseProtocol protocol = new BaseProtocol();
            protocol.op = ProtocolMap.SOCKET_HEART;
            sendData(protocol.getByteData());
        }

        ...省略
    }
    

    所用到具体业务的 Activity,只需要继承 WebSocketActivity,去实现自己发消息和收消息的业务。
    Websocket本身有封装好 ping/pong 事件来维护心跳,通过发送 ping/pong 来确保连接的可用,客户端发送 ping 帧,服务端响应 pong 帧。只是我们项目里没用这个来维护,自己写的逻辑来维护的。

    /**
     * 接收到 Ping 数据
     */
    public class PingResponse implements Response<Framedata> {
    
        private Framedata framedata;
    
        PingResponse() {
        }
    
        @Override
        public Framedata getResponseData() {
            return framedata;
        }
    
        @Override
        public void setResponseData(Framedata responseData) {
            this.framedata = responseData;
        }
    
        @Override
        public void onResponse(IResponseDispatcher dispatcher, ResponseDelivery delivery) {
            dispatcher.onPing(framedata, delivery);
        }
    
        @Override
        public void release() {
            framedata = null;
            ResponseFactory.releasePingResponse(this);
        }
    
        @Override
        public String toString() {
            return String.format("[@PingResponse%s->Framedata:%s]",
                    hashCode(),
                    framedata == null ?
                            "null" :
                            framedata.toString());
        }
    
    /**
     * Response wrapper for a received Pong frame.
     */
    public class PongResponse implements Response<Framedata> {

        private Framedata framedata;

        PongResponse() {
        }

        /** Returns the wrapped Pong frame, which may be {@code null}. */
        @Override
        public Framedata getResponseData() {
            return this.framedata;
        }

        /** Stores the Pong frame carried by this response. */
        @Override
        public void setResponseData(Framedata responseData) {
            framedata = responseData;
        }

        /** Hands the frame to the dispatcher's Pong callback. */
        @Override
        public void onResponse(IResponseDispatcher dispatcher, ResponseDelivery delivery) {
            dispatcher.onPong(framedata, delivery);
        }

        /** Drops the frame reference and gives this instance back to the factory. */
        @Override
        public void release() {
            this.framedata = null;
            ResponseFactory.releasePongResponse(this);
        }

        @Override
        public String toString() {
            // String.valueOf renders a null frame as "null", same as an explicit check.
            final String frameText = String.valueOf(framedata);
            return String.format("[@PongResponse%s->Framedata:%s]", hashCode(), frameText);
        }
    }
    
    /**
     * Factory for {@link Response} objects, backed by small per-type reuse pools.
     *
     * <p>Each {@code createXxx} method takes an instance from the pool when one is
     * available and allocates a new one otherwise; each {@code releaseXxx} method puts
     * an instance back, keeping at most {@link #POOL_SIZE} idle instances per type.
     *
     * NOTE(review): the pools are plain {@link ArrayDeque}s, which are not thread-safe.
     * Confirm that create and release are always called from the same thread.
     */
    public class ResponseFactory {

        /** Maximum number of idle instances kept per pool. */
        private static final int POOL_SIZE = 7;
        private static final Queue<ErrorResponse> ERROR_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
        private static final Queue<TextResponse> TEXT_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
        private static final Queue<ByteBufferResponse> BYTE_BUFFER_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
        private static final Queue<PingResponse> PING_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);
        private static final Queue<PongResponse> PONG_RESPONSE_POOL = new ArrayDeque<>(POOL_SIZE);

        /**
         * @return a pooled or newly created {@link ErrorResponse}
         */
        public static ErrorResponse createErrorResponse(){
            ErrorResponse response = ERROR_RESPONSE_POOL.poll();
            if(response == null){
                response = new ErrorResponse();
            }
            return response;
        }

        /**
         * @return a pooled or newly created text response
         */
        public static Response<String> createTextResponse() {
            Response<String> response = TEXT_RESPONSE_POOL.poll();
            if (response == null) {
                response = new TextResponse();
            }
            return response;
        }

        /**
         * @return a pooled or newly created binary response
         */
        public static Response<ByteBuffer> createByteBufferResponse() {
            Response<ByteBuffer> response = BYTE_BUFFER_RESPONSE_POOL.poll();
            if (response == null) {
                response = new ByteBufferResponse();
            }
            return response;
        }

        /**
         * @return a pooled or newly created Ping response
         */
        public static Response<Framedata> createPingResponse() {
            Response<Framedata> response = PING_RESPONSE_POOL.poll();
            if (response == null) {
                response = new PingResponse();
            }
            return response;
        }

        /**
         * @return a pooled or newly created Pong response
         */
        public static Response<Framedata> createPongResponse() {
            Response<Framedata> response = PONG_RESPONSE_POOL.poll();
            if (response == null) {
                response = new PongResponse();
            }
            return response;
        }

        /** Returns an {@link ErrorResponse} to its pool. */
        static void releaseErrorResponse(ErrorResponse response){
            recycle(ERROR_RESPONSE_POOL, response);
        }

        /** Returns a {@link TextResponse} to its pool. */
        static void releaseTextResponse(TextResponse response) {
            recycle(TEXT_RESPONSE_POOL, response);
        }

        /** Returns a {@link ByteBufferResponse} to its pool. */
        static void releaseByteBufferResponse(ByteBufferResponse response) {
            recycle(BYTE_BUFFER_RESPONSE_POOL, response);
        }

        /** Returns a {@link PingResponse} to its pool. */
        static void releasePingResponse(PingResponse response) {
            recycle(PING_RESPONSE_POOL, response);
        }

        /** Returns a {@link PongResponse} to its pool. */
        static void releasePongResponse(PongResponse response) {
            recycle(PONG_RESPONSE_POOL, response);
        }

        /**
         * Puts {@code response} back into {@code pool} unless it is {@code null} or the
         * pool already holds {@link #POOL_SIZE} idle instances.
         *
         * <p>The constructor argument of {@link ArrayDeque} is only an initial capacity,
         * so without this check the pool would grow without bound.
         */
        private static <T> void recycle(Queue<T> pool, T response) {
            if (response != null && pool.size() < POOL_SIZE) {
                pool.offer(response);
            }
        }

    }
    

    2.关于断线重连

    断线重连,分为几种情况:
    一、客户端处于无网状态或弱网的情况,这时断开了。当客户端监听到网络恢复,重新给 socket 重连上;
    二、当客户端遇到弱网的情况,导致 socket 连接断开(onDisconnect),但此时系统网络仍处于已连接状态。这时客户端发消息和收消息都会失败,重连也可能连接不上,需要客户端多次重试,直到网络恢复正常后才能连接成功。
    三、客户端发送协议消息时发送失败(onSendDataError),由此触发重连操作。

    /**
     * Contract for a reconnect strategy used by the WebSocket manager.
     */
    public interface ReconnectManager {

        /**
         * @return whether a reconnect is currently in progress
         */
        boolean reconnecting();

        /**
         * Starts reconnecting.
         */
        void startReconnect();

        /**
         * Stops reconnecting.
         */
        void stopReconnect();

        /**
         * Notifies the strategy that the connection succeeded.
         */
        void onConnected();

        /**
         * Notifies the strategy that a connection attempt failed.
         *
         * @param th the cause of the failure
         */
        void onConnectError(Throwable th);

        /**
         * Releases resources held by the strategy.
         */
        void destroy();

        /**
         * Callback for the outcome of connecting: success or failure.
         */
        interface OnConnectListener {
            void onConnected();

            void onDisconnect();
        }
    }
    
    
    /**
     * WebSocket 管理类
     */
    public class WebSocketManager {
    
        private static final String TAG = "WSManager";
    
        private WebSocketSetting mSetting;
    
        private WebSocketWrapper mWebSocket;
    
        /**
         * 注册的监听器集合
         */
        private ResponseDelivery mDelivery;
        private ReconnectManager mReconnectManager;
    
        private SocketWrapperListener mSocketWrapperListener;
        /**
         * 当前是否已销毁
         */
        private boolean destroyed = false;
        /**
         * 用户调用了 disconnect 方法后为 true
         */
        private boolean disconnect = false;
    
        private WebSocketEngine mWebSocketEngine;
        private ResponseProcessEngine mResponseProcessEngine;
    
        WebSocketManager(WebSocketSetting setting,
                         WebSocketEngine webSocketEngine,
                         ResponseProcessEngine responseProcessEngine) {
            this.mSetting = setting;
            this.mWebSocketEngine = webSocketEngine;
            this.mResponseProcessEngine = responseProcessEngine;
    
            mDelivery = mSetting.getResponseDelivery();
            if (mDelivery == null) {
                mDelivery = new MainThreadResponseDelivery();
            }
            mSocketWrapperListener = getSocketWrapperListener();
            if (mWebSocket == null) {
                mWebSocket = new WebSocketWrapper(this.mSetting, mSocketWrapperListener);
            }
            start();
        }
    
        /**
         * 启动,调用此方法开始连接
         */
        public WebSocketManager start() {
            if (mWebSocket == null) {
                mWebSocket = new WebSocketWrapper(this.mSetting, mSocketWrapperListener);
            }
            if (mWebSocket.getConnectState() == 0) {
                reconnect();
            }
            return this;
        }
    
        /**
         * WebSocket 是否已连接
         */
        public boolean isConnect() {
            return mWebSocket != null && mWebSocket.getConnectState() == 2;
        }
    
        /**
         * 设置重连管理类。
         * 用户可根据需求设置自己的重连管理类,只需要实现接口即可
         */
        public void setReconnectManager(ReconnectManager reconnectManager) {
            this.mReconnectManager = reconnectManager;
        }
    
        /**
         * 通过 {@link ReconnectManager} 开始重接
         */
        public WebSocketManager reconnect() {
            disconnect = false;
            if (mReconnectManager == null) {
                mReconnectManager = getDefaultReconnectManager();
            }
            if (!mReconnectManager.reconnecting()) {
                mReconnectManager.startReconnect();
            }
            return this;
        }
    
        /**
         * 使用新的 Setting 重新创建连接,同时会销毁之前的连接
         */
        public WebSocketManager reconnect(WebSocketSetting setting) {
            disconnect = false;
            if (destroyed) {
                LogUtil.e(TAG, "This WebSocketManager is destroyed!");
                return this;
            }
            this.mSetting = setting;
            if (mWebSocket != null) {
                mWebSocket.destroy();
                mWebSocket = null;
            }
            start();
            return this;
        }
    
        /**
         * 断开连接,断开后可使用 {@link this#reconnect()} 方法重新建立连接
         */
        public WebSocketManager disConnect() {
            disconnect = true;
            if (destroyed) {
                LogUtil.e(TAG, "This WebSocketManager is destroyed!");
                return this;
            }
            if (mWebSocket.getConnectState() != 0) {
                mWebSocketEngine.disConnect(mWebSocket, mSocketWrapperListener);
            }
            return this;
        }
    
        /**
         * 发送文本数据
         */
        public void send(String text) {
            if (TextUtils.isEmpty(text)) {
                return;
            }
            Request<String> request = RequestFactory.createStringRequest();
            request.setRequestData(text);
            sendRequest(request);
        }
    
        /**
         * 发送 byte[] 数据
         */
        public void send(byte[] bytes) {
            if (bytes == null || bytes.length == 0) {
                return;
            }
            Request<byte[]> request = RequestFactory.createByteArrayRequest();
            request.setRequestData(bytes);
            sendRequest(request);
        }
    
        /**
         * 发送 ByteBuffer 数据
         */
        public void send(ByteBuffer byteBuffer) {
            if (byteBuffer == null) {
                return;
            }
            Request<ByteBuffer> request = RequestFactory.createByteBufferRequest();
            request.setRequestData(byteBuffer);
            sendRequest(request);
        }
    
        /**
         * 发送 Ping
         */
        public void sendPing() {
            sendRequest(RequestFactory.createPingRequest());
        }
    
        /**
         * 发送 Pong
         */
        public void sendPong() {
            sendRequest(RequestFactory.createPongRequest());
        }
    
        /**
         * 发送 Pong
         */
        public void sendPong(PingFrame pingFrame) {
            if (pingFrame == null) {
                return;
            }
            Request<PingFrame> request = RequestFactory.createPongRequest();
            request.setRequestData(pingFrame);
            sendRequest(request);
        }
    
        /**
         * 发送 {@link Framedata}
         */
        public void sendFrame(Framedata framedata) {
            if (framedata == null) {
                return;
            }
            Request<Framedata> request = RequestFactory.createFrameDataRequest();
            request.setRequestData(framedata);
            sendRequest(request);
        }
    
        /**
         * 发送 {@link Framedata} 集合
         */
        public void sendFrame(Collection<Framedata> frameData) {
            if (frameData == null) {
                return;
            }
            Request<Collection<Framedata>> request = RequestFactory.createCollectionFrameRequest();
            request.setRequestData(frameData);
            sendRequest(request);
        }
    
        /**
         * 添加一个监听器,使用完成之后需要调用
         * {@link #removeListener(SocketListener)} 方法移除监听器
         */
        public WebSocketManager addListener(SocketListener listener) {
            mDelivery.addListener(listener);
            return this;
        }
    
        /**
         * 移除一个监听器
         */
        public WebSocketManager removeListener(SocketListener listener) {
            mDelivery.removeListener(listener);
            return this;
        }
    
        /**
         * 获取配置类,
         * 部分参数支持动态设定。
         */
        public WebSocketSetting getSetting() {
            return mSetting;
        }
    
        /**
         * 彻底销毁该连接,销毁后改连接完全失效,
         * 请勿使用其他方法。
         */
        public void destroy() {
            destroyed = true;
            if (mWebSocket != null) {
                mWebSocketEngine.destroyWebSocket(mWebSocket);
                mWebSocketEngine = null;
                mWebSocket = null;
            }
            if (mDelivery != null) {
                if (!mDelivery.isEmpty()) {
                    mDelivery.clear();
                }
                mDelivery = null;
            }
            if (mReconnectManager != null) {
                if (mReconnectManager.reconnecting()) {
                    mReconnectManager.stopReconnect();
                }
                mReconnectManager = null;
            }
            WebSocketHandler.clearDefaultWebSocket();
        }
    
        /**
         * 重新连接一次,
         * for {@link ReconnectManager}
         */
        void reconnectOnce() {
            if (destroyed) {
                LogUtil.e(TAG, "This WebSocketManager is destroyed!");
                return;
            }
            if (mWebSocket.getConnectState() == 0) {
                mWebSocketEngine.connect(mWebSocket, mSocketWrapperListener);
            } else {
                if (mReconnectManager != null) {
                    mReconnectManager.onConnected();
                }
                LogUtil.e(TAG, "WebSocket 已连接,请勿重试。");
            }
        }
    
        /**
         * 发送数据
         */
        private void sendRequest(Request request) {
            if (destroyed) {
                LogUtil.e(TAG, "This WebSocketManager is destroyed!");
                return;
            }
            LogUtil.d(TAG, "mWebSocketEngine.sendRequest");
            mWebSocketEngine.sendRequest(mWebSocket, request, mSocketWrapperListener);
        }
    
        /**
         * 获取默认的重连器
         */
        private ReconnectManager getDefaultReconnectManager() {
            return new DefaultReconnectManager(this, new ReconnectManager.OnConnectListener() {
                @Override
                public void onConnected() {
                    LogUtil.i(TAG, "重连成功");
         //         mSetting.getResponseDispatcher()
         //                .onConnected(mDelivery);
                }
    
                @Override
                public void onDisconnect() {
                    LogUtil.i(TAG, "重连失败");
                    mSetting.getResponseDispatcher()
                            .onDisconnect(mDelivery);
                }
            });
        }
    
        /**
         * 获取监听器
         */
        private SocketWrapperListener getSocketWrapperListener() {
            return new SocketWrapperListener() {
                @Override
                public void onConnected() {
                    if (mReconnectManager != null) {
                        mReconnectManager.onConnected();
                    }
                    mSetting.getResponseDispatcher()
                            .onConnected(mDelivery);
                }
    
                @Override
                public void onConnectFailed(Throwable e) {
                    //if reconnecting,interrupt this event for ReconnectManager.
                    if (mReconnectManager != null &&
                            mReconnectManager.reconnecting()) {
                        mReconnectManager.onConnectError(e);
                    }
                    mSetting.getResponseDispatcher()
                            .onConnectFailed(e, mDelivery);
                }
    
                @Override
                public void onDisconnect() {
                    LogUtil.i(TAG, "onDisconnect 开始重连");
    //                mSetting.getResponseDispatcher()
    //                        .onDisconnect(mDelivery);
                    if (mReconnectManager != null &&
                            mReconnectManager.reconnecting()) {
                        if (disconnect) {
                            mSetting.getResponseDispatcher()
                                    .onDisconnect(mDelivery);
                        } else {
                            mReconnectManager.onConnectError(null);
                        }
                    } else {
                        if (!disconnect) {
                            if (mReconnectManager == null) {
                                mReconnectManager = getDefaultReconnectManager();
                            }
                            mReconnectManager.onConnectError(null);
                            mReconnectManager.startReconnect();
                        }
                    }
                }
                @Override
                public void onSendDataError(Request request, int type, Throwable tr) {
                    ErrorResponse errorResponse = ResponseFactory.createErrorResponse();
                    errorResponse.init(request, type, tr);
                    if (mSetting.processDataOnBackground()) {
                        mResponseProcessEngine
                                .onSendDataError(errorResponse,
                                        mSetting.getResponseDispatcher(),
                                        mDelivery);
                    } else {
                        mSetting.getResponseDispatcher().onSendDataError(errorResponse, mDelivery);
                    }
                    if (!disconnect && type == ErrorResponse.ERROR_NO_CONNECT) {
                        LogUtil.e(TAG, "数据发送失败,网络未连接,开始重连。。。");
                        reconnect();
                    }
                }
    
                @Override
                public void onMessage(Response message) {
                    if (mSetting.processDataOnBackground()) {
                        mResponseProcessEngine
                                .onMessageReceive(message,
                                        mSetting.getResponseDispatcher(),
                                        mDelivery);
                    } else {
                        message.onResponse(mSetting.getResponseDispatcher(), mDelivery);
                    }
                }
            };
        }
    
    /**
     * Broadcast receiver for connectivity changes; when a network becomes available it
     * asks the WebSocket managers to reconnect.
     */
    public class NetworkChangedReceiver extends BroadcastReceiver {

        private static final String TAG = "WSNetworkReceiver";

        public NetworkChangedReceiver() {
        }

        /**
         * Handles {@code CONNECTIVITY_ACTION}: with the network-state permission granted
         * and an active, connected network, reconnects the default manager and every
         * registered manager whose setting enables reconnect-on-network-change.
         */
        @Override
        public void onReceive(Context context, Intent intent) {
            if (!ConnectivityManager.CONNECTIVITY_ACTION.equals(intent.getAction())) {
                return;
            }
            ConnectivityManager connectivity = (ConnectivityManager) context
                    .getSystemService(Context.CONNECTIVITY_SERVICE);
            if (connectivity == null) {
                return;
            }
            try {
                if (!PermissionUtil.checkPermission(context, Manifest.permission.ACCESS_NETWORK_STATE)) {
                    return;
                }
                NetworkInfo network = connectivity.getActiveNetworkInfo();
                if (network == null) {
                    LogUtil.i(TAG, "当前网络断开");
                    return;
                }
                if (!network.isConnected()) {
                    LogUtil.i(TAG, "当前没有可用网络");
                    return;
                }
                logAvailableNetwork(network);
                reconnectDefaultSocket();
                reconnectAllSockets();
            } catch (Exception e) {
                LogUtil.e(TAG, "网络状态获取错误", e);
            }
        }

        /** Logs which kind of network (Wi-Fi or mobile) just became available. */
        private void logAvailableNetwork(NetworkInfo network) {
            switch (network.getType()) {
                case ConnectivityManager.TYPE_WIFI:
                    LogUtil.i(TAG, "网络连接发生变化,当前WiFi连接可用,正在尝试重连。");
                    break;
                case ConnectivityManager.TYPE_MOBILE:
                    LogUtil.i(TAG, "网络连接发生变化,当前移动连接可用,正在尝试重连。");
                    break;
                default:
                    break;
            }
        }

        /** Reconnects the default manager when it exists and opts in. */
        private void reconnectDefaultSocket() {
            if (WebSocketHandler.getDefault() == null) {
                return;
            }
            if (WebSocketHandler.getDefault().getSetting().reconnectWithNetworkChanged()) {
                WebSocketHandler.getDefault().reconnect();
            }
        }

        /** Reconnects every registered manager that opts in. */
        private void reconnectAllSockets() {
            Map<String, WebSocketManager> managers = WebSocketHandler.getAllWebSocket();
            for (WebSocketManager manager : managers.values()) {
                if (manager == null) {
                    continue;
                }
                if (manager.getSetting().reconnectWithNetworkChanged()) {
                    manager.reconnect();
                }
            }
        }

    }
    

    3.关于弱网的环境下的一些情况:

    一、发送消息失败或收取消息失败(让用户在ui交互上,自己点击重新发送或者下拉刷新通过http接口拉取最新服务器数据,内部封装好重新连接socket的逻辑)
    二、手机处于弱网环境,房间内很多用户同时向服务器发送同种类型的消息,比如:准备协议。这时,弱网手机发给服务器的消息,服务器可能收到了,但因为网络环境不稳定,客户端不一定能收到与之对应的返回消息。正常情况下,发送的数据一定会有与之配对的服务器返回数据;但在一些特定的场景下(比如弱网环境会出现丢包、丢数据),服务器返回的消息客户端可能收不到。

    4.关于客户端断开 socket 的时机和策略

    断开分为几种情况:
    一、客户端自己主动离开房间,正常断开;
    二、客户端杀掉APP后台进程,这时服务器依赖于心跳,30s没有收到客户端的心跳请求,会把客户端的连接状态断开;

    5.关于消息分发器

    接受到的数据,需要通过消息分发器来分发。先进入该类中处理,处理完再发送到下游。

    /**
     * Application-level dispatcher: parses incoming text messages as
     * {@code BaseProtocol} JSON, forwards valid ones downstream and turns everything
     * else into an {@link ErrorResponse}.
     */
    public class AppResponseDispatcher extends SimpleDispatcher {

        /**
         * The message is not valid JSON for the protocol.
         */
        public static final int JSON_ERROR = 11;
        /**
         * The message carries an op code outside the agreed range.
         */
        public static final int CODE_ERROR = 12;

        /** Lowest op code (inclusive) agreed between client and server. */
        private static final int MIN_VALID_OP = 1000;
        /** Upper bound (exclusive) of the agreed op codes. */
        private static final int MAX_VALID_OP = 2000;

        /** Shared parser; Gson instances are thread-safe and reusable. */
        private static final Gson GSON = new Gson();

        /**
         * Parses {@code message} and delivers it together with the parsed protocol
         * object when its op is in {@code [1000, 2000)}. Otherwise reports
         * {@link #CODE_ERROR}; any exception while parsing or delivering is reported as
         * {@link #JSON_ERROR}.
         *
         * @param message  raw text received from the socket
         * @param delivery target for the message or the error
         */
        @Override
        public void onMessage(String message, ResponseDelivery delivery) {
            try {
                BaseProtocol response = GSON.fromJson(message, BaseProtocol.class);
                if (response == null) {
                    // Gson returns null for empty or literal-null input; report it as a
                    // format error explicitly instead of relying on a NullPointerException.
                    ErrorResponse errorResponse = newErrorResponse(message, JSON_ERROR);
                    errorResponse.setCause(new IllegalArgumentException("message parsed to null"));
                    onSendDataError(errorResponse, delivery);
                    return;
                }
                // Ops in [1000, 2000) are the valid protocols agreed between client and server.
                if (response.op >= MIN_VALID_OP && response.op < MAX_VALID_OP) {
                    delivery.onMessage(message, response);
                } else {
                    ErrorResponse errorResponse = newErrorResponse(message, CODE_ERROR);
                    errorResponse.setReserved(response);
                    onSendDataError(errorResponse, delivery);
                }
            } catch (Exception e) {
                ErrorResponse errorResponse = newErrorResponse(message, JSON_ERROR);
                errorResponse.setCause(e);
                onSendDataError(errorResponse, delivery);
            }
        }

        /**
         * Central error handling: fills in a human-readable description for the known
         * error codes, then delivers the error. The UI can use
         * {@code ErrorResponse#getDescription()} as the message to show.
         */
        @Override
        public void onSendDataError(ErrorResponse error, ResponseDelivery delivery) {
            switch (error.getErrorCode()) {
                case ErrorResponse.ERROR_NO_CONNECT:
                    error.setDescription("网络错误");
                    break;
                case ErrorResponse.ERROR_UN_INIT:
                    error.setDescription("连接未初始化");
                    break;
                case ErrorResponse.ERROR_UNKNOWN:
                    error.setDescription("未知错误");
                    break;
                case JSON_ERROR:
                    error.setDescription("数据格式异常");
                    break;
                case CODE_ERROR:
                    error.setDescription("响应码错误");
                    break;
                default:
                    // Unrecognized code: leave whatever description is already set.
                    break;
            }
            delivery.onSendDataError(error);
        }

        /**
         * Builds an {@link ErrorResponse} with the given code that wraps the raw message
         * text as its response data.
         */
        private ErrorResponse newErrorResponse(String message, int errorCode) {
            ErrorResponse errorResponse = ResponseFactory.createErrorResponse();
            errorResponse.setErrorCode(errorCode);
            Response<String> textResponse = ResponseFactory.createTextResponse();
            textResponse.setResponseData(message);
            errorResponse.setResponseData(textResponse);
            return errorResponse;
        }
    }
    
    /**
     * A minimal WebSocket dispatcher implementing {@link IResponseDispatcher}.
     *
     * <p>{@code IResponseDispatcher} has many methods, so this base class forwards the
     * common events straight to the delivery; subclasses implement only the remaining
     * ones.
     *
     * <p>Every callback tolerates a {@code null} delivery and drops the event in that
     * case, because {@code WebSocketManager.destroy()} nulls the delivery it passes to
     * these callbacks and a late socket event must not crash.
     */
    public abstract class SimpleDispatcher implements IResponseDispatcher {

        /** Forwards the connected event. */
        @Override
        public void onConnected(ResponseDelivery delivery) {
            if (delivery != null) {
                delivery.onConnected();
            }
        }

        /** Forwards a connection failure together with its cause. */
        @Override
        public void onConnectFailed(Throwable cause, ResponseDelivery delivery) {
            if (delivery != null) {
                delivery.onConnectFailed(cause);
            }
        }

        /** Forwards the disconnect event. */
        @Override
        public void onDisconnect(ResponseDelivery delivery) {
            if (delivery != null) {
                delivery.onDisconnect();
            }
        }

        /** Forwards a binary message unchanged, with no converted data object. */
        @Override
        public void onMessage(ByteBuffer byteBuffer, ResponseDelivery delivery) {
            if (delivery != null) {
                delivery.onMessage(byteBuffer, null);
            }
        }

        /** Forwards a received Ping frame. */
        @Override
        public void onPing(Framedata framedata, ResponseDelivery delivery) {
            if (delivery != null) {
                delivery.onPing(framedata);
            }
        }

        /** Forwards a received Pong frame. */
        @Override
        public void onPong(Framedata framedata, ResponseDelivery delivery) {
            if (delivery != null) {
                delivery.onPong(framedata);
            }
        }

    }
    
    /**
     * Delivery ("emitter") of socket events: a {@link SocketListener} that also
     * manages a set of registered listeners.
     */
    public interface ResponseDelivery extends SocketListener {

        /** Registers a listener. */
        void addListener(SocketListener listener);

        /** Unregisters a listener. */
        void removeListener(SocketListener listener);

        /** Removes all registered listeners. */
        void clear();

        /** @return whether no listener is registered */
        boolean isEmpty();
    }
    
    
    /**
     * Listener for WebSocket events.
     */
    public interface SocketListener {

        /**
         * The connection was established.
         */
        void onConnected();

        /**
         * The connection attempt failed.
         *
         * @param e the cause of the failure
         */
        void onConnectFailed(Throwable e);

        /**
         * The connection was closed.
         */
        void onDisconnect();

        /**
         * Sending data failed.
         *
         * @param errorResponse the failure response
         */
        void onSendDataError(ErrorResponse errorResponse);

        /**
         * A text message was received.
         *
         * @param message the text message
         * @param data    the message converted to a typed object; may be null, depending on the
         *                {@link com.im.websocket.websocketlib.dispatcher.IResponseDispatcher}
         *                implementation in use (null by default)
         * @param <T>     the type produced by the IResponseDispatcher conversion
         */
        <T> void onMessage(String message, T data);

        /**
         * A binary message was received.
         *
         * @param bytes the binary message
         * @param data  the message converted to a typed object; may be null, depending on the
         *              {@link com.im.websocket.websocketlib.dispatcher.IResponseDispatcher}
         *              implementation in use (null by default)
         * @param <T>   the type produced by the IResponseDispatcher conversion
         */
        <T> void onMessage(ByteBuffer bytes, T data);

        /**
         * A Ping frame was received.
         */
        void onPing(Framedata framedata);

        /**
         * A Pong frame was received.
         */
        void onPong(Framedata framedata);
    }
    
    

    6.关于用到的相关源码部分

    connectBlocking:WebSocketClient 的 connectBlocking() 比 connect() 多了一步等待操作:它会阻塞当前线程,直到连接建立(或失败)后才返回,这样就能保证先连接、再发送;否则在未连接时发送数据会报错。其内部使用了 CountDownLatch,这个类可以让一个线程等待其他线程各自执行完毕后再继续执行。
    在 Java 的 原生 Socket 类里:

    // Excerpt of the Socket class showing only connect(SocketAddress, int); the rest of the class is omitted.
    public class Socket implements java.io.Closeable {
    
       /**
        * Connects this socket to the server with a specified timeout value.
        * A timeout of zero is interpreted as an infinite timeout. The connection
        * will then block until established or an error occurs.
        *
        * @param   endpoint the {@code SocketAddress}
        * @param   timeout  the timeout value to be used in milliseconds.
        * @throws  IOException if an error occurs during the connection
        * @throws  SocketTimeoutException if timeout expires before connecting
        * @throws  java.nio.channels.IllegalBlockingModeException
        *          if this socket has an associated channel,
        *          and the channel is in non-blocking mode
        * @throws  IllegalArgumentException if endpoint is null or is a
        *          SocketAddress subclass not supported by this socket
        * @since 1.4
        * @spec JSR-51
        */
       public void connect(SocketAddress endpoint, int timeout) throws IOException {
           // --- Argument and state validation: fail fast before touching the implementation. ---
           if (endpoint == null)
               throw new IllegalArgumentException("connect: The address can't be null");
    
           if (timeout < 0)
             throw new IllegalArgumentException("connect: timeout can't be negative");
    
           if (isClosed())
               throw new SocketException("Socket is closed");
    
           if (!oldImpl && isConnected())
               throw new SocketException("already connected");
    
           // Only InetSocketAddress endpoints are accepted.
           if (!(endpoint instanceof InetSocketAddress))
               throw new IllegalArgumentException("Unsupported address type");
    
           InetSocketAddress epoint = (InetSocketAddress) endpoint;
           InetAddress addr = epoint.getAddress ();
           int port = epoint.getPort();
           checkAddress(addr, "connect");
    
           // When a security manager is installed, ask it for permission to connect,
           // using the host name for unresolved endpoints and the host address otherwise.
           SecurityManager security = System.getSecurityManager();
           if (security != null) {
               if (epoint.isUnresolved())
                   security.checkConnect(epoint.getHostName(), port);
               else
                   security.checkConnect(addr.getHostAddress(), port);
           }
           // Create the underlying implementation on first use.
           if (!created)
               createImpl(true);
           // Non-"old" implementations get the timeout passed through; for an old implementation
           // only timeout == 0 takes the connect path, any other timeout is rejected.
           if (!oldImpl)
               impl.connect(epoint, timeout);
           else if (timeout == 0) {
               if (epoint.isUnresolved())
                   impl.connect(addr.getHostName(), port);
               else
                   impl.connect(addr, port);
           } else
               throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
           connected = true;
           /*
            * If the socket was not bound before the connect, it is now because
            * the kernel will have picked an ephemeral port & a local address
            */
           bound = true;
       }
    }
    

    关于 WebSocketImpl 里的 BlockingQueue:BlockingQueue 即阻塞队列,其常见实现的阻塞与唤醒机制是基于 ReentrantLock(及其 Condition)实现的。

    在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。


    /**
     * Represents one end (client or server) of a single WebSocketImpl connection.
     * Takes care of the "handshake" phase, then allows for easy sending of
     * text frames, and receiving frames through an event-based model.
     *
     * <p>Abridged excerpt: the "..." lines stand for code omitted from this listing.
     */
    public class WebSocketImpl implements WebSocket {
        /**
         * Queue of buffers that need to be sent to the client.
         */
        public final BlockingQueue<ByteBuffer> outQueue;
        /**
         * Queue of buffers that need to be processed
         */
        public final BlockingQueue<ByteBuffer> inQueue;  
    
        /**
         * creates a websocket with client role
         *
         * @param listener The listener for this instance
         * @param draft    The draft which should be used
         */
        public WebSocketImpl( WebSocketListener listener, Draft draft ) {
            ...
            // Both queues are LinkedBlockingQueue instances created without a capacity argument.
            this.outQueue = new LinkedBlockingQueue<ByteBuffer>();
            inQueue = new LinkedBlockingQueue<ByteBuffer>();
            ...
        }
    }
    
    LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。 LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。结构图如下:

    LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInteger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的。

    关于握手协议

    在 WebSocketClient 类下:

        /**
         * Creates the opening handshake and hands it to the engine to start the handshake.
         *
         * <p>The resource descriptor is built from the raw path and raw query of {@code uri},
         * the {@code Host} header from the URI host plus (when needed) the port, and any
         * entries of {@code headers} are added on top.
         *
         * @throws InvalidHandshakeException  a invalid handshake was created
         */
        private void sendHandshake() throws InvalidHandshakeException {
            String path;
            String part1 = uri.getRawPath();
            String part2 = uri.getRawQuery();
            // A missing or empty path is requested as "/".
            if( part1 == null || part1.length() == 0 )
                path = "/";
            else
                path = part1;
            // Append the query string, if the URI has one.
            if( part2 != null )
                path += '?' + part2;
            int port = getPort();
            // The port is appended to the host only when it differs from both
            // WebSocketImpl.DEFAULT_PORT and WebSocketImpl.DEFAULT_WSS_PORT.
            String host = uri.getHost() + ( 
                (port != WebSocketImpl.DEFAULT_PORT && port != WebSocketImpl.DEFAULT_WSS_PORT)
                ? ":" + port 
                : "" );
    
            HandshakeImpl1Client handshake = new HandshakeImpl1Client();
            handshake.setResourceDescriptor( path );
            handshake.put( "Host", host );
            // Copy the additional headers, when present, into the handshake.
            if( headers != null ) {
                for( Map.Entry<String,String> kv : headers.entrySet() ) {
                    handshake.put( kv.getKey(), kv.getValue() );
                }
            }
            engine.startHandshake( handshake );
        }
    
    /**
     * Implemented by WebSocketClient and WebSocketServer. The methods within are
     * called by WebSocket. Almost every method takes a first parameter conn which
     * represents the source of the respective event.
     */
    public interface WebSocketListener {
    
        /**
         * Called on the server side when the socket connection is first established, and the WebSocket
         * handshake has been received. This method allows to deny connections based on the received handshake.<br>
         * By default this method only requires protocol compliance.
         * 
         * @param conn
         *            The WebSocket related to this event
         * @param draft
         *            The protocol draft the client uses to connect
         * @param request
         *            The opening http message sent by the client. Can be used to access additional fields like cookies.
         * @return Returns an incomplete handshake containing all optional fields
         * @throws InvalidDataException
         *             Throwing this exception will cause this handshake to be rejected
         */
        ServerHandshakeBuilder onWebsocketHandshakeReceivedAsServer( WebSocket conn, Draft draft, ClientHandshake request ) throws InvalidDataException;
    
        /**
         * Called on the client side when the socket connection is first established, and the WebSocketImpl
         * handshake response has been received.
         * 
         * @param conn
         *            The WebSocket related to this event
         * @param request
         *            The handshake initially sent out to the server by this websocket.
         * @param response
         *            The handshake the server sent in response to the request.
         * @throws InvalidDataException
         *             Allows the client to reject the connection with the server in respect of its handshake response.
         */
        void onWebsocketHandshakeReceivedAsClient( WebSocket conn, ClientHandshake request, ServerHandshake response ) throws InvalidDataException;
    
        /**
         * Called on the client side when the socket connection is first established, and the WebSocketImpl
         * handshake has just been sent.
         * 
         * @param conn
         *            The WebSocket related to this event
         * @param request
         *            The handshake sent to the server by this websocket
         * @throws InvalidDataException
         *             Allows the client to stop the connection from progressing
         */
        void onWebsocketHandshakeSentAsClient( WebSocket conn, ClientHandshake request ) throws InvalidDataException;
    
        /**
         * Called when an entire text frame has been received. Do whatever you want
         * here...
         * 
         * @param conn
         *            The <tt>WebSocket</tt> instance this event is occurring on.
         * @param message
         *            The UTF-8 decoded message that was received.
         */
        void onWebsocketMessage( WebSocket conn, String message );
    
        /**
         * Called when an entire binary frame has been received. Do whatever you want
         * here...
         * 
         * @param conn
         *            The <tt>WebSocket</tt> instance this event is occurring on.
         * @param blob
         *            The binary message that was received.
         */
        void onWebsocketMessage( WebSocket conn, ByteBuffer blob );
    
        /**
         * Called after <var>onHandshakeReceived</var> returns <var>true</var>.
         * Indicates that a complete WebSocket connection has been established,
         * and we are ready to send/receive data.
         * 
         * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
         * @param d The handshake of the websocket instance
         */
        void onWebsocketOpen( WebSocket conn, Handshakedata d );
    
        /**
         * Called after <tt>WebSocket#close</tt> is explicitly called, or when the
         * other end of the WebSocket connection is closed.
         * 
         * @param ws The <tt>WebSocket</tt> instance this event is occurring on.
         * @param code The codes can be looked up here: {@link CloseFrame}
         * @param reason Additional information string
         * @param remote Returns whether or not the closing of the connection was initiated by the remote host.
         */
        void onWebsocketClose( WebSocket ws, int code, String reason, boolean remote );
    
        /** Called as soon as no further frames are accepted
         *
         * @param ws The <tt>WebSocket</tt> instance this event is occurring on.
         * @param code The codes can be looked up here: {@link CloseFrame}
         * @param reason Additional information string
         * @param remote Returns whether or not the closing of the connection was initiated by the remote host.
         */
        void onWebsocketClosing( WebSocket ws, int code, String reason, boolean remote );
    
        /** Called when this peer sends a close handshake
         *
         * @param ws The <tt>WebSocket</tt> instance this event is occurring on.
         * @param code The codes can be looked up here: {@link CloseFrame}
         * @param reason Additional information string
         */
        void onWebsocketCloseInitiated( WebSocket ws, int code, String reason );
    
        /**
         * Called if an exception worth noting occurred.
         * If an error causes the connection to fail onClose will be called additionally afterwards.
         *
         * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
         * @param ex
         *            The exception that occurred. <br>
         *            Might be null if the exception is not related to any specific connection. For example if the server port could not be bound.
         */
        void onWebsocketError( WebSocket conn, Exception ex );
    
        /**
         * Called when a ping frame has been received.
         * This method must send a corresponding pong by itself.
         *
         * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
         * @param f The ping frame. Control frames may contain payload.
         */
        void onWebsocketPing( WebSocket conn, Framedata f );
    
        /**
         * Called when a pong frame is received.
         *
         * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
         * @param f The pong frame. Control frames may contain payload.
         **/
        void onWebsocketPong( WebSocket conn, Framedata f );
    
        /** This method is used to inform the selector thread that there is data queued to be written to the socket.
         * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
         */
        void onWriteDemand( WebSocket conn );
    
        /**
         * @see  WebSocket#getLocalSocketAddress()
         *
         * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
         * @return Returns the address of the endpoint this socket is bound to.
         */
        InetSocketAddress getLocalSocketAddress( WebSocket conn );
    
        /**
         * @see  WebSocket#getRemoteSocketAddress()
         *
         * @param conn The <tt>WebSocket</tt> instance this event is occurring on.
         * @return Returns the address of the endpoint this socket is connected to, or {@code null} if it is unconnected.
         */
        InetSocketAddress getRemoteSocketAddress( WebSocket conn );
    }
    

    WebSocketHandler 是个很重要的概念,我们无论是 WebSocket 的初始化、创建连接、断开连接、数据收发等等都要使用它来实现。

    /**
     * User-facing control handle for WebSocket connections.
     *
     * <p>Creates, stores, looks up and removes {@link WebSocketManager} instances and owns
     * the two engines that every manager created here is constructed with.
     *
     * <p>Locking scheme: the engines, the default manager and the logger are guarded by the
     * {@code WebSocketHandler.class} monitor; {@link #mWebSocketMap} is guarded by
     * {@link #WS_MAP_BLOCK}. All lazily initialised static fields are {@code volatile} so the
     * unsynchronized first check of the double-checked initialisation is safe.
     */
    public class WebSocketHandler {

        private final static String TAG = "WebSocketHandler";

        /**
         * Message sending engine, created on first use.
         */
        private static volatile WebSocketEngine webSocketEngine;
        /**
         * Response processing engine, created on first use.
         */
        private static volatile ResponseProcessEngine responseProcessEngine;
        /**
         * The default WebSocket connection; only written while holding the class monitor.
         */
        private static volatile WebSocketManager defaultWebSocket;

        /**
         * Lock held for every access to {@link #mWebSocketMap}.
         * A plain object is enough: it is only ever used as a monitor.
         */
        private static final Object WS_MAP_BLOCK = new Object();

        /**
         * Stores WebSocketManager objects by key, which is what allows several connections.
         */
        private static volatile Map<String, WebSocketManager> mWebSocketMap;

        private static volatile Logable mLog;

        /**
         * Initialises the default WebSocket connection.
         *
         * <p>Every call discards the previous default manager (if any) and installs a new one
         * built from {@code setting}. The replacement happens atomically under the class
         * monitor and the manager created by this call is returned, so the caller never
         * receives {@code null} or a manager built from another caller's settings.
         *
         * @param setting the settings for this connection
         * @return the newly created default {@link WebSocketManager}
         */
        public static WebSocketManager init(WebSocketSetting setting) {
            checkEngineNullAndInit();
            WebSocketManager created;
            synchronized (WebSocketHandler.class) {
                created = new WebSocketManager(setting,
                        webSocketEngine,
                        responseProcessEngine);
                defaultWebSocket = created;
            }
            return created;
        }

        /**
         * Drops the reference to the default WebSocket connection;
         * {@link #getDefault()} returns {@code null} until {@link #init(WebSocketSetting)} is called again.
         */
        public static void clearDefaultWebSocket(){
            synchronized (WebSocketHandler.class) {
                defaultWebSocket = null;
            }
        }

        /**
         * Creates a WebSocket connection identified by a unique key.
         *
         * <p>If a manager is already stored under {@code key}, an error is logged and the
         * existing manager is returned instead of creating a new one.
         *
         * @param key     unique identifier of the WebSocketManager;
         *                it is needed later to look the manager up again
         * @param setting the settings for this connection
         * @return the manager stored under {@code key}
         */
        public static WebSocketManager initGeneralWebSocket(String key, WebSocketSetting setting) {
            checkEngineNullAndInit();
            checkWebSocketMapNullAndInit();
            synchronized (WS_MAP_BLOCK) {
                if (mWebSocketMap.containsKey(key)) {
                    LogUtil.e(TAG, "WebSocketManager exists!do not start again!");
                    return mWebSocketMap.get(key);
                } else {
                    WebSocketManager wsm = new WebSocketManager(setting,
                            webSocketEngine,
                            responseProcessEngine);
                    mWebSocketMap.put(key, wsm);
                    return wsm;
                }
            }
        }

        /**
         * Returns the default WebSocket connection.
         * {@link #init(WebSocketSetting)} must be called first to initialise it.
         *
         * @return the default {@link WebSocketManager}, or {@code null} when it has not been
         *         initialised or has been cleared
         */
        public static WebSocketManager getDefault() {
            return defaultWebSocket;
        }

        /**
         * Looks up a WebSocketManager.
         *
         * @param key the key of the WebSocketManager
         * @return the manager, or {@code null} when none is stored under {@code key}
         *         (never created, or already removed)
         */
        public static WebSocketManager getWebSocket(String key) {
            checkWebSocketMapNullAndInit();
            // Read under the same lock as the writers: HashMap is not safe for concurrent access.
            synchronized (WS_MAP_BLOCK) {
                return mWebSocketMap.get(key);
            }
        }

        /**
         * Returns all WebSocketManagers except the default one.
         *
         * <p>The returned map is the live internal map, not a copy; callers that iterate or
         * modify it get no protection against concurrent changes made through this class.
         */
        public static Map<String, WebSocketManager> getAllWebSocket() {
            checkWebSocketMapNullAndInit();
            return mWebSocketMap;
        }

        /**
         * Removes a WebSocketManager.
         *
         * @param key the key of the WebSocketManager
         * @return the removed manager, or {@code null} when none was stored under {@code key}
         */
        public static WebSocketManager removeWebSocket(String key) {
            checkWebSocketMapNullAndInit();
            // Look-up and removal are one atomic step so two concurrent removers
            // cannot both observe the same entry.
            synchronized (WS_MAP_BLOCK) {
                return mWebSocketMap.remove(key);
            }
        }

        /**
         * Registers a broadcast receiver for connectivity changes; the WebSocket is
         * reconnected when the network goes from unavailable to available.
         *
         * <p>Each call registers a new {@code NetworkChangedReceiver}; call it once.
         *
         * @param context should be the application context, to avoid memory leaks and other problems
         */
        public static void registerNetworkChangedReceiver(Context context) {
            if (PermissionUtil.checkPermission(context, Manifest.permission.ACCESS_NETWORK_STATE)) {
                try {
                    IntentFilter filter = new IntentFilter();
                    filter.addAction(ConnectivityManager.CONNECTIVITY_ACTION);
                    context.registerReceiver(new NetworkChangedReceiver(), filter);
                } catch (Exception e) {
                    LogUtil.e(TAG, "网络监听广播注册失败:", e);
                }
            } else {
                LogUtil.e(TAG, "未获取到网络状态权限,广播监听器无法注册");
            }
        }

        /**
         * Creates the two engines if they do not exist yet.
         */
        private static void checkEngineNullAndInit() {
            if (webSocketEngine == null || responseProcessEngine == null) {
                synchronized (WebSocketHandler.class) {
                    if (webSocketEngine == null) {
                        webSocketEngine = new WebSocketEngine();
                    }
                    if (responseProcessEngine == null) {
                        responseProcessEngine = new ResponseProcessEngine();
                    }
                }
            }
        }

        /**
         * Creates {@link #mWebSocketMap} if it does not exist yet.
         */
        private static void checkWebSocketMapNullAndInit() {
            if (mWebSocketMap == null) {
                synchronized (WS_MAP_BLOCK) {
                    if (mWebSocketMap == null) {
                        mWebSocketMap = new HashMap<>();
                    }
                }
            }
        }

        /**
         * Sets the logging implementation; once set, internal log output goes through it.
         * The implementation must implement the {@link Logable} interface.
         */
        public static void setLogable(Logable logable) {
            mLog = logable;
        }

        /**
         * Returns the logging implementation, installing a {@code LogImpl} when none is set.
         */
        public static Logable getLogable() {
            Logable log = mLog;
            if (log == null) {
                synchronized (WebSocketHandler.class) {
                    log = mLog;
                    if (log == null) {
                        log = new LogImpl();
                        mLog = log;
                    }
                }
            }
            return log;
        }

    }
    

    相关文章

      网友评论

        本文标题:在Android项目中原生写游戏APP,使用Java-WebSo

        本文链接:https://www.haomeiwen.com/subject/wsxqtltx.html