美文网首页
vertx 实现动态 RPC

vertx 实现动态 RPC

作者: Double_winter | 来源:发表于2019-01-01 10:29 被阅读0次

    需求:替换ali lightApi 动态rpc的实现,因为api为商业版,不是开源的,是基于pandora 的EDAS平台的。那么我们如何实现开源的动态RPC呢?

    定义:动态RPC指的是,可以动态的让一个服务上线和下线,换句话说就是动态的从注册中心剔除服务,而不是停止这个服务,启动这个服务。这个需求是来自链路的原路返回

    简介 vertx:

    vertx 是一套全异步的基于netty通信的框架,Vertx中核心组建有 verticle, eventbus, circuit breaker,service discovery and register, Router等等。后期我会一一的去探索这里面的组件的原理。而本文介绍的vertx 的动态rpc,就是使用vertx proxy service 这个功能来实现的。另外我们得知道vertx 中的线程模型是基于netty的event loop,一个verticle 是一个微服务,而且具有HA的机制的微服务。服务之间的通信是根据event bus 的netty 通信机制。但是event bus 通信不是100%的可靠的(这点很要命,后期我会写博客探索的)。

    此外vertx需要依赖 分布式缓存建立集群(hazelcast, ignite等),基于gossip协议的p2p网络。

    探索过程:

    如果要实现需求的话,首先第一想到的还是spring cloud,dubbo,thrift 等RPC框架,hsf 是阿里pandora rpc 框架,肯定不能用。但是经一番折腾之后,没有可以让我觉得可以在代码里面直接让服务上线下线,从注册中心剔除掉或者注册到注册中心的。那怎么办呢?曾经也想过会用rabbitmq,这样的MQ 去做动态的RPC,但是发现很复杂,关键很难做scalablity,如果有上千个链路,就需要上千个topic,或者tag 这类的标记。感觉很复杂,也很难维护。

    solutions: vertx proxy service 

    这个方案上,我们不打算使用verticle的概念,仅仅使用vertx 代理服务的概念。首先我们要使用vertx 的服务发现和注册,其次我们要使用proxy service。

    第一个坑:代理服务的自动生成。请在pom或者gradle里面加入 生成代理的依赖(对接口的代理)。然后创建 package-info.java,在root package。所谓的root package就是基package,web developer 开的springboot 应该都很清楚,需要将app.class 建在基package,以便扫描都可以扫描到

        <groupId>io.vertx</groupId>

        <artifactId>vertx-service-proxy</artifactId>

        <classifier>processor</classifier>

        <groupId>io.vertx</groupId>

        <artifactId>vertx-codegen</artifactId>

        <classifier>processor</classifier>

    @ModuleGen(name ="ap-common-vertx", groupPackage ="com.xxxx.xx.xx.vertx")

    package com.xxxx.xx.xx.vertx;

    import io.vertx.codegen.annotations.ModuleGen;

    其中groupPackage ="com.xxxx.xx.xx.vertx"), 是基package, 也可以是 你定义interface 所在的包。完成之后,使用maven clean install, 你就会发现在target 目录下面会有 ...Proxy 的class,这两个class 就是代理类,对于原理,后面的博客我会慢慢分析。

    第二个坑是:服务发现publish 服务之后,需要注册服务代理,不然注册的服务,在event bus 上面是找不到的,看看代码怎么写吧:

    Record record = EventBusService.createRecord(servicePublishRequestBean.getServiceName(), servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz());

    servicePublishRequestBean.getDiscovery().publish(record, ar -> {

    if (ar.succeeded() && ar.result() !=null) {

    publishedRecords.add(record);

            recordMessageConsumerMap.putIfAbsent(record, ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

        }

    handler.handle(ar.map(ar.result()));

    });

    servicePublishRequestBean就是我封装的bean,里面de属性有:

    private StringserviceName;

    private StringeventBusAddress;

    private ServiceDiscoverydiscovery;

    private Classclazz;

    private T service;

    此外我们一定要注册这样的proxy service才能够生效,不然是没有用的:

    ProxyServiceUtil.registerProxyService(servicePublishRequestBean.getEventBusAddress(), servicePublishRequestBean.getClazz(), servicePublishRequestBean.getService()));

    看看里面怎么写的

    public static MessageConsumerregisterProxyService(String eventBusAddress, Class clazz, T service) {

    return ServiceBinderUtil.getBinderInstance()

    .setAddress(eventBusAddress)

    .register(clazz, service);

    }

    第三个坑是: 和第二个坑一样的,需要注销服务,注销服务的话,我们也要调用proxyService 去把服务注销了,而不能单纯的调用 service discovery and register unpublish 方法,这个是很恶心的:看看我怎么写的把,一些很细节的东西需要自己探索,我不会贴出所有的东西的。比如下面的discover record 有许多的状态,这里一不小心就会找不到你所发布的服务,另外注销代理服务,需要个奇怪的参数,这个参数我存储在map里面:recordMessageConsumerMap, 在服务publish时候,就会生成的。

    discovery.getRecord(info -> info.getName().equals(serviceName), res -> {

    if (res.succeeded() && res.result() !=null && res.result().getStatus() == Status.UP) {

    Record record = res.result();

                    discovery.unpublish(record.getRegistration(), ar -> {

    if(ar.succeeded()) {

    List records =recordMessageConsumerMap.keySet().stream().filter(map-> map.getName().equals(serviceName)).collect(Collectors.toList());

                            offlineRecord(records);

                        }

    handler.handle(ar.map((Void)null));

                    });

                }else {

    handler.handle(res.map((Void)null));

                }

    }

    );

    OfflineRecord 主要是调用ProxyServiceUtil.unregisterProxyService(recordMessageConsumerMap.get(records.get(0)));,其中unregisterProxyService 方法的实现如下,是不是比较简单?

    ServiceBinderUtil.getBinderInstance().unregister(consumer);

    然后基本上做到这里,vertx 动态的rpc 就可以实现了,主要是用了vertx 服务发现组件的,publish, unpublish方式,和proxy service 的 register和unregister方法。但是里面的坑比较多。此外除了上面所描述的坑之外,我还想告诉小伙伴们,event bus 通信exception 会出现一些奇怪的错误,所有event bus 通信也是个坑,其实proxy sevice 本质就是 event bus,所以event bus 网络连接不通的话,确实很头疼,经过我这边的测试,不管ecs 集群的部署,还是ecs 和docker 的混合部署,网络都是可以通的(我使用的是ignite分布式缓存)。暂时我先贴出来关于网络的代码,如果你们遇到了问题,先暂时按照我这个来,不会让你很恼火。

    @Bean

    public IgniteConfigurationgetIgniteSelfConfiguration()throws Exception{

    IgniteConfiguration igniteConfiguration =new IgniteConfiguration();

        igniteConfiguration.setClientMode(false);

        igniteConfiguration.setPeerClassLoadingEnabled(true);

        igniteConfiguration.setDeploymentMode(DeploymentMode.CONTINUOUS);

        igniteConfiguration.setPeerClassLoadingMissedResourcesCacheSize(0);

        igniteConfiguration.setDiscoverySpi(getTcpDiscoverySpi());

        igniteConfiguration.setCacheConfiguration(getCacheConfiguration());

      // igniteConfiguration.setLocalHost(IPUtil.getLocalIp());

        return igniteConfiguration;

    }

    @Bean

    public TcpDiscoverySpigetTcpDiscoverySpi()throws Exception{

    TcpDiscoverySpi tcpDiscoverySpi =new TcpDiscoverySpi();

        tcpDiscoverySpi.setIpFinder(getTcpDiscoveryMulticastIpFinder());

        tcpDiscoverySpi.setNetworkTimeout(10000);

        System.out.println("setting success for host");

        tcpDiscoverySpi.setLocalAddress(IPUtil.getLocalIp());

        return tcpDiscoverySpi;

    }

    @Bean

    public TcpDiscoveryMulticastIpFindergetTcpDiscoveryMulticastIpFinder(){

    TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder =new TcpDiscoveryMulticastIpFinder();

        tcpDiscoveryMulticastIpFinder.setMulticastGroup("224.0.0.100");

        return tcpDiscoveryMulticastIpFinder;

    }

    @Bean

    public CacheConfigurationgetCacheConfiguration(){

    CacheConfiguration cacheConfiguration =new CacheConfiguration();

        cacheConfiguration.setName("myCache");

        cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);

        cacheConfiguration.setBackups(1);

        return cacheConfiguration;

    }

    @Bean

    public VertxClusterStartervertxClusterStarter() {

    VertxClusterStarter vertxClusterStarter =new VertxClusterStarter();

        return vertxClusterStarter;

    }

    ClusterManager clusterManager =new IgniteClusterManager(igniteSelfConfiguration);

    TcpDiscoverySpi discoverySpi = (TcpDiscoverySpi)igniteSelfConfiguration.getDiscoverySpi();

    TcpDiscoveryMulticastIpFinder tcpDiscoveryMulticastIpFinder = (TcpDiscoveryMulticastIpFinder) discoverySpi.getIpFinder();

    tcpDiscoveryMulticastIpFinder.setAddresses(Arrays.asList(propertiesHolderUtils.getVertxClusterIps().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)));

    VertxOptions vertxOptions =new VertxOptions().setClustered(true).setClusterHost(IPUtil.getLocalIp()).setClusterPort(Integer.valueOf(propertiesHolderUtils.getVertxClusterPorts().split(CommonConstants.CHARACTER_SEPARATOR_COMMA)[0])).setClusterManager(clusterManager);

    ServiceDiscoveryOptions discoveryOptions =new ServiceDiscoveryOptions();

    谢谢,希望对大家有所帮助,后面我会尝试探索event bus 通信的原理。

    相关文章

      网友评论

          本文标题:vertx 实现动态 RPC

          本文链接:https://www.haomeiwen.com/subject/kwtnlqtx.html