美文网首页
dubbo 不依赖提供者的sdk对服务进行监听和调用

dubbo 不依赖提供者的sdk对服务进行监听和调用

作者: ToufuDrago_c53f | 来源:发表于2019-10-03 09:34 被阅读0次

    最近接到一个关于dubbo调用的需求,要求如下:

    • 不依赖dubbo服务提供方的sdk调用其服务,只知道服务的包名、接口名、方法名。
    • 检索出某个特定包下的所有dubbo服务,对其进行监听(监听提供者的上线、下线、版本更新、参数变化)。

    现在功能已实现,在此记录一下。

    一、大体方案

    1.不依赖提供方的sdk调用其服务

    可以通过dubbo的泛化调用实现,关于dubbo的泛化调用,可以查看:
    http://dubbo.apache.org/zh-cn/blog/dubbo-generic-invoke.html
    我们可以了解到,要进行泛化调用,需要的参数有:

    • zookeeper的地址以及端口号。
    • 服务的包名+接口名+方法名。
    • 方法的参数类型,调用时传入的参数。

    如果被泛化调用的方法返回的类型是pojo,那么会自动转为Map<String, Any>格式。
    泛化调用需要的参数,只有方法的参数类型需要想办法获取。通过查阅资料,得知获取方法的参数类型有两种:
    1.使用dubbo 2.7之后的版本且服务提供方在provider.xml中使用了<dubbo:metadata-report>标签,这样zookeeper上就会存储dubbo服务方法的参数类型,否则不会存。
    2.通过zkClient获取到服务提供者的ip地址、端口号,再通过socket编程连接到服务提供方获取方法的参数类型。
    通过实验发现我这的dubbo版本在2.7之前或没用<dubbo:metadata-report>标签,只能使用方法2了。

    2.检索出特定包下的服务并监听

    首先是检索出特定包下的服务,查阅资料得知可以通过zkClient连接到zookeeper,然后获取注册到该zookeeper的所有服务,最后筛选出特定包下的服务即可。
    服务的监听,包括两种:
    1.有新的特定包的服务时出现时要监听到。
    2.监听服务的提供者上线、下线、版本更新、参数变化。
    经过实验,这两点都能通过给zkClient添加IZkChildListener监听到。

    二、一些关键的代码

    1.泛化调用

    class GenericCaller private constructor(val env: String) {
        private val application = ApplicationConfig()
        private val registry = RegistryConfig()
        // 按照dubbo 泛化调用文档所说,缓存ReferenceConfig<GenericService>
        private val referenceMap : ConcurrentHashMap<String, ReferenceConfig<GenericService>> = ConcurrentHashMap(64)
    
        companion object {
            private val listCaller: List<GenericCaller> by lazy { listOf(GenericCaller("dev"), GenericCaller("fat")) }
            // env 用于区分开发环境、测试环境
            fun get(env: String): GenericCaller? {
                if (env == "dev") {
                    return listCaller[0]
                }
                if (env == "fat") {
                    return listCaller[1]
                }
                return null
            }
        }
    
        init {
            application.name = "$env-databank-generic-consumer"
            // 到配置中心获取zookeeper的地址和端口号
            val zkHost = ApolloConfigReaderUtils.getString("${env}.zookeeper.hostName")
            val zkPort = ApolloConfigReaderUtils.getString("${env}.zookeeper.port")
            registry.address = "zookeeper://$zkHost:$zkPort"
            application.registry = registry
        }
    
        fun call(interfaceName: String, methodName: String, version: String, group: String, parameters: List<Parameter>,
                 jsonObj: JSONObject): Any {
            val listParameterValue = parameters.map {
                when (it.parameterType) {
                    "int", "java.lang.Integer" -> jsonObj.getInteger(it.name)
                    "String", "java.lang.String" -> jsonObj.getString(it.name)
                    "com.alibaba.fastjson.JSONArray" -> jsonObj.getJSONArray(it.name)
                    "long" -> jsonObj.getLong(it.name)
                    "boolean" -> jsonObj.getBoolean(it.name)
                    else -> jsonObj[it.name] ?: throw HandlerInvokeException("no parameter!")
                }
            }
    
            val genericService = getReferenceConfig(interfaceName, version, group).get()
            // 泛化调用
            return genericService.`$invoke`(methodName, parameters.map { it.parameterType }.toTypedArray(),
                    listParameterValue.toTypedArray())
        }
    
        // 先到缓存取ReferenceConfig,取不到再new
        private fun getReferenceConfig(interfaceName: String, version : String, group : String) : ReferenceConfig<GenericService>{
            val key = "$interfaceName-$version-$group"
            if (referenceMap[key] == null){
                synchronized("$env-$key".intern()){
                    if (referenceMap[key] == null){
                        val referenceTemp = ReferenceConfig<GenericService>()
                        referenceTemp.setInterface(interfaceName)
                        referenceTemp.isGeneric = true
                        referenceTemp.application = application
                        referenceTemp.version = version
                        referenceTemp.group = group
                        referenceMap[key] = referenceTemp
                    }
                }
            }
            return referenceMap[key]!!
        }
    
    }
    

    2.服务的监听

    ZkServiceListener监听是否有新的服务

    class ZkServiceListener(val env : String) : IZkChildListener {
    
        override fun handleChildChange(path : String?, listService: MutableList<String>?) {
            println("--------This is ZkServiceListener-------")
            if (listService == null || listService.isEmpty()){
                return
            }
            // 筛选出databank的服务
            val listInnerService = listService.filter { it.contains(ZkCommandLineRunner.getKeyWord()) }
            synchronized("$env/dubbo".intern()){
                // 筛选出cache中没有的服务, 添加到cache
                val listNewInnerService = listInnerService.filter {!DubboServiceCache.contains(it, env) }
                listNewInnerService.forEach{
                    ZkCommandLineRunner.get(env)?.updateProvidersToCache(it)
                }
            }
        }
    }
    

    ZkProviderListener监听已有服务的上线、下线

    class ZkProviderListener(val env : String) : IZkChildListener {
        private val moduleFactory : ModuleFactory by lazy {  SpringContextUtil.getBean("moduleFactory") as ModuleFactory}
    
        override fun handleChildChange(path : String?, providersUrl: MutableList<String>?) {
            println("--------This is ZkProviderListener-------")
    
            // providerUrl为null或长度为0,说明该服务没有提供方
            val tmpUrls = providersUrl ?: listOf<String>()
            if (path.isNullOrBlank()){
                return
            }
            // path : /dubbo/xxxxx/providers
            val serviceName = path.substring(7, path.lastIndexOf("/"))
            ZkCommandLineRunner.get(env)?.updateProvidersToCache(serviceName, tmpUrls)
            moduleFactory.updateModuleState(serviceName, env)
            // 没法应对参数类型不变,但参数意义改变的情况。
        }
    }
    

    ZkCommandLineRunner负责启动时读取dubbo服务以及添加上述的监听器

    /**
     * 服务启动时自动到Zookeeper读取所有dubbo服务,然后筛选出与KEY_WORD相关的服务
     */
    class ZkCommandLineRunner private constructor(private val env : String){
        private val zkServer : String
        private lateinit var zkClient : ZkClient
        private val listJob = mutableListOf<Job>()
        private var inited = false
    
        companion object{
            private val listRunner : List<ZkCommandLineRunner> by lazy { listOf(ZkCommandLineRunner("dev"), ZkCommandLineRunner("fat")) }
            fun get(env : String) : ZkCommandLineRunner?{
                if (env == "dev"){
                    return listRunner[0]
                }
                if (env =="fat"){
                    return listRunner[1]
                }
                return null
            }
            private var KEY_WORD = ".databank."
            fun getKeyWord() : String{
                return KEY_WORD
            }
        }
    
        init {
            val zkHost = ApolloConfigReaderUtils.getString("${env}.zookeeper.hostName")
            val zkPort = ApolloConfigReaderUtils.getString("${env}.zookeeper.port")
            zkServer = "$zkHost:$zkPort"
            KEY_WORD = ApolloConfigReaderUtils.getString("${env}.key_word")
        }
    
        fun init() {
            zkClient = ZkClient(zkServer, 5000, 5000, MyZkSerializer())
            // 监听是否有新增的服务
            zkClient.subscribeChildChanges("/dubbo", ZkServiceListener(env))
            val listInnerService = getServiceListAndInitCache()
            val zkProviderListener = ZkProviderListener(env)
            listInnerService.forEach{
                // 监听已有服务的上下线、更新
                zkClient.subscribeChildChanges("/dubbo/$it/providers", zkProviderListener)
                updateProvidersToCache(it)
            }
            inited = true
        }
    
        private fun getServiceListAndInitCache() : List<String>{
            // 加锁,防添加service时zookeeper有service上下线
            synchronized("$env/dubbo".intern()){
                val dubboChildren = zkClient.getChildren("/dubbo")
                val listInnerService = dubboChildren.filter { it.contains(KEY_WORD) }
                ProviderEditor.initCache(listInnerService, env)
                return listInnerService
            }
        }
    
        // 该函数在启动时调用
        fun updateProvidersToCache(serviceName : String){
            // 加锁,防添加providers的同时zookeeper上providers的变化
            synchronized("$env/$serviceName".intern()){
                val listProviderUrl = zkClient.getChildren("/dubbo/$serviceName/providers")
                updateProvidersToCache(serviceName, listProviderUrl)
            }
        }
    
        // 该函数在ZkProviderListener监听到provider变化时调用
        fun updateProvidersToCache(serviceName: String, listProviderUrl : List<String>){
            // 加锁,防添加providers的同时zookeeper上providers的变化
            synchronized("$env/$serviceName".intern()){
                val listProvider = ProviderEditor.updateProviders(serviceName, listProviderUrl, env)
                listProvider.forEach{
                    initProviderMethods(it)
                }
            }
        }
    
        // 判断协程是否全部执行完毕
        fun isCompleted() : Boolean{
            if (listJob.isEmpty()){
                return true
            }
            listJob.forEach{
                if (!it.isCompleted){
                    return false
                }
            }
            listJob.clear()
            return true
        }
    
        private fun initProviderMethods(provider : Provider){
            // 新建协程,分别用socket到dubbo服务方拿方法信息
            val job = GlobalScope.launch {
                val socket = AccessDubboProviderSocket(provider.host, provider.port)
                val strMethods = socket.getMethodsByInterfaceName(provider.serviceName)
                strMethods.forEach{
                    val splitSpace = it.split(" ")
                    val returnType = splitSpace[0]
                    val splitLeftParenthesis = splitSpace[1].split("(")
                    val methodName = splitLeftParenthesis[0]
                    val parameterTypes = splitLeftParenthesis[1].substring(0, splitLeftParenthesis[1].lastIndexOf(")")).split(",")
                    provider.methods.add(Method(methodName, returnType, parameterTypes))
                }
            }
            listJob.add(job)
        }
    
        fun isInited() : Boolean{
            return inited
        }
    
    }
    

    3.直接连接dubbo服务提供方,获取方法参数的socket

    关于连接dubbo服务方后可执行的命令,可查看:
    https://dubbo.gitbooks.io/dubbo-user-book/content/references/telnet.html

    /**
     * dubbo使用2.7之前的版本 或者2.7之后的版本但没有使用<dubbo:metadata-report>标签,zookeeper不会存接口方法的具体信息
     * 该类使用socket的方式直接连接duboo服务提供方,获取接口中方法的参数类型、返回类型
     *
     */
    class AccessDubboProviderSocket(val host: String, val port: Int) {
    
        // 通过接口名获取该接口所有方法
        fun getMethodsByInterfaceName(interfaceName : String): List<String> {
            val socket = Socket()
            socket.connect(InetSocketAddress(host, port), 3000)
            val writer = PrintWriter(socket.getOutputStream())
            val input = socket.getInputStream()
            try {
                writer.println("ls -l $interfaceName")
                writer.flush()
                val strRead = getReturnString(input)
                val strSplit = strRead.split("\r\n")
                writer.println("exit")
                writer.flush()
                // strRead的最后会有一行 "dubbo>" ,将其过滤掉
                return strSplit.subList(0, strSplit.size -1 )
            } finally {
                input.close()
                writer.close()
                socket.close()
            }
        }
    
    
        // 将dubbo 提供方返回的信息全部read,返回一个字符串String
        private fun getReturnString(input: InputStream): String {
            val strBuilder = StringBuilder()
            val byteArray = ByteArray(512)
            input.read(byteArray)
            var str = String(byteArray).trim('\u0000')
            strBuilder.append(str)
            while (!strBuilder.endsWith("dubbo>")) {
                // byteArray全元素置0, 否则上次read到 "abcdefg" 这次read到"123" ,byteArray的结果是 "123defg",后半不会被覆盖掉
                byteArray.fill(0, 0, byteArray.size)
                input.read(byteArray)
                str = String(byteArray).trim('\u0000')
                strBuilder.append(str)
            }
            return strBuilder.toString()
        }
    }
    

    4. 保存监听到的服务

    /**
     * 该类用于操作DubboServiceCache中的provider
     */
    object ProviderEditor {
    
        // 此方法往cache内put空的列表是为了表示 "cache已经知道zookeeper上有该服务", 这样zookeeper监听/duubo路径时可以通过cache判断某服务之前是否存在
        fun initCache(listService: List<String>, env: String) {
            DubboServiceCache.clear(env)
            listService.forEach {
                DubboServiceCache.put(it, mutableListOf(), env)
            }
        }
    
        fun updateProviders(serviceName: String, listProviderUrl: List<String>, env: String): List<Provider> {
            val listProvider: MutableList<Provider> = mutableListOf()
            listProviderUrl.forEach {
                val hostPort = UrlUtils.getHostAndPort(it)
                val hostPortSplit = hostPort.split(":")
                try {
                    val host = hostPortSplit[0]
                    val port = hostPortSplit[1].toInt()
                    val map = UrlUtils.getUrlParameters(it)
                    val version = map["version"] ?: ""
                    val group = map["group"] ?: ""
                    // 不知为何有时会有地址、端口号、版本都相同的、重复的提供者,只好过滤一下
                    val flag: Boolean = listProvider.none { it.host == host && it.port == port && it.version == version }
                    if (flag) {
                        listProvider.add(Provider(host, port, version, mutableListOf(), serviceName, group))
                    }
                } catch (e: Exception) {
                    e.printStackTrace()
                }
            }
            val tempList = listProvider.sortedBy { it.group }
            val listProviderOrderByVersion = tempList.sortedByDescending { it.version }
            DubboServiceCache.put(serviceName, listProviderOrderByVersion, env)
            return listProviderOrderByVersion
        }
    
    }
    

    5. zookeeper URL解析工具(噗,原来是要进行URLDecode,当时不知道。。。)

    object UrlUtils {
        // 从zookeeper获取的url ,符号都用 % + 符号的ascii码 表示
        fun getUrlParameters(url: String): Map<String, String> {
            val map = mutableMapOf<String, String>()
            if (url.isNullOrBlank()) {
                return map
            }
            val tmpUrl = url.trim()
            if (tmpUrl.isNullOrBlank()) {
                return map
            }
            // %3F ?
            val urlParts = tmpUrl.split("%3F")
            if (urlParts.size == 1) {
                return map
            }
            // %26 &
            val params = urlParts[1].split("%26")
            params.forEach {
                // %3D =
                val keyValue = it.split("%3D")
                map[keyValue[0]] = keyValue[1]
            }
    
            return map
        }
    
        fun getHostAndPort(url: String): String {
            // %2F /
            val strHostAndPort = url.substring(url.indexOf("%2F%2F") + 6, url.lastIndexOf("%2F"))
            // $3A :
            return strHostAndPort.replace("%3A", ":")
        }
    }
    

    相关文章

      网友评论

          本文标题:dubbo 不依赖提供者的sdk对服务进行监听和调用

          本文链接:https://www.haomeiwen.com/subject/gzgdectx.html