美文网首页Android
封装TCP请求框架

封装TCP请求框架

作者: lucasDev | 来源:发表于2019-03-15 11:40 被阅读0次

    目的:由于公司项目需求,现需实现一个基于TCP的网络请求框架。

    功能:包含心跳机制、请求缓冲、请求回调、基本的网络通信。

    Tip:由于该项目是由kotlin编写的,所以不太熟悉kotlin的老铁可以先看看kotlin的基本语法,没看过kotlin的也没关系,因为kotlin和java语法差别不是很大。相信大部分还是能看懂的。
    

    该框架大致可分为下面几个部分:

    conn();//用于链接服务器

    reconn();//用于重连

    handshake();//用于与服务器握手

    startTCPServer();//用于接收服务器的数据

    sendRequest();//用于发送请求

    PingTask.class;//用于监听心跳

    RequestTask.class;//用于发送请求到服务器

    lockThread();//锁住线程,由于请求和返回的数据是一对一的,所以发送完一条请求后会等待服务器返回数据,直到服务器返回数据或者请求超时才会解除锁发送下一条数据。

    代码里面注释也比较详细,其他的我就不多哔哔了直接上代码,由于本人也是第一次写TCP框架所以还有很多不完善的地方,请各位老铁多多包容。
    
    import android.os.Handler
    import android.util.Log
    import com.google.gson.Gson
    import com.google.gson.JsonParseException
    import com.google.gson.JsonParser
    import com.mmy.acassistant.data.bean.ConnBean
    import com.mmy.acassistant.data.bean.Request
    import java.io.BufferedWriter
    import java.io.OutputStreamWriter
    import java.io.PrintWriter
    import java.net.Socket
    import kotlin.concurrent.thread
    
    /**
     * @file       TCPClient.kt
     * @brief      TCP服务
     * @author     lucas
     * @date       2018/4/27 0027
     * @version    V1.0
     * @par        Copyright (c):
     * @par History:
     *             version: zsr, 2017-09-23
     */
    object TCPClient {
        //服务器地址
        val ip = "122.xx.xxx.xx"
        //端口
        val port = 1234
        //心跳包
        val ping = "{\"type\":\"ping\"}"
        //通道
        var socket: Socket? = null
        //服务器链接状态
        var isConn = false
        //链接异常回调
        var _onConnFail: (Exception) -> Unit = {}
        //握手失败回调
        var _onHandshakFail: (Exception) -> Unit = {}
        val gson = Gson()
        val handler = Handler()
        //请求任务
        var requestTask: RequestTask? = null
    
        val pingCheckTask = PingTask()
    
        //同步锁标记
        val syncTag = this.javaClass
    
        /**
         * 链接服务器
         * Tip:该方法被调用的地方必须长期保活,因为里面创建子线程在与服务器链接。比如在activity中调用后,界面关闭了就会导致
         * 线程被杀掉,也就意味着与服务器的链接断开了。建议在MainActivity中创建,或者在server中创建。
         */
        fun conn() {
            thread {
                println("准备链接")
                try {
                    socket = Socket(ip, port)
                    println("链接成功,开始接受握手数据..")
                    handshake()
                } catch (e: Exception) {
                    println("链接失败,原因:${e.printStackTrace()}")
                    //链接异常
                    _onConnFail(e)
                }
            }
        }
    
        /**
         * 重连
         */
        private fun reConn() {
            TCPClient.println("准备重连..")
            handler.post {
                //关闭任务
                requestTask?.foreClose()
                //关闭心跳检查任务
                pingCheckTask.stopTask()
                //关闭通道
                socket?.shutdownInput()
                socket?.shutdownOutput()
                socket?.close()
                conn()
            }
        }
    
        /**
         * 接受握手数据
         *
         */
        private fun handshake() {
            println("开始接受握手数据")
            try {
                if (socket != null && socket?.isConnected!!) {
                    //判断输入流是否关闭
                    if (socket != null && !socket?.isInputShutdown!!) {
                        val reader = socket?.getInputStream()
                        var content = ""
                        var size = 0
                        //等待服务器推数据过来,并获取数据大小
                        while (size == 0)
                            size = reader?.available()!!
                        val buffer = ByteArray(size)
    //                    Log.d("lucas111", "size:" + size.toString())
                        reader?.read(buffer)
                        content = String(buffer, 0, size)
                        println("收到握手数据:$content")
                        val connBean = gson.fromJson<ConnBean>(content, ConnBean::class.java)
                        if (connBean.status == 1) {
                            println("握手成功,启动服务开始保持长连接")
                            //开启心跳检查任务
                            pingCheckTask.startTask()
                            isConn = true
                            startTCPServer()
                        } else {
                            isConn = false
                            _onHandshakFail(Exception("握手失败,重新链接"))
                            println("握手失败,重新链接")
                            reConn()
                        }
                    }
                    Thread.sleep(1000 * 5)
                } else {
                    println("aaa")
                }
            } catch (e: JsonParseException) {
                //json解析异常
                TCPClient.println("握手数据解析异常")
                _onHandshakFail(e)
            } catch (e: Exception) {
                println("握手失败,重新链接:${e.printStackTrace()}")
                _onHandshakFail(e)
                reConn()
            }
        }
    
        /**
         * 开启长连接服务
         */
        private fun startTCPServer() {
            while (true)
                try {
                    if (!socket?.isInputShutdown!!) {
                        val reader = socket?.getInputStream()
                        var content = ""
                        var size = 0
                        //等待服务器推数据过来,并获取数据大小
                        while (size == 0)
                            size = reader?.available()!!
                        val buffer = ByteArray(size)
    //                    Log.d("lucas111", "size:" + size.toString())
                        reader?.read(buffer)
                        content = String(buffer, 0, size)
                        //判断数据格式是否为json格式
                        if (isJson(content)) {
                            pingCheckTask.receiverPing()
                            //分离出心跳
                            if (ping == content) {
                                TCPClient.println("ping.")
                                //回复心跳
                                replyHeartbeat()
                            } else {
                                //解析数据
                                TCPClient.println("收到数据:$content")
                                //释放锁,继续发送下一条请求
                                synchronized(lock,{ lock.notifyAll()})
                                TODO("回调接口")
                            }
                        } else {
                            println("数据格式错误:$content")
                            TODO("数据格式错误")
                        }
                    }
                } catch (e: Exception) {
                    println("服务异常")
                    TODO("服务异常")
                }
        }
    
        /**
         * 加入请求
         * Tip:该方法一般都是在主线程或者其他线程中调用,而请求体要加入到缓冲区中,这里必须要用同步锁来处理。
         * @param request 请求内容
         */
        fun sendRequest(request: Request) {
            if (requestTask == null) {
                requestTask = RequestTask()
                requestTask?.startTask()
            }
            //加入任务
            requestTask?.addTask(request)
        }
    
        /**
         * 心跳检查任务
         * Tip:由于心跳是由服务器主动发送断开的,所以我们自己也要有一套检查心跳的程序来保证链接
         */
        class PingTask : Runnable {
            //上次收到心跳时间
            private var lastReceiverPingTime = System.currentTimeMillis()
            //心跳间隔 30S  一般这个间隔要设置的比服务器的大,因为网络有延时,比如服务器的是10s,我就设置30s
            private val pingSpace = 30 * 1000L
            //检查间隔 5S
            private val checkSpace = 5 * 1000L
    
            override fun run() {
                if (System.currentTimeMillis() - lastReceiverPingTime > pingSpace) {
                    TCPClient.println("心跳异常,尝试重新链接..")
                    handler.removeCallbacks(this)
                    reConn()
                } else {
                    handler.postDelayed(this, checkSpace)
                }
            }
    
            //重置接受心跳时间
            fun receiverPing() {
                lastReceiverPingTime = System.currentTimeMillis()
            }
    
            //开始任务
            fun startTask() {
                handler.post(this)
            }
    
            //结束任务
            fun stopTask() {
                handler.removeCallbacks(this)
            }
        }
    
        //同步锁
        val lock = java.lang.Object()
        //锁状态
        var isLock = false
    
        class RequestTask : Thread() {
            //请求缓冲区
            val requestBuffer = ArrayList<Request>()
            //运行状态
            var isRunning = false
    
            override fun run() {
                synchronized(lock) {
                    while (true) {
                        TCPClient.println(isRunning.toString())
                        if (isRunning) {
                            //取出一条等待发送的请求
                            var request: Request? = getRequest()
                            if (request == null) {
                                TCPClient.println("缓冲区暂无可发送的请求,结束任务。")
                                isRunning = false
                                continue
                            }
                            //发送请求
                            try {
                                if (socket != null && socket?.isConnected!! && !socket?.isOutputShutdown!!) {
                                    val writer = PrintWriter(BufferedWriter(OutputStreamWriter(socket?.getOutputStream())))
                                    val content = gson.toJson(request)
                                    writer.print(content)
                                    writer.flush()
                                    println("send data:$content")
                                    //标记发送成功
                                    synchronized(syncTag, {
                                        request.sendTag = SendTag.SUCCESS
                                    })
                                    //锁住线程,等待数据接收完毕
                                    lockThread()
                                }
                            } catch (e: Exception) {
                                println("请求发送失败")
                                //移除失败的任务
                                synchronized(syncTag, {
                                    request.sendTag = SendTag.FAIL
                                    request.onFail(Exception("请求发送失败"))
                                    requestBuffer.filter { request == it }
                                })
                            }
                        } else {
                            lockThread()
                        }
                    }
                }
            }
    
            private fun getRequest(): Request? {
                synchronized(syncTag, {
                    requestBuffer.forEach {
                        if (it.sendTag == SendTag.WAIT_SEND) {
                            it.sendTag = SendTag.SENDING
                            return it
                        }
                    }
                })
                return null
            }
    
            /**
             * 开启任务
             */
            fun startTask() {
                start()
            }
    
            /**
             * 恢复任务
             */
            fun addTask(request: Request) {
                synchronized(syncTag, {
                    //判断请求是否存在,如果存在就移除,并使用新的请求
                    requestBuffer.filter {
                        request.msg_source == it.msg_source && request.cmd == it.cmd_rec
                    }
                    requestBuffer.add(request)
                })
                if (!isConn) {
                    TCPClient.println("服务器正在链接或者链接失败")
                    return
                }
                if (isRunning || requestBuffer.isEmpty()) return
                if (isLock)
                    synchronized(lock, {
                        lock.notifyAll()
                    })
                isRunning = true
            }
    
            /**
             * 暂停任务
             * Tip:软关闭,当当前的一条请求发送完才关闭
             */
            fun parseTask() {
                isRunning = false
            }
    
            /**
             * 强制关闭
             */
            fun foreClose() {
                interrupt()
            }
        }
    
        /**
         * 锁住线程
         */
        private fun lockThread() {
            isLock = true
            //强制关闭线程会导致InterruptedException异常,暂时不处理它
            try {
                lock.wait()
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }
    
        /**
         * 回复心跳
         */
        private fun replyHeartbeat() {
            try {
                if (socket != null && socket?.isConnected!! && !socket?.isOutputShutdown!!) {
                    val writer = PrintWriter(BufferedWriter(OutputStreamWriter(socket?.getOutputStream())))
                    writer.print(ping)
                    writer.flush()
                    println("send ping.")
                }
            } catch (e: Exception) {
                println("心跳回复失败")
            }
        }
    
        private fun println(msg: String) {
            Log.d("lucas111", "$msg\n")
    //        print("$msg\n")
        }
    
        /**
         * 判断字符串是否为json格式
         */
        private fun isJson(json: String): Boolean {
            if (json.isEmpty()) return false
            return try {
                JsonParser().parse(json)
                true
            } catch (e: Exception) {
                false
            }
        }
    
        enum class SendTag {
            //发送失败 发送成功  发送中 未发送
            FAIL,
            SUCCESS,
            SENDING,
            WAIT_SEND
        }
    }
    

    相关文章

      网友评论

        本文标题:封装TCP请求框架

        本文链接:https://www.haomeiwen.com/subject/jhdlmqtx.html