美文网首页
详解Dubbo(八):服务提供端之传输层

详解Dubbo(八):服务提供端之传输层

作者: 空挡 | 来源:发表于2020-07-05 12:24 被阅读0次

前言

接上一篇中的服务暴露逻辑,当接口使用Dubbo协议暴露服务时,会启动一个DubboServer来监听消费端的请求,端口的绑定和监听都是通过Exchanger层来实现的。关于Exchanger和Transporter层的原理,在前面讲Consumer的时候讲的比较清楚了,这篇主要讲下跟Consumer端逻辑不同的部分。

Exchanger层实现

Handler实现

上一篇DubboProtocol中在暴露服务时会创建一个ExchangeServer,调用的是Exchangers.bind(url, handler)方法。handler参数就是请求来的时候Dubbo协议的处理逻辑。所以对于每个协议都有一个handler实现。Dubbo协议对Handler的实现逻辑如下:

private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
       @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);

            } else {
                super.received(channel, message);
            }
        }
}

当Consumer的调用请求来的时候,会回调handler.received()方法,在received方法中,直接判断收到的是不是Invocation,如果是的话调用reply()返回结果。

       @Override
        public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            ...
            ...
            Invocation inv = (Invocation) message;
            //更加invocation参数获取服务Invoker
            Invoker<?> invoker = getInvoker(channel, inv);
            // callback调用的校验
            if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                 ....
            }
            //调用的来源地址放到context中
            RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
            //用invoker调用本地方法实现
            Result result = invoker.invoke(inv);
            //结果异步返回
            return result.thenApply(Function.identity());
        }

上面代码的核心其实只有两步,一是根据调用参数找到export服务的Invoker,二是调用Invoker的invoke方法,根据上一篇讲的Invoker构造,会调到javassist生成的代理类,然后调到@Service注解的实现类的方法上。第一步的getInvoker()方法如下:

Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
       ...
       ...
       String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
        if (exporter == null) {
              ...
              ...
        }
        return exporter.getInvoker();
}

接口暴露服务的时候会将exporter缓存在map中,调用请求根据key来获取exporter然后拿到Invoker

Exchanger实现

跟Consumer端一样,最终bind方法会调用到HeaderExchanger实现类。

public class HeaderExchanger implements Exchanger {
    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
}

在handler参数上又封装了2层handler,这个也跟Consumer层没有区别,最终调用的Transporters.bind()方法,传入的handler链如下:

图1 handlers
DecodeHandler是判断收到的Request是不是实现了Decodeable接口,是的话会调用接口的decode()方法。而HeaderExchangeHandler则会在DubboProtocol.requestHandler()返回结果后,封装Response并调用channel.send()发给调用方。 这里要注意的是,虽然Client端和Server端都会用到这两个Handler的,但是在处理RequestResponse的时候,handler发挥的作用是不一样的。

Transporter

Exchange层在给Handler又封装了两层后,最终调用的Transporters.bind()方法,按照默认实现,Dubbo使用的传输层框架是netty4,所以实际调用的是NettyTransporter.bind()

public class NettyTransporter implements Transporter {
    @Override
    public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }
}
public class NettyServer extends AbstractServer implements RemotingServer{
    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }
}

Transporter中初始化一个NettyServer,构造函数中又对handler做了一次wrap,这里跟Client调用的同一个方法,wrap后的handler链变成了如下图的样子:

图2 Handler链
上图中具体Handler的作用Consumer Transporter部分也讲过了,不再重复。直接进入NettyServer.doOpen()方法,开始监听客户端请求:
    @Override
    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();
        
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));
        //这个Handler中缓存了所有已经建立的channel
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())   //Dubbo协议解码
                                .addLast("encoder", adapter.getEncoder())  //Dubbo协议编码
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

Netty的逻辑不多解释,直接看pipeline中添加的handler,当请求到达Server端时,InboundHandler会发挥作用,所以依次经过的handler的顺序如下:

  • ByteToMessageDecoder:由adapter.getDecoder()返回,负责把二进制数据转换成java对象,这里会使用export时传入的DubboCodec将二进制解码成Request
  • IdleStateHandler : 数据流入和流出都会经过
  • NettyServerHandler : 在接收数据时,使用channel和Request回调Dubbo的Handler,将数据交给后续handler处理,也就是前面图2中的handler链

总结

最后整理一下整个服务的暴露过程:
1)Spring启动的时候Dubbo会根据配置文件或者@Service注解找到哪些类需要暴露成远程服务。找到这些类之后,会把每个类都注册成一个Spring 的Bean,同时定义一个新的Bean,这个Bean的类型是ServiceBean,所以最终暴露的是ServiceBean
2)调用ServiceBean的export方法后,Dubbo会根据配置生成url,根据实现类生成一个Invoker代理。有了url就可以知道使用什么协议暴露服务在哪个端口,比如dubbo协议就会调用DubboProtocol的方法,使用netty框架启动一个server,将server的handler和Invoker绑定成一个exporter。
3)export成功后,将url注册到注册中心。当server收到请求时,根据请求参数找到exporter,调用exporter中的Invoker执行本地类的实现。
本篇文章的传输层部分讲的比较简单,因为大部分内容跟Consumer端是重复的,理解有困难可以查阅下消费端Exchenger和Transporter两篇文章的内容。

相关文章

  • 详解Dubbo(八):服务提供端之传输层

    前言 接上一篇中的服务暴露逻辑,当接口使用Dubbo协议暴露服务时,会启动一个DubboServer来监听消费端的...

  • 三.传输层

    第一节 传输层的基本服务 传输层功能 传输层的核心任务: 应用进程之间提供端到端的 逻辑通信服务回顾:只有主机才有...

  • 深入理解TCP和UDP

    简介 OSI七层模型中,传输层(Transport Layer)为应用进程提供端到端的通信服务。传输层的协议主要包...

  • TCP/IP 协议栈的几个问题

    一。传输层的主要功能是什么? 分割并重新组装上层提供的数据流,为数据流提供端到端的传输服务 二。传输层有哪些协议?...

  • 图解TCP/UDP

    初始传输层 传输层的作用是建立应用程序间的端到端连接,为数据传输提供可靠或不可靠的通信服务。 传输层有两个重要协议...

  • HTTP 详解

    1、经典五层模型 传输层: 向用户提供可靠的端到端服务。传输层向高层屏蔽了下层数据通信的细节。 应用层: 为应用软...

  • 2. Dubbo服务提供端启动流程

    Dubbo服务提供端启动时序图 首先通过一个时序图直观地看一下Dubbo服务提供端的启动流程 源码解析 Dubbo...

  • 计网期末考点整理(第5,6章)

    网络层通过路由选择算法,为IP分组从源主机到目的主机选择一条合适的传输路径,为传输层端—端数据传输提供服务。 IP...

  • Dubbo服务端

    Dubbo服务端的主要功能就是服务提供端向注册中心注册服务,这样消费端能够从注册中心获取相应的服务。 Dubbo ...

  • 3传输层

    3.1传输层服务 3.1.1传输层服务概述 传输层服务和协议 ■传输层协议为运行在不同Host上的进程提供了一种逻...

网友评论

      本文标题:详解Dubbo(八):服务提供端之传输层

      本文链接:https://www.haomeiwen.com/subject/cjauqktx.html