目的:由于公司项目需求,现需实现一个基于TCP的网络请求框架。
功能:包含心跳机制、请求缓冲、请求回调、基本的网络通信。
Tip:由于该项目是由kotlin编写的,所以不太熟悉kotlin的老铁可以先看看kotlin的基本语法,没看过kotlin的也没关系,因为kotlin和java语法差别不是很大。相信大部分还是能看懂的。
该框架大致可分为下面几个部分:
conn();//用于链接服务器
reconn();//用于重连
handshake();//用于与服务器握手
startTCPServer();//用于接收服务器的数据
sendRequest();//用于发送请求
PingTask.class;//用于监听心跳
RequestTask.class;//用于发送请求到服务器
lockThread();//锁住线程,由于请求和返回的数据是一对一的,所以发送完一条请求后会等待服务器返回数据,直到服务器返回数据或者请求超时才会解除锁发送下一条数据。
代码里面注释也比较详细,其他的我就不多哔哔了直接上代码,由于本人也是第一次写TCP框架所以还有很多不完善的地方,请各位老铁多多包容。
import android.os.Handler
import android.util.Log
import com.google.gson.Gson
import com.google.gson.JsonParseException
import com.google.gson.JsonParser
import com.mmy.acassistant.data.bean.ConnBean
import com.mmy.acassistant.data.bean.Request
import java.io.BufferedWriter
import java.io.OutputStreamWriter
import java.io.PrintWriter
import java.net.Socket
import kotlin.concurrent.thread
/**
* @file TCPClient.kt
* @brief TCP服务
* @author lucas
* @date 2018/4/27 0027
* @version V1.0
* @par Copyright (c):
* @par History:
* version: zsr, 2017-09-23
*/
object TCPClient {
//服务器地址
val ip = "122.xx.xxx.xx"
//端口
val port = 1234
//心跳包
val ping = "{\"type\":\"ping\"}"
//通道
var socket: Socket? = null
//服务器链接状态
var isConn = false
//链接异常回调
var _onConnFail: (Exception) -> Unit = {}
//握手失败回调
var _onHandshakFail: (Exception) -> Unit = {}
val gson = Gson()
val handler = Handler()
//请求任务
var requestTask: RequestTask? = null
val pingCheckTask = PingTask()
//同步锁标记
val syncTag = this.javaClass
/**
* 链接服务器
* Tip:该方法被调用的地方必须长期保活,因为里面创建子线程在与服务器链接。比如在activity中调用后,界面关闭了就会导致
* 线程被杀掉,也就意味着与服务器的链接断开了。建议在MainActivity中创建,或者在server中创建。
*/
fun conn() {
thread {
println("准备链接")
try {
socket = Socket(ip, port)
println("链接成功,开始接受握手数据..")
handshake()
} catch (e: Exception) {
println("链接失败,原因:${e.printStackTrace()}")
//链接异常
_onConnFail(e)
}
}
}
/**
* 重连
*/
private fun reConn() {
TCPClient.println("准备重连..")
handler.post {
//关闭任务
requestTask?.foreClose()
//关闭心跳检查任务
pingCheckTask.stopTask()
//关闭通道
socket?.shutdownInput()
socket?.shutdownOutput()
socket?.close()
conn()
}
}
/**
* 接受握手数据
*
*/
private fun handshake() {
println("开始接受握手数据")
try {
if (socket != null && socket?.isConnected!!) {
//判断输入流是否关闭
if (socket != null && !socket?.isInputShutdown!!) {
val reader = socket?.getInputStream()
var content = ""
var size = 0
//等待服务器推数据过来,并获取数据大小
while (size == 0)
size = reader?.available()!!
val buffer = ByteArray(size)
// Log.d("lucas111", "size:" + size.toString())
reader?.read(buffer)
content = String(buffer, 0, size)
println("收到握手数据:$content")
val connBean = gson.fromJson<ConnBean>(content, ConnBean::class.java)
if (connBean.status == 1) {
println("握手成功,启动服务开始保持长连接")
//开启心跳检查任务
pingCheckTask.startTask()
isConn = true
startTCPServer()
} else {
isConn = false
_onHandshakFail(Exception("握手失败,重新链接"))
println("握手失败,重新链接")
reConn()
}
}
Thread.sleep(1000 * 5)
} else {
println("aaa")
}
} catch (e: JsonParseException) {
//json解析异常
TCPClient.println("握手数据解析异常")
_onHandshakFail(e)
} catch (e: Exception) {
println("握手失败,重新链接:${e.printStackTrace()}")
_onHandshakFail(e)
reConn()
}
}
/**
* 开启长连接服务
*/
private fun startTCPServer() {
while (true)
try {
if (!socket?.isInputShutdown!!) {
val reader = socket?.getInputStream()
var content = ""
var size = 0
//等待服务器推数据过来,并获取数据大小
while (size == 0)
size = reader?.available()!!
val buffer = ByteArray(size)
// Log.d("lucas111", "size:" + size.toString())
reader?.read(buffer)
content = String(buffer, 0, size)
//判断数据格式是否为json格式
if (isJson(content)) {
pingCheckTask.receiverPing()
//分离出心跳
if (ping == content) {
TCPClient.println("ping.")
//回复心跳
replyHeartbeat()
} else {
//解析数据
TCPClient.println("收到数据:$content")
//释放锁,继续发送下一条请求
synchronized(lock,{ lock.notifyAll()})
TODO("回调接口")
}
} else {
println("数据格式错误:$content")
TODO("数据格式错误")
}
}
} catch (e: Exception) {
println("服务异常")
TODO("服务异常")
}
}
/**
* 加入请求
* Tip:该方法一般都是在主线程或者其他线程中调用,而请求体要加入到缓冲区中,这里必须要用同步锁来处理。
* @param request 请求内容
*/
fun sendRequest(request: Request) {
if (requestTask == null) {
requestTask = RequestTask()
requestTask?.startTask()
}
//加入任务
requestTask?.addTask(request)
}
/**
* 心跳检查任务
* Tip:由于心跳是由服务器主动发送断开的,所以我们自己也要有一套检查心跳的程序来保证链接
*/
class PingTask : Runnable {
//上次收到心跳时间
private var lastReceiverPingTime = System.currentTimeMillis()
//心跳间隔 30S 一般这个间隔要设置的比服务器的大,因为网络有延时,比如服务器的是10s,我就设置30s
private val pingSpace = 30 * 1000L
//检查间隔 5S
private val checkSpace = 5 * 1000L
override fun run() {
if (System.currentTimeMillis() - lastReceiverPingTime > pingSpace) {
TCPClient.println("心跳异常,尝试重新链接..")
handler.removeCallbacks(this)
reConn()
} else {
handler.postDelayed(this, checkSpace)
}
}
//重置接受心跳时间
fun receiverPing() {
lastReceiverPingTime = System.currentTimeMillis()
}
//开始任务
fun startTask() {
handler.post(this)
}
//结束任务
fun stopTask() {
handler.removeCallbacks(this)
}
}
//同步锁
val lock = java.lang.Object()
//锁状态
var isLock = false
class RequestTask : Thread() {
//请求缓冲区
val requestBuffer = ArrayList<Request>()
//运行状态
var isRunning = false
override fun run() {
synchronized(lock) {
while (true) {
TCPClient.println(isRunning.toString())
if (isRunning) {
//取出一条等待发送的请求
var request: Request? = getRequest()
if (request == null) {
TCPClient.println("缓冲区暂无可发送的请求,结束任务。")
isRunning = false
continue
}
//发送请求
try {
if (socket != null && socket?.isConnected!! && !socket?.isOutputShutdown!!) {
val writer = PrintWriter(BufferedWriter(OutputStreamWriter(socket?.getOutputStream())))
val content = gson.toJson(request)
writer.print(content)
writer.flush()
println("send data:$content")
//标记发送成功
synchronized(syncTag, {
request.sendTag = SendTag.SUCCESS
})
//锁住线程,等待数据接收完毕
lockThread()
}
} catch (e: Exception) {
println("请求发送失败")
//移除失败的任务
synchronized(syncTag, {
request.sendTag = SendTag.FAIL
request.onFail(Exception("请求发送失败"))
requestBuffer.filter { request == it }
})
}
} else {
lockThread()
}
}
}
}
private fun getRequest(): Request? {
synchronized(syncTag, {
requestBuffer.forEach {
if (it.sendTag == SendTag.WAIT_SEND) {
it.sendTag = SendTag.SENDING
return it
}
}
})
return null
}
/**
* 开启任务
*/
fun startTask() {
start()
}
/**
* 恢复任务
*/
fun addTask(request: Request) {
synchronized(syncTag, {
//判断请求是否存在,如果存在就移除,并使用新的请求
requestBuffer.filter {
request.msg_source == it.msg_source && request.cmd == it.cmd_rec
}
requestBuffer.add(request)
})
if (!isConn) {
TCPClient.println("服务器正在链接或者链接失败")
return
}
if (isRunning || requestBuffer.isEmpty()) return
if (isLock)
synchronized(lock, {
lock.notifyAll()
})
isRunning = true
}
/**
* 暂停任务
* Tip:软关闭,当当前的一条请求发送完才关闭
*/
fun parseTask() {
isRunning = false
}
/**
* 强制关闭
*/
fun foreClose() {
interrupt()
}
}
/**
* 锁住线程
*/
private fun lockThread() {
isLock = true
//强制关闭线程会导致InterruptedException异常,暂时不处理它
try {
lock.wait()
} catch (e: Exception) {
e.printStackTrace()
}
}
/**
* 回复心跳
*/
private fun replyHeartbeat() {
try {
if (socket != null && socket?.isConnected!! && !socket?.isOutputShutdown!!) {
val writer = PrintWriter(BufferedWriter(OutputStreamWriter(socket?.getOutputStream())))
writer.print(ping)
writer.flush()
println("send ping.")
}
} catch (e: Exception) {
println("心跳回复失败")
}
}
private fun println(msg: String) {
Log.d("lucas111", "$msg\n")
// print("$msg\n")
}
/**
* 判断字符串是否为json格式
*/
private fun isJson(json: String): Boolean {
if (json.isEmpty()) return false
return try {
JsonParser().parse(json)
true
} catch (e: Exception) {
false
}
}
enum class SendTag {
//发送失败 发送成功 发送中 未发送
FAIL,
SUCCESS,
SENDING,
WAIT_SEND
}
}
网友评论