美文网首页DeFiBus
DeFiBus的Rpc调用实现原理

DeFiBus的Rpc调用实现原理

作者: 晴天哥_王志 | 来源:发表于2021-07-25 23:13 被阅读0次

DeFiBus的Rpc调用

Rpc调用原理图

整个调用过程包含了两个消息的产生和消费过程。

  • 1.请求方产生请求消息,服务响应方消费这条请求消息。请求方根据服务提供方的协议将请求内容设置到消息体中,并将消息发送到Broker上。服务响应方订阅相应的Topic,从Broker上获取到请求消息并消费。

  • 2.服务响应方产生响应消息,请求方接收这条响应消息。服务响应方收到请求消息后,执行相应的处理,并将请求结果设置到响应消息的消息体中,将响应消息发送到Broker上。

  • 3.Broker接收响应消息的方式采用的是Broker推送的形式,而不是由Producer订阅的方式,从而使得响应消息能够精准回到发出请求消息的实例上。

  • 4.DeFiBus在每条请求消息中增加REPLY_TO属性来唯一标识每一个请求方实例。在创建响应消息时将REPLY_TO属性透传到响应消息中。Broker收到响应消息后根据REPLY_TO属性查找出对应的请求方实例的连接,将响应消息推送给该请求方实例。

DeFiBus的Producer

public class DeFiBusProducerImpl {

    public Message request(Message requestMsg, final SendCallback sendCallback, RRCallback rrCallback, long timeout)
        throws InterruptedException, RemotingException, MQClientException, MQBrokerException {

        boolean isAsyncRR = (rrCallback != null);

        final String uniqueRequestId = DeFiBusRequestIDUtil.createUniqueName("w");
        DefaultMQProducer producer = deFiBusProducer.getDefaultMQProducer();
        requestMsg.putUserProperty(DeFiBusConstant.KEY, DeFiBusConstant.PERSISTENT);
        requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID, uniqueRequestId);
        // 在请求的消息中增加了PROPERTY_MESSAGE_REPLY_TO属性
        requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, producer.buildMQClientId());
        requestMsg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));

        final RRResponseFuture responseFurture = new RRResponseFuture(rrCallback, timeout);

        String topic = requestMsg.getTopic()
        ResponseTable.getRrResponseFurtureConcurrentHashMap().put(uniqueRequestId, responseFurture);
        if (isAsyncRR) {
          // 省略代码
        } else {
            publish(requestMsg, new SendCallback() {
               // 省略相关代码
            }, timeout);
            Message retMessage = responseFurture.waitResponse(timeout);
            ResponseTable.getRrResponseFurtureConcurrentHashMap().remove(uniqueRequestId);
            return retMessage;
        }
    }


    public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        // 携带producer所在的IP地址
        sb.append(this.getClientIP());
        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }
}
  • DeFiBus的Producer请求消息中增加PROPERTY_MESSAGE_REPLY_TO属性来标识每一个请求方实例。
  • PROPERTY_MESSAGE_REPLY_TO包含请求方的IP地址和实例名字。

DeFiBus的Consumer

public class DeFiBusClientUtil {
    public static final Logger LOGGER = LoggerFactory.getLogger(DeFiBusClientUtil.class);

    public static Message createReplyMessage(MessageExt sourceMsg, byte[] content) {
        String cluster = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_CLUSTER);
        String replyTopic = DeFiBusConstant.RR_REPLY_TOPIC;
        if (!StringUtils.isEmpty(cluster)) {
            replyTopic = cluster + "-" + replyTopic;
        }

        Message msg = new Message();
        msg.setTopic(replyTopic);//回程topic
        msg.setBody(content);//body
        msg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO, 
                     sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO));//回给谁
        msg.putUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID, 
                    sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_RR_REQUEST_ID));//原uniqueId
        String sourceBroker = sourceMsg.getUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER);
        if (!StringUtils.isEmpty(sourceBroker)) {
            msg.putUserProperty(DeFiBusConstant.PROPERTY_MESSAGE_BROKER, sourceBroker);//消息从哪个broker来
        }

        return msg;
    }
}
  • DeFiBus的Consumer在响应的报文体中同样携带PROPERTY_MESSAGE_REPLY_TO属性。
  • DeFiBus的Broker在收到Consumer的响应消息后会进行特殊处理。

DeFiBus的Broker

public class DeFiReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {


    private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx, //
        final RemotingCommand request, //
        final SendMessageContext sendMessageContext, //
        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {

        if (msgInner.getProperties() != null && DeFiBusConstant.REPLY.equals(msgInner.getProperties().get(DeFiBusConstant.KEY))) {
            // 获取发送者的消息Id
            String senderId = msgInner.getProperties().get(DeFiBusConstant.PROPERTY_MESSAGE_REPLY_TO);
            if (senderId == null) {
                // 省略相关代码
            } else {
                // 查找senderId对应的Chennel信息
                ClientChannelInfo clientChannelInfo = this.deFiBrokerController.getProducerManager().getClientChannel(senderId);
                if (clientChannelInfo == null || clientChannelInfo.getChannel() == null || !clientChannelInfo.getChannel().isActive()) {
                    // 省略相关代码
                } else {
                    Map<String, String> map = MessageDecoder.string2messageProperties(replyMessageRequestHeader.getProperties());
                    map.put(DeFiBusMessageConst.LEAVE_TIME, String.valueOf(System.currentTimeMillis()));
                    replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(map));

                    try {
                        this.deFiBrokerController.getPushReplyMessageExecutor().submit(new Runnable() {
                            @Override public void run() {
                                boolean isPushSuccess = deFiBrokerController.getDeFiBusBroker2Client()
                                   .pushRRReplyMessageToClient(clientChannelInfo.getChannel(), replyMessageRequestHeader, msgInner);
                                } 
                            }
                        });
                    } catch (RejectedExecutionException e) {
                    }
                }
            }
        }
    }
}
  • DeFiBus的Broker在收到Rpc的响应消息后通过线程池异步执行pushRRReplyMessageToClient发送Rpc消息。

相关文章

网友评论

    本文标题:DeFiBus的Rpc调用实现原理

    本文链接:https://www.haomeiwen.com/subject/opzimltx.html