消息推送功能可以说是手机APP不可或缺的功能之一,一般我们可以使用第三方推送的SDK进行简单推送,比如极光推送、信鸽推等,但是对于消息聊天的时效性或者三方推送不能满足业务需求,我们需要使用WebSocket来实现消息推送功能。
基本流程
基于开源协议我们封装实现WebSocket的连接、注册、心跳、消息分发、超时任务功能,基本流程如下:
连接功能实现
首先我们新建一个项目,在build.grade中添加配置
compile 'com.neovisionaries:nv-websocket-client:2.2'
新建websocket管理类WsManger
public class WsManager {
private volatile static WsManager wsManger;
private WsManager() {
}
public static WsManager getWsManger() {
if (wsManger == null) {
synchronized (WsManager.class) {
if (wsManger == null) {
wsManger = new WsManager();
}
}
}
return wsManger;
}
}
接下来添加连接方法,我们将webSocket的状态分为三种,新建WsStatue枚举类对应起来
public enum WsStatus {
/**
* 连接成功
*/
CONNECT_SUCCESS,
/**
* 连接失败
*/
CONNECT_FAIL,
/**
* 正在连接
*/
CONNECTING;
}
连接方法如下所示:
/**
* 连接方法 这里要判断是否登录 此处省略
*/
public void connect() {
//WEB_SOCKET_API 是连接的url地址,
// CONNECT_TIMEOUT是连接的超时时间 这里是 5秒
try {
ws = new WebSocketFactory().createSocket(WEB_SOCKET_API, CONNECT_TIMEOUT)
//设置帧队列最大值为5
.setFrameQueueSize(5)
//设置不允许服务端关闭连接却未发送关闭帧
.setMissingCloseFrameAllowed(false)
//添加回调监听
.addListener(new WsListener())
//异步连接
.connectAsynchronously();
} catch (IOException e) {
e.printStackTrace();
}
setStatus(WsStatus.CONNECTING);
}
调用连接方法后 我们来看连接的回调 也就是WsListener
/**
* websocket回调事件
*/
private class WsListener extends WebSocketAdapter {
@Override
public void onConnected(WebSocket websocket, Map<String, List<String>> headers) throws Exception {
Log.d(TAG, "onConnected: 连接成功");
}
@Override
public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception {
Log.d(TAG, "onConnectError: 连接失败");
}
@Override
public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame,
WebSocketFrame clientCloseFrame,
boolean closedByServer) throws Exception {
Log.d(TAG, "onDisconnected: 断开连接");
}
@Override
public void onTextMessage(WebSocket websocket, String text) throws Exception {
Log.d(TAG, "onTextMessage: 收到消息:" + text);
}
}
下面我们调用连接方法
WsManager.getWsManger().connect();
运行项目我们可以看到如下打印:
image.png
此处我们要做的处理是,如果收到连接失败或者断开连接的回调 需要重新连接,我们重新调用一次连接方法即可,并且如果超过三次重连失败,我们在业务中可以通过调用接口来获取数据,避免数据丢失,此处细节省略。
协议封装
此处协议如下所示:
{
"action":"",
"requestChild":{
"clientType":"",
"id":""
}
}
心跳、发送请求属于客户端 主动发送请求,我们可以将请求结果分为成功失败和超时,发送超时我们是收不到服务器任何回复的,所以我们需要在发送之后将发送放在超时任务队列中,如果请求成功将任务从超时队列中移除,超时从超时队列中获取任务重新请求。
超时任务队列中的回调成功、失败、超时。
根据上述协议,我们添加相应的实体类,并采用Builder设计模式
ublic class Request {
/**
* 行为
*/
private String action;
/**
* 请求体
*/
private RequestChild req;
/**
* 请求次数
*/
private transient int reqCount;
/**
* 超时的时间
*/
private transient int timeOut;
public Request() {
}
public Request(String action, int reqCount, int timeOut, RequestChild req) {
this.action = action;
this.req = req;
this.reqCount = reqCount;
this.timeOut = timeOut;
}
public static class Builder {
//action 请求类型
private String action;
//请求子类数据 按照具体业务划分
private RequestChild req;
//请求次数 便于重试
private int reqCount;
//超时时间
private int timeOut;
public Builder action(String action) {
this.action = action;
return this;
}
public Builder req(RequestChild req) {
this.req = req;
return this;
}
public Builder reqCount(int reqCount) {
this.reqCount = reqCount;
return this;
}
public Builder timeOut(int timeOut) {
this.timeOut = timeOut;
return this;
}
public Request build() {
return new Request(action, reqCount, timeOut, req);
}
}
}
public class RequestChild {
/**
* 设备类型
*/
private String clientType;
/**
* 用于用户注册的id
*/
private String id;
public RequestChild(String clientType, String id) {
this.clientType = clientType;
this.id = id;
}
public RequestChild() {
}
public static class Builder {
private String clientType;
private String id;
public RequestChild.Builder setClientType(String clientType) {
this.clientType = clientType;
return this;
}
public RequestChild.Builder setId(String id) {
this.id = id;
return this;
}
public RequestChild build() {
return new RequestChild(clientType, id);
}
}
}
我们添加一个发送请求的方法如下:
/**
* 发送请求
*
* @param request 请求体
* @param reqCount 请求次数
* @param requestListern 请求回调
*/
private void senRequest(Request request, final int reqCount, final RequestListern requestListern) {
if (!isNetConnect()) {
requestListern.requestFailed("网络未连接");
return;
}
}
请求回调如下所示
public interface RequestListern {
/**
* 请求成功
*/
void requestSuccess();
/**
* 请求失败
*
* @param message 请求失败消息提示
*/
void requestFailed(String message);
}
接着我们要把请求放在超时队列中,新建超时任务类,对应的分别是请求参数、请求回调、任务调度
public class TimeOutTask {
/**
* 请求主体
*/
private Request request;
/**
* 通用返回
*/
private RequestCallBack requestCallBack;
/**
* r任务
*/
private ScheduledFuture scheduledFuture;
public TimeOutTask(Request request,
RequestCallBack requestCallBack,
ScheduledFuture scheduledFuture) {
this.request = request;
this.requestCallBack = requestCallBack;
this.scheduledFuture = scheduledFuture;
}
public ScheduledFuture getScheduledFuture() {
return scheduledFuture;
}
public void setScheduledFuture(ScheduledFuture scheduledFuture) {
this.scheduledFuture = scheduledFuture;
}
public Request getRequest() {
return request;
}
public void setRequest(Request request) {
this.request = request;
}
public RequestCallBack getRequestCallBack() {
return requestCallBack;
}
public void setRequestCallBack(RequestCallBack requestCallBack) {
this.requestCallBack = requestCallBack;
}
}
RequestCallBack是超时任务的回调,只是比请求回调多了个超时,因为超时的处理机制是一样的,所以这里我们没必要将超时回调到请求中
public interface RequestCallBack {
/**
* 请求成功
*/
void requestSuccess();
/**
* 请求失败
*
* @param request 请求体
* @param message 请求失败的消息
*/
void requestFailed(String message, Request request);
/**
* 请求超时
*
* @param request 请求体
*/
void timeOut(Request request);
}
/**
* 添加超时任务
*/
private ScheduledFuture enqueueTimeout(final Request request, final long timeout) {
Log.d(TAG, " " + "enqueueTimeout: 添加超时任务类型为:" + request.getAction());
return executor.schedule(new Runnable() {
@Override
public void run() {
TimeOutTask timeoutTask = callbacks.remove(request.getAction());
if (timeoutTask != null) {
timeoutTask.getRequestCallBack().timeOut(timeoutTask.getRequest());
}
}
}, timeout, TimeUnit.MILLISECONDS);
}
超时任务的方法 是通过任务调度定时调用,请求成功后我们会把超时任务移除,当到了超时时间时,任务还存在就说明任务超时了。
每次的任务我们以action为键值存在hashMap中
private Map<String, CallbackWrapper> callbacks = new HashMap<>();
将任务放入超时任务代码如下所示:
final ScheduledFuture timeoutTask = enqueueTimeout(request, request.getTimeOut());
final RequestCallBack requestCallBack = new RequestCallBack() {
@Override
public void requestSuccess() {
requestListern.requestSuccess();
}
@Override
public void requestFailed(String message, Request request) {
requestListern.requestFailed(message);
}
@Override
public void timeOut(Request request) {
timeOutHanlder(request);
}
};
callbacks.put(request.getAction(),
new CallbackWrapper(request, requestCallBack, timeoutTask));
一般而言,任务超时都是由于连接原因导致,所以我们这里可以尝试重试一次,如果还是超时,通过 timeOutHanlder(request);方法 进行重新连接,重连代码和连接代码一样,这里就省略了,做好这步操作,我们就可以发送消息了。
/**
* 超时任务
*/
private void timeOutHanlder(Request requset) {
setStatus(WsStatus.CONNECT_FAIL);
//这里假装有重连
Log.d(TAG, "timeOutHanlder: 请求超时 准备重连");
}
到这里我们的流程基本可以走通了。
心跳
首先,我们要知道心跳的作用是什么,连接成功后,heartbeat可以固定的时间间隔向服务器发送询问,当前是否还在线,很多人说心跳失败了就重新连接,心跳成功就继续心跳,但是这里需要注意的是,我们一般收不到心跳失败回调的,心跳也是向服务器发送数据,所以我们要将所有的主动请求都放在超时任务队列中,所以对websocket来说 请求结果有三种:成功、失败、超时,对于用户 只有成功、失败即可。
至于心跳、注册等请求发送的数据是什么,这就得看我们与服务端定的协议是什么样了,通常来说 分为action 和 requestBody,协议格式我们再第二步已经封装好了,这里我们以心跳任务为例验证上面的封装。
/**
* 心跳
*/
void keepAlive() {
Request request = new Request.Builder()
.reqCount(0)
.timeOut(REQUEST_TIMEOUT)
.action(ACTION_KEEPALIVE).build();
WsManager.getWsManger().senRequest(request, request.getReqCount() + 1, new RequestListern() {
@Override
public void requestSuccess() {
Log.d(TAG, "requestSuccess: 心跳发送成功了");
}
@Override
public void requestFailed(String message) {
}
});
}
我们每间隔10s中开启一次心跳任务
/**
* 开始心跳
*/
public void startKeepAlive() {
mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
}
/**
* 心跳任务
*/
private Runnable mKeepAliveTask = new Runnable() {
@Override
public void run() {
keepAlive();
mHandler.removeCallbacks(mKeepAliveTask);
mHandler.postDelayed(mKeepAliveTask, HEART_BEAT_RATE);
}
};
为了便于操作演示,在主页面上加个按钮 ,点击按钮调用startKeepAlive方法,运行如下所示:
image.png
我们可以看到心跳返回的statue是300 不成功,5秒之后走到了请求超时的方法中,所以如果状态返回成功的话,我们需要回调给调用者。
/**
* 处理 任务回调
*
* @param action 请求类型
*/
void disPatchCallbackWarp(String action, boolean isSuccess) {
CallbackWrapper callBackWarp = callbacks.remove(action);
if (callBackWarp == null) {
Logger.d(TAG+" "+ "disPatchCallbackWarp: 任务队列为空");
} else {
callBackWarp.getScheduledFuture().cancel(true);
if (isSuccess) {
callBackWarp.getRequestCallBack().requestSuccess();
} else {
callBackWarp.getRequestCallBack().requestFailed("", new Request());
}
}
}
这样调用者才知道成功或失败。
发送其他消息与心跳一样,只是请求参数不同而已,修改Request参数即可。这样我们根据协议和业务就实现一个比较规范的webSocket消息推送流程了。
网友评论