美文网首页
Soul源码阅读 网关集群测试【第十一天】

Soul源码阅读 网关集群测试【第十一天】

作者: cutieagain | 来源:发表于2021-01-26 20:52 被阅读0次

    启动网关代理集群

    idea中运行多个相同的服务,需要打开allow parallel run


    image.png

    admin1 配置

    server:
      port: 9095
      address: 0.0.0.0
    

    admin2 配置

    server:
      port: 9096
      address: 0.0.0.0
    

    Bootstrap1 配置

    server:
      port: 9195
      address: 0.0.0.0
    
    soul:
        sync:
          websocket :
               urls: ws://localhost:9095/websocket,ws://localhost:9096/websocket
    

    Bootstrap2 配置

    server:
      port: 9196
      address: 0.0.0.0
    
    soul:
        sync:
          websocket :
               urls: ws://localhost:9095/websocket,ws://localhost:9096/websocket
    

    依次按上述配置顺序启动
    启动没有问题

    启动soul-examples-http1

    server:
      port: 8188
      address: 0.0.0.0
    
    soul:
      http:
        adminUrl: http://localhost:9095
        port: 8188
    

    启动soul-examples-http2

    server:
      port: 8189
      address: 0.0.0.0
    
    soul:
      http:
        adminUrl: http://localhost:9096
        port: 8189
    

    访问 http://localhost:9195/http/order/findById?id=1
    访问 http://localhost:9196/http/order/findById?id=1
    返回

    {
        "code": 500,
        "message": "Internal Server Error"
    }
    

    具体堆栈信息如下

    2021-01-26 17:30:13.276 ERROR 59984 --- [oul-netty-nio-3] o.d.soul.web.handler.GlobalErrorHandler  : [b2ac814c] Resolved [NullPointerException: null] for HTTP GET /http/order/findById
    2021-01-26 17:30:13.297 ERROR 59984 --- [oul-netty-nio-3] a.w.r.e.AbstractErrorWebExceptionHandler : [b2ac814c]  500 Server Error for HTTP GET "/http/order/findById?id=1"
    
    java.lang.NullPointerException: null
        at org.dromara.soul.metrics.facade.handler.MetricsTrackerHandler.handlerCounter(MetricsTrackerHandler.java:200) ~[classes/:na]
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
    Error has been observed at the following site(s):
        |_ checkpoint ⇢ org.dromara.soul.web.configuration.ErrorHandlerConfiguration$1 [DefaultWebFilterChain]
        |_ checkpoint ⇢ org.dromara.soul.web.filter.WebSocketParamFilter [DefaultWebFilterChain]
        |_ checkpoint ⇢ org.dromara.soul.web.filter.FileSizeFilter [DefaultWebFilterChain]
        |_ checkpoint ⇢ org.dromara.soul.bootstrap.filter.HealthFilter [DefaultWebFilterChain]
        |_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
        |_ checkpoint ⇢ HTTP GET "/http/order/findById?id=1" [ExceptionHandlingWebHandler]
    Stack trace:
            at org.dromara.soul.metrics.facade.handler.MetricsTrackerHandler.handlerCounter(MetricsTrackerHandler.java:200) ~[classes/:na]
            at org.dromara.soul.metrics.facade.handler.MetricsTrackerHandler.counterInc(MetricsTrackerHandler.java:93) ~[classes/:na]
            at org.dromara.soul.metrics.facade.MetricsTrackerFacade.counterInc(MetricsTrackerFacade.java:82) ~[classes/:na]
            at org.dromara.soul.web.handler.SoulWebHandler.handle(SoulWebHandler.java:71) ~[classes/:na]
            at org.springframework.web.server.handler.DefaultWebFilterChain.lambda$filter$0(DefaultWebFilterChain.java:122) ~[spring-web-5.2.2.RELEASE.jar:5.2.2.RELEASE]
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) [reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2186) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1994) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1868) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.Mono.subscribe(Mono.java:4105) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) ~[reactor-core-3.3.1.RELEASE.jar:3.3.1.RELEASE]
            at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64) ~[reactor-netty-0.9.2.RELEASE.jar:0.9.2.RELEASE]
            at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:226) ~[reactor-netty-0.9.2.RELEASE.jar:0.9.2.RELEASE]
            at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:441) ~[reactor-netty-0.9.2.RELEASE.jar:0.9.2.RELEASE]
            at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:89) ~[reactor-netty-0.9.2.RELEASE.jar:0.9.2.RELEASE]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:167) ~[reactor-netty-0.9.2.RELEASE.jar:0.9.2.RELEASE]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) ~[netty-codec-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) ~[netty-codec-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514) ~[netty-transport-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
            at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.43.Final.jar:4.1.43.Final]
            at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_211]
    
    

    原因是开启了监控

    image.png

    关闭了之后,请求都可以成功了

    集群的话处理跟单机的有什么区别?

    从配置入手,主要是Bootstrap中的配置

    soul:
        sync:
          websocket :
               urls: ws://localhost:9095/websocket,ws://localhost:9096/websocket
    

    配置文件:WebsocketSyncDataConfiguration
    同步服务:WebsocketSyncDataService

    Bootstrap服务中会获取urls,遍历连接admin服务进行连接

    /**
         * Instantiates a new Websocket sync cache.
         *
         * @param websocketConfig      the websocket config
         * @param pluginDataSubscriber the plugin data subscriber
         * @param metaDataSubscribers  the meta data subscribers
         * @param authDataSubscribers  the auth data subscribers
         */
        public WebsocketSyncDataService(final WebsocketConfig websocketConfig,
                                        final PluginDataSubscriber pluginDataSubscriber,
                                        final List<MetaDataSubscriber> metaDataSubscribers,
                                        final List<AuthDataSubscriber> authDataSubscribers) {
            // 获取注册的url列表
            String[] urls = StringUtils.split(websocketConfig.getUrls(), ",");
            // 线程池
            executor = new ScheduledThreadPoolExecutor(urls.length, SoulThreadFactory.create("websocket-connect", true));
            // 新增插件订阅Websocket订阅者信息
            for (String url : urls) {
                try {
                    clients.add(new SoulWebsocketClient(new URI(url), Objects.requireNonNull(pluginDataSubscriber), metaDataSubscribers, authDataSubscribers));
                } catch (URISyntaxException e) {
                    log.error("websocket url({}) is error", url, e);
                }
            }
            try {
                for (WebSocketClient client : clients) {
                    // Websocket连接,30s
                    boolean success = client.connectBlocking(3000, TimeUnit.MILLISECONDS);
                    if (success) {
                        log.info("websocket connection is successful.....");
                    } else {
                        log.error("websocket connection is error.....");
                    }
                    // 定时连接同步配置信息
                    executor.scheduleAtFixedRate(() -> {
                        try {
                            if (client.isClosed()) {
                                boolean reconnectSuccess = client.reconnectBlocking();
                                if (reconnectSuccess) {
                                    log.info("websocket reconnect is successful.....");
                                } else {
                                    log.error("websocket reconnection is error.....");
                                }
                            }
                        } catch (InterruptedException e) {
                            log.error("websocket connect is error :{}", e.getMessage());
                        }
                    }, 10, 30, TimeUnit.SECONDS);
                }
                /* client.setProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("proxyaddress", 80)));*/
            } catch (InterruptedException e) {
                log.info("websocket connection...exception....", e);
            }
    
        }
    

    admin中就可以使用已经连接的WebSocket通道进行发送变动的配置信息
    DataSyncConfiguration中WebsocketListener的WebsocketDataChangedListener

    集群的话处理跟单机就是比单机的时候多推了几个配置信息,然后Bootstrap就根据配置信息进行代理就可以了。

    相关文章

      网友评论

          本文标题:Soul源码阅读 网关集群测试【第十一天】

          本文链接:https://www.haomeiwen.com/subject/idzgzktx.html