美文网首页
fescar源码分析-RPC模型

fescar源码分析-RPC模型

作者: do_young | 来源:发表于2019-01-30 14:53 被阅读212次

    通过fescar项目官方的这张图可以知道,RM及RM与TC之间需要进行远程通讯。

    TM&RM&Tc_COM.png
    而通过查看工程依赖及源码能了解到其远程通讯主要是依赖于netty的技术组件包来实现的RPC。具体的设计模型如下所示: image.png

    ChannelDuplexHandler

    从下图中可以了解到fescar对netty最主要的API依赖就是io.netty.channel.ChannelDuplexHandler
    它实现了基于Socket进行网络传输的基本操作,如:

    image.png
    • bind 端口绑定。
    • connect/disconnect 创建连接/关闭连接。
    • read 从端口连接中接收数据。
    • write 从端口连接中写入数据。
      等等......
    ChannelDuplexHandler.png

    AbstractRpcRemoting

    image.png

    AbstractRpcRemoting继承于ChannelDuplexHandler,主要是针对于fescar的业务场景再进行一次封装。如:

    • channelRead 过滤消息,并使用线程池异步对消息做业务处理。
    • 定义抽象方法dispatch,将消息的业务处理做进一步封装。(因为前端与后端对消息处理的逻辑不一样,交由子类来实现。)
    • sendAsycRequest 添加多线程异步请求及响应处理。
    • 对远程调用的数据结构进行封装,所有消息都需要实现MessageCodec接口。
    public interface MessageCodec {
        short getTypeCode();
        byte[] encode();
        boolean decode(ByteBuf in);
    }
    
    • 等等......

    AbstractRpcRemotingClient

    image.png

    AbstractRpcRemotingClient继承于AbstractRpcRemoting,主要是针对fescar的业务场景对客户端的RPC调用做进一步的业务封装。如:

    • 实现RemotingService接口,实现RPC服务开启及关闭的逻辑 。
    public interface RemotingService {
        void start();
        void shutdown();
    }
    
    • 实现ClientMessageSender接口,实现客户端RPC消息发送的功能。
    public interface ClientMessageSender {
        Object sendMsgWithResponse(Object msg, long timeout) throws TimeoutException;
        Object sendMsgWithResponse(String serverAddress, Object msg, long timeout) throws TimeoutException;
        Object sendMsgWithResponse(Object msg) throws TimeoutException;
        void sendResponse(long msgId, String serverAddress, Object msg);
    }
    
    • channelRead : 基于客户端消息处理逻辑做业务封装。
    • dispatch:针对客户端实现消息处理逻辑,(基于ClientMessageListener接口的实现来处理)。
            if (clientMessageListener != null) {
                String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
                clientMessageListener.onMessage(msgId, remoteAddress, msg, this);
            }
    
    • init :初始化netty连接配置
    • 等等......

    TmRpcClient

    image.png

    TmRpcClient继承于AbstractRpcRemotingClient,针对事务管理器的RPC调用实现了以下功能:

    • 实现了RegisterMsgListener接口,对客户端注册到服务端的消息做监听处理。
    public interface RegisterMsgListener {
        void onRegisterMsgSuccess(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage);
        void onRegisterMsgFail(String serverAddress, Channel channel, Object response, AbstractMessage requestMessage);
    }
    
    • 重载父类对ClientMessageSender接口实现,实现RPC客户端发送消息的功能。
    • 重载父类 init方法:基于配置创建线程池并使用NettyAPI创建RPC连接池。
    • 等等......

    RmRpcClient

    RmRpcClient与TmRpcClient都继承于AbstractRpcRemotingClient,基本要实现的功能与其类似,只是针对资源管理器逻辑有差异,这里不再赘述。

    image.png

    AbstractRpcRemotingServer

    image.png

    AbstractRpcRemotingServer继承于AbstractRpcRemoting,主要是针对fescar的业务场景对服务端的RPC调用做进一步的业务封装。如:

    • 实现RemotingService接口,实现RPC服务开启及关闭的逻辑 。
    • 实现RemotingServer接口,该接口暂无需要的实现方法。
    • 等等......

    RpcServer

    image.png

    RpcServer继承于AbstractRpcRemotingServer,针对事务协调管理器的RPC调用实现了以下功能:

    • 实现ServerMessageSender接口,实现服务端RPC消息发送的功能。
    public interface ServerMessageSender {
        void sendResponse(long msgId, Channel channel, Object msg);
        Object sendSyncRequest(String resourceId, String clientId, Object message, long timeout) throws IOException, TimeoutException;
        Object sendSyncRequest(String resourceId, String clientId, Object message) throws IOException, TimeoutException;
    }
    
    • channelRead : 基于服务端消息处理逻辑做业务封装。
    • dispatch:针对服务端实现消息处理逻辑(基于ServerMessageListener接口的实现来处理)。
        public void dispatch(long msgId, ChannelHandlerContext ctx, Object msg) {
            if (msg instanceof RegisterRMRequest) {
                serverMessageListener.onRegRmMessage(msgId, ctx, (RegisterRMRequest)msg, this,
                    checkAuthHandler);
            } else {
                if (ChannelManager.isRegistered(ctx.channel())) {
                    serverMessageListener.onTrxMessage(msgId, ctx, msg, this);
                } else {
                    try {
                        closeChannelHandlerContext(ctx);
    ......
    
    • init :初始化netty连接配置
    • 等等......

    相关文章

      网友评论

          本文标题:fescar源码分析-RPC模型

          本文链接:https://www.haomeiwen.com/subject/xoksjqtx.html