OKsocket实践总结
OkSocket是github上socket使用比较多的一个Java库,旨在解决轻量级套接字通信问题,使开发人员能够更多地关注业务逻辑,而不是TCP通信原则和一些协议。
github项目中有一份作者自带的中文博客连接,但是其中文字部分还是没有太完整,故根据其英文文档和个人使用经验写这篇文章.
协议
我们必须自己定义的数据包协议,实现IReaderProtocol
接口,说明headerLength
和BodyLength
(Payload length)
在demo程序中,数据包第一个字节代表Payload length(body length),后面的是具体的信息
/**
* 这个协议表示,数据包头第一个字节为包信息部长(Payload length)
*/
val myProtocol = object : IReaderProtocol {
/**
* 通过解析[header] 获取包体的长度并返回
* */
override fun getBodyLength(header: ByteArray, byteOrder: ByteOrder?): Int {
return header[0].toInt()
}
/**
* 包头的长度,里面应该含有数据体长度的信息
* */
override fun getHeaderLength() = 1
}
服务器部分
这部分描述使用OkSocket建立一个简单的服务器
开启监听
- 准备一个未使用的端口,并首先获得
IRegister
val register =OkSocket.server(8088)
- 通过
IRegister
获取一个`IServerManager'
val manage = register.registerReceiver(object : ServerActionAdapter() {
/**
* 当服务器侦听您成功指定的端口时,将调用此方法。
*/
override fun onServerListening(serverPort: Int) {
LogUtils.i("端口$serverPort 开启监听成功!")
}
})
ServerActionAdapter
是Server的一些行为监听类,可以按需求override.这里上面监听实现了,在端口监听开启成功时,打印日志.(处理客户端发送的消息也在这override)
- 开启监听
if (!manage.isLive) {
manage.listen()
}
处理消息
在ServerActionAdapter
中overrideonClientConnected
方法
LogUtils.i("${client.hostName} 连接成功")
client.setReaderProtocol(myProtocol)
client.addIOCallback(clientIOCallback)
上面代码除了打印连接日志以外重点两行是
-
指定数据解析协议
-
信息处理的回调方法
处理消息就可以在回调方法addIOCallback(clientIOCallback)
中处理
override fun onClientRead(
originalData: OriginalData,
client: IClient,
clientPool: IClientPool<IClient, String>
) {
/**
* 参数originalData是来自服务器的数据
* 参数client是指哪一个接收到这个原始数据
*/
LogUtils.i(
"${client.hostName}: \n " +
"payLength = ${originalData.headBytes[0].toInt()} \n" +
" ${String(
originalData.bodyBytes
)} "
)
client.sendMsg("get your ${originalData.bodyBytes}")
}
在这部分代码中.把收到的信息 加上"get your"再回复给客户端.
IClient.send(ISendable sendable)
客户端
先准备一个SocketActionAdapter
用来监听处理socket的各种行为,包括服务器的返回.
private val socketActionAdapter by lazy {
object : DefaultSocketActionAdapter() {
override fun onSocketReadResponse(
info: ConnectionInfo,
action: String,
originalData: OriginalData
) {
LogUtils.i(
"收到回复:" +
"荷载长${originalData.headBytes[0].toInt()} \n" +
" ${String(
originalData.bodyBytes
)}"
)
}
/**
* 根据官方文档,如果需要注销适配器,需要在这个回调中进行
*/
override fun onSocketDisconnection(
info: ConnectionInfo?,
action: String?,
e: Exception?
) {
super.onSocketDisconnection(info, action, e)
//manager在下文中会看到是什么
manager.unRegisterReceiver(this)
}
}
}
连接
- 新建一个ConnectionInfo
var connectionInfo = ConnectionInfo("104.238.184.237", 8080)
- 获取ConnectionManager对象实例
OkSocket默认为每个用于缓存管理的新通道,只有在第一次调用Open()方法时,才是new得的一个ConnectionManager管理器,调用该管理器之后将被缓存,可以传递对ConnectionManager的引用,然后继续调用相关方法。
val manage = OkSocket.open(connectionInfo)
这里的manage
是ConnectionManager对象实例.主要负责socket的连接、断开、消息发送、心跳管理等,是需要关注的重要对象之一。
- 配置
这里只配了协议,更多可以查阅文档,或文末副文.
fun IConnectionManager.setting(func: OkSocketOptions.Builder.() -> Unit) {
val builder = OkSocketOptions.Builder(option)
builder.func()
option(builder.build())
}
manager.setting {
setReaderProtocol(myProtocol)
}
- 连接
manage.connect()
//断连
if (manager.isConnect) {
manager.disconnect()
}
发送信息
fun IConnectionManager.sendMsg(msg: String) {
val msgByteArray = msg.toByteArray()
val re = mutableListOf<Byte>()
re.add(msgByteArray.size.toByte())
re.addAll(msgByteArray.toList())
send { re.toByteArray() }
}
.....
manager.sendMsg("Hello ")
额外
打开日志
- IOLog
SLog.setIsDebug(true)
- Client
// 加到项目初始化的地方就行,实际上加哪都可以
OkSocketOptions.setIsDebug(true)
- Server
// 加到项目初始化的地方就行,实际上加哪都可以
OkServerOptions.setIsDebug(true)
IConnectionManager 可以修改的选项
/**
* Socket通信方式
* IOThreadMode
* SIMPLEX Write Waiting Response(单工写等待)
* DUPLEX Simultaneous Reading And Writing(双同时读写)
*/
private IOThreadMode mIOThreadMode;
/**
* 是否保留连接缓存.
* 如果选择“False”,每次调用OkSocket.open(),OkSocket将创建一个新的管理器Manager
*/
private boolean isConnectionHolden;
/**
* 在套接字管道中写入服务器的字节顺序
* Default is BIG_ENDIAN
*/
private ByteOrder mWriteOrder;
/**
* *从Socket pipe读取endianness时的字节顺序
* Default is BIG_ENDIAN
*/
private ByteOrder mReadByteOrder;
/**
* 数据包头格式
*/
private IReaderProtocol mReaderProtocol;
/**
* 发送到服务器的单个数据包的总长度
*/
private int mWritePackageBytes;
/**
* 从服务器读取数据时一次读取的缓存字节的长度。
* 值越大,读取效率越高。
* 但相应的系统消耗会更大。
*/
private int mReadPackageBytes;
/**
* Pulse frequency脉冲频率,单位为毫秒 milliseconds
*/
private long mPulseFrequency;
/**
* Allow pulse loss times(允许脉冲损耗时间)
* Will be disconnected if it is greater than or equal to the number of lost times
* Throw {@link com.xuhao.didi.socket.client.impl.exceptions.DogDeadException}
*/
private int mPulseFeedLoseTimes;
/**
* Connection timeout (seconds)
*/
private int mConnectTimeoutSecond;
/**
* 最大读取数据 (MB)
* 防止服务器返回数据体中太大的数据会导致内存溢出。
*/
private int mMaxReadDataMB;
/**
* Reconnect manager
*/
private AbsReconnectionManager mReconnectionManager;
/**
* Secure Sockets Layer Configuration
*/
private OkSocketSSLConfig mSSLConfig;
/**
* Socket factory for custom Sockets
*/
private OkSocketFactory mOkSocketFactory;
/**
* 是否从一个单独的线程回调.
* Callback from IO thread by default
*/
private boolean isCallbackInIndependentThread;
/**
* Will be distributed to the handler,
* the external needs to pass in the HandlerToken and call
* Handler.post(runnable);
*/
private ThreadModeToken mCallbackThreadModeToken;
网友评论