在上一篇文章之后,这两周时间主要是在工作之余对它进行各种重构,前期有不少考虑不周的地方,比如只能连接一个集群,比如QUEUE和TOPIC分开考虑等等。重构的过程充满了乐趣和挫败感,有时能想到很多有趣的点子,有些很有效,有些试验下来又发现不是那么回事。这篇文章简单记录一下重构过程中的几个逻辑点和考量点。
重要的几个类图
1. Broker对象池
使用一个Broker池(BrokerPool)来存放曾经使用过的Broker信息,这样一旦连接建立上,多个用户可以实现连接的复用。BrokerPool是一个单例,在Controller的每次获取信息时,都通过BrokerPool来获取Broker的实例,这些实例都存放在一个ConcurrentHashMap中,key为BrokerName。获取Broker时有以下逻辑:
- Map中不存在BrokerName对应的Broker时,自动创建一个
- Map中存在BrokerName对应的Broker时,判断JMX连接是否正常,如果JMX连接已经被断开,则自动恢复。
- 每次从Broker池中获取连接时,给Broker打上一个时间戳,标记该Broker最后一次被使用的时间loadTime。
- 定时线程定期遍历BrokerPool中的brokerMap,如果发现有Broker已经10分钟没有被调用过,则自动断开该Broker的连接。
2. 自动获取Broker的对象信息
在Broker中使用了一个getTargetInfo的方法,该方法的入参是类信息,返回TargetInfo,而TargetInfo就是所有的Info类的接口类(其实就是java bean)。其实对应的就是所有需要获取的信息。我们可以把AMQ的JMX接口中的主要信息分成几类:
- BrokerInfo:对应的ObjectName是
org.apache.activemq:type=Broker,brokerName=BROKERNAME
,也就是Broker基本信息的MBean。 - DestinationInfo:对应的是Queue和Topic的信息,ObjectName类似
org.apache.activemq:type=Broker,brokerName=BROKERNAME,destinationType=Queue,destinationName=QUEUENAME
- NetworkInfo:对应的是集群的连接信息
- ClientInfo:对应的是客户端连接的信息
- ConsumerInfo:对应每个destination的消费者的信息
- ProducerInfo:对应每个destination的生产者的信息
而后使用反射的机制,通过调用broker1.getTargetInfo(new BrokerInfo)方法,在方法内部对入参进行判断,并从JMX连接中获取到java bean中的所有private属性,再调用每个属性对应的set方法。
这里有个小技巧,所有属性的名称都设置成与JMX中MBean属性名相同就好,比如BrokerInfo中的当前连接数是ConnectionCount,属性名就可以定义为connectionCount。这样可以通过反射机制里的field.getDeclaredField
和method.invoke(clazz.newInstance(),"set"+fieldName,type)
等方法自动装配对应TargetInfo对象。
如果有一些对象的属性不希望进行自动获取,而是需要手工进行判断,比如ConnectionInfo中一个连接的端口信息(port),是需要通过从RemoteAddress中获取到的,可以通过把port属性定义为protected,而后加个判断,只对所有修饰符为private的属性需要获取对应的MBean信息即可。
这样做的好处是只需要通过在java bean对象里增加一个private属性,比如在BrokerInfo中增加一个private long memoryLimit,就可以让程序自动从JMX中获取该Broker的堆内存上限。
3. 集群池
与Broker池类似的,我们也需要有一个专门存放集群的池子ClusterPool,这个池子里存放的是集群的拓扑。这个集群池的创建是通过这样的方式来得到的:
- 每个Broker对象建立的时候,自动获取该Broker所连接的所有Broker,并将信息存放到ClusterPool的一个属性brokerClusterMap里,这时候会有几种可能:
1.1. 现有集群中已经存在该Broker A的信息,也就是其他Broker B对象创建的时候发现这台Broker A和它相连,记录了这个Broker A的信息,这个时候更新一下Broker A属于Broker B的集群即可;
1.2. 现有集群中不存在该Broker A的信息,那就创建一个新的Cluster,Cluster名字为Cluster_BrokerName_A,然后将与Broker A相连的Broker信息记入map里。 - 通过ClusterPool来获取Cluster的时候,判断Cluster是否已存在,如果存在就取出Cluster,否则需要进行判断:
2.1. 如果现有的brokerClusterMap中存在Cluster Name,意味着这个Cluster中曾经有一个Broker被访问过,就从这个Broker入手,从BrokerPool里获取到该Broker对象和所有与之相连的Broker对象,这个过程中需要不停更新brokerClusterMap。最终将Cluster中所有的Broker对象都创建出来,存入Cluster对象中,返回该Cluter对象。
2.2. 如果现有的brokerClusterMap中不存在Cluster Name,则解析Cluster Name,看看是否符合Cluster_BrokerName的规则,再从BrokerName里取出Broker信息,创建Broker。而后重复2.1步骤创建出集群中所有Broker对象。
emmm。。。写完我自己都有点晕。不过代码证明了这样的方式是可行哒。
集群池
进一步其实可以在Cluster中保存Broker之间的拓扑结构图,但是这里没用到,因为我的所有环境里都是两两互连的完全图模式。
4. 异步线程池
在一开始构建的时候,其实用的是单线程的方式,但是很快就能发现单线程严重影响MBean的获取速度。所以后来搞了个线程池,专门用于信息的获取。springboot里用线程池还挺简单的,直接用ThreadPoolTaskExecutor就行。然后在方法上注解@Async就可以让方法的执行使用线程池了。
这里碰到了一个问题,比如前台调用一个获取broker所有队列信息的请求,后台需要通过线程并发的方式从连接中获取到对应的信息,而后当所有信息获取成功以后才能返回前台数据。所以这里使用的模式是CountDownLatch的方式。将CountDownLatch发送给所有futuretask线程,执行完毕后latch.countDown(),通过latch.await进行超时的设置,而后将所有的信息组合成一个List发送给前端。其实就是最基本的多线程并发操作了。
造轮子的感受
自己造轮子的优点就是根据业务来自定义展示页面和展示维度,当然还有自我训练和提升。其实完全可以通过更简单的方法来实现监控,比如jmxtrans,更简单可以直接用jmx exporter+prometheus+grafana完成从采集到拉取数据再到展现。
目前这个工具已经完成了,找个时间往生产上放。下一步会试试prometheus这条路,作为架构师,在持续保持自己编码能力的同时,其他轮子和技术框架都要尝试一下。
最终效果图,由于涉及到公司内部的资料,所以打了点马赛克。
网友评论