美文网首页
SOFA Bolt源码分析 1-简单的服务启动和服务端启动设计

SOFA Bolt源码分析 1-简单的服务启动和服务端启动设计

作者: 折浪君 | 来源:发表于2019-02-24 23:28 被阅读0次

    一、启动方式

    1.业务逻辑处理器定义

    /**

    * 自定义的业务逻辑用户处理器

    * SyncUserProcessor属于同步,soaf bolt还提供了异步的方式AsyncUserProcessor

    * 二者的区别在于,前者需要在当前处理线程以return返回值的形式返回处理结果;而后者,有一个 AsyncContext 客户端的调用,调用 sendResponse 方法,内部通过穿件RCMD对象,可以在当前线程,也可以在异步线程,返回处理结果

    * 如果一个处理器需要对多种数据模型感兴趣,两种方式,一种是使用基类的方式处理,第二种方式通过MultiInterestUserProcessor

    */

    public class TestServerUserProcessorextends SyncUserProcessor {

    @Override

        public Object handleRequest(BizContext bizCtx, TestRequest request)throws Exception {

    TestResponse response =new TestResponse();

    if (request !=null) {

    System.out.println(request);

    response.setResp("from server -> " + request.getReq());

    }

    return response;

    }

    /**

    * 业务处理器感兴趣的消息体,

    */

        @Override

        public String interest() {

    return TestRequest.class.getName();

    }

    }

    2.服务端启动

    //创建RpcServer实例,指定监听ip和port,指定是否使用链接管理器

    RpcServer server =new RpcServer("127.0.0.1",8888,false);

    //注册业务逻辑处理器 UserProcessor

    server.registerUserProcessor(new TestServerUserProcessor());

    //启动服务端

    server.start();

    3.server.start()原理分析

    server.start()主要有两步

    =doInit();

    ==this.addressParser =new RpcAddressParser();//初始化地址解析器

    ==this.connectionEventHandler =new ConnectionEventHandler(switches());//初始化链接事件处理器,主要负责入站出站数据的处理

    ==this.connectionEventHandler.setConnectionManager(this.connectionManager);//添加初始化的连接管理器到链接事件处理器

    ==this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);//添加时间监听器

    ==initRpcRemoting();//初始化远程调用对象RpcServerRemoting

    ==this.bootstrap =new ServerBootstrap();

        this.bootstrap.group(bossGroup,workerGroup)

        .channel(NettyEventLoopUtil.getServerSocketChannelClass())

    .   option(ChannelOption.SO_BACKLOG, ConfigManager.tcp_so_backlog())//tcp等待队列大小,默认1024

        .option(ChannelOption.SO_REUSEADDR, ConfigManager.tcp_so_reuseaddr())//端口快速释放,默认true

        .childOption(ChannelOption.TCP_NODELAY, ConfigManager.tcp_nodelay())

        .childOption(ChannelOption.SO_KEEPALIVE, ConfigManager.tcp_so_keepalive());

    ==initWriteBufferWaterMark();//设置出站的低水位和高水位

    ==if (ConfigManager.netty_buffer_pooled()) { this.bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); } else { this.bootstrap.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT) .childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); }//根据配置决定是否使用对象池化

    ==NettyEventLoopUtil.enableTriggeredMode(bootstrap)//设置selector的epoll模式是边缘触发还是水平触发,默认水平触发

    ==this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override     protected void initChannel(SocketChannel channel) { ChannelPipeline pipeline =     channel.pipeline(); pipeline.addLast("decoder", codec.newDecoder());     pipeline.addLast("encoder", codec.newEncoder());if (idleSwitch) {     pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 0, idleTime,     TimeUnit.MILLISECONDS));pipeline.addLast("serverIdleHandler", serverIdleHandler); } pipeline.addLast("connectionEventHandler", connectionEventHandler); pipeline.addLast("handler", rpcHandler); createConnection(channel); } /** * create connection operation<br> * <ul> * <li>If flag manageConnection be true, use {@link DefaultConnectionManager} to add a new connection, meanwhile bind it with the channel.</li> * <li>If flag manageConnection be false, just create a new connection and bind it with the channel.</li> * </ul> */ private void createConnection(SocketChannel channel) { Url url = addressParser.parse(RemotingUtil.parseRemoteAddress(channel)); if (switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) { connectionManager.add(new Connection(channel, url), url.getUniqueKey()); } else { new Connection(channel, url); } channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT); } });//添加加密、解密、心跳(原理后面讲)、远程请求(原理后面讲)处理器,并创建一个和监听端口绑定的链接(业务层面的绑定)

    =doStart();

    ==this.channelFuture =this.bootstrap.bind(new InetSocketAddress(ip(), port())).sync();//绑定指定的端口

    相关文章

      网友评论

          本文标题:SOFA Bolt源码分析 1-简单的服务启动和服务端启动设计

          本文链接:https://www.haomeiwen.com/subject/gmakyqtx.html