NettyHandler#messageReceived
从NettyServer的doOpen可以看到,服务端收到消息后的处理入口是NettyHandler。
/**
 * Callback invoked when a message arrives on a Netty channel.
 *
 * <p>Looks up (or creates) the {@code NettyChannel} wrapper for the underlying channel,
 * forwards the message payload to {@code handler}, and finally calls
 * {@code NettyChannel.removeChannelIfDisconnected} for the same underlying channel.
 *
 * @param ctx the handler context that provides the underlying channel
 * @param e   the event carrying the received message
 * @throws Exception whatever the downstream handler throws
 */
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    final NettyChannel wrapped = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    try {
        handler.received(wrapped, e.getMessage());
    } finally {
        // Always executed, including when the handler above throws.
        NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
}
这里handler的对象是NettyServer(MultiMessageHandler(HeartbeatHandler(AllDispatcher(DecodeHandler(HeaderExchangeHandler(ExchangeHandlerAdapter)))))),received调用沿着这条包装链逐层向内传递:先经过HeaderExchangeHandler的received方法,最后进入DubboProtocol$ExchangeHandlerAdapter的received方法。
HeaderExchangeHandler.received
/**
 * Routes an inbound message according to its runtime type.
 *
 * <ul>
 *   <li>{@code Request}: events go to {@code handlerEvent}; two-way requests go to
 *       {@code handleRequest}; one-way requests pass their data to the wrapped handler.</li>
 *   <li>{@code Response}: passed to {@code handleResponse}.</li>
 *   <li>{@code String}: logged as an error on the client side; otherwise passed to the
 *       handler's {@code telnet} method and any non-empty reply is sent back.</li>
 *   <li>Anything else: passed to the wrapped handler unchanged.</li>
 * </ul>
 *
 * @param channel the channel the message arrived on
 * @param message the received message
 * @throws RemotingException propagated from the invoked handlers
 */
public void received(Channel channel, Object message) throws RemotingException {
    // Record when this channel was last read from.
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchange = HeaderExchangeChannel.getOrAddChannel(channel);
    try {
        if (message instanceof Request) {
            final Request req = (Request) message;
            if (req.isEvent()) {
                handlerEvent(channel, req);
            } else if (req.isTwoWay()) {
                // Two-way request.
                handleRequest(exchange, req);
            } else {
                // One-way request: only the payload is handed on.
                handler.received(exchange, req.getData());
            }
        } else if (message instanceof Response) {
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {
            if (!isClientSide(channel)) {
                final String echo = handler.telnet(channel, (String) message);
                if (echo != null && !echo.isEmpty()) {
                    channel.send(echo);
                }
            } else {
                // The exception is only built for logging; it is not thrown.
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            }
        } else {
            handler.received(exchange, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}
DubboProtocol$ExchangeHandlerAdapter.received
/**
 * Handles a received message: an {@code Invocation} is answered through {@code reply};
 * every other message type is delegated to the superclass implementation.
 *
 * @param channel the channel the message arrived on; cast to {@code ExchangeChannel} for invocations
 * @param message the received message
 * @throws RemotingException propagated from {@code reply} or the superclass
 */
public void received(Channel channel, Object message) throws RemotingException {
    if (!(message instanceof Invocation)) {
        super.received(channel, message);
        return;
    }
    // The message is an RPC invocation.
    reply((ExchangeChannel) channel, message);
}
ExchangeHandlerAdapter#reply
/**
 * Executes an invocation and returns its result as a future.
 *
 * @param channel the channel the invocation arrived on
 * @param message the received message; cast to {@code Invocation} without a prior type check
 * @return a future derived from the invocation result's completion future
 * @throws RemotingException propagated from {@code getInvoker}
 */
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    final Invocation invocation = (Invocation) message;
    // Look up the invoker registered on this side for the requested service.
    final Invoker<?> target = getInvoker(channel, invocation);
    RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
    // Perform the actual call.
    final Result outcome = target.invoke(invocation);
    // Hand the result back as a future.
    return outcome.completionFuture().thenApply(Function.identity());
}
DubboProtocol#getInvoker
/**
 * Finds the invoker of the exported service that an invocation targets.
 *
 * <p>The service key is built from the channel's local port plus the path, version and
 * group attachments of the invocation, and is then looked up in {@code exporterMap}.
 *
 * @param channel the channel the invocation arrived on; supplies the local port
 * @param inv     the invocation whose attachments identify the target service
 * @return the invoker held by the matching exporter
 * @throws RemotingException if no exporter is registered under the computed service key
 */
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
    int port = channel.getLocalAddress().getPort();
    // Path attachment of the invocation (the interface name).
    String path = inv.getAttachments().get(PATH_KEY);
    String serviceKey = serviceKey(port, path, inv.getAttachments().get(VERSION_KEY), inv.getAttachments().get(GROUP_KEY));
    DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    if (exporter == null) {
        // Fail with a descriptive, declared exception instead of a bare NullPointerException.
        throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet()
                + ", may be version or group mismatch, channel: consumer: " + channel.getRemoteAddress()
                + " --> provider: " + channel.getLocalAddress());
    }
    return exporter.getInvoker();
}
AbstractProtocol#exporterMap
// Exported services indexed by a String key; each value is the Exporter stored under that key.
protected final Map<String, Exporter<?>> exporterMap;
/**
 * Exports a service by wrapping its invoker in a {@code DubboExporter} and registering it
 * in {@code exporterMap} under the service key derived from the invoker's URL.
 *
 * @param invoker the invoker of the service to export
 * @param <T>     the service type
 * @return the exporter that was registered
 * @throws RpcException declared by the signature; not thrown directly by the visible code
 */
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    // The service key identifies (acts as the coordinates of) the service. It is made up of the
    // group, service name, version and port,
    // e.g. ${group}/com.my.practice.dubbo.ISayHelloService:${version}:20880
    final String exportKey = serviceKey(invoker.getUrl());
    final DubboExporter<T> exported = new DubboExporter<T>(invoker, exportKey, exporterMap);
    exporterMap.put(exportKey, exported);
    return exported;
}
invoker.invoke(inv)
invoker=InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker()))
AbstractProxyInvoker
/**
 * Creates an invoker that dispatches calls to {@code proxy} through a {@code Wrapper}.
 *
 * @param proxy the object implementing the service interface
 * @param type  the service interface type
 * @param url   the service URL (e.g. {@code registry://ip:port...})
 * @param <T>   the service type
 * @return an {@code AbstractProxyInvoker} whose {@code doInvoke} delegates to the wrapper
 */
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    final Class<?> implClass = proxy.getClass();
    // When the implementation's class name contains '$', wrap the interface type instead
    // of the implementation class.
    final Class<?> wrappedClass;
    if (implClass.getName().indexOf('$') < 0) {
        wrappedClass = implClass;
    } else {
        wrappedClass = type;
    }
    final Wrapper methodWrapper = Wrapper.getWrapper(wrappedClass);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return methodWrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
// Method-dispatch source for com.my.dubbo.PayServiceImpl; presumably the invokeMethod body that
// the Wrapper used in getInvoker above is generated with.
// NOTE(review): this is Javassist-style source notation rather than plain Java. $1..$4 appear to
// stand for the parameters o, n, p, v in declaration order, and ($w) for converting the return
// value to its wrapper type — confirm against the Javassist documentation.
public Object invokeMethod (Object o, String n, Class[]p, Object[]v) throws
java.lang.reflect.InvocationTargetException {
com.my.dubbo.PayServiceImpl w;
try {
// Cast the target object to the concrete implementation type.
w = ((com.my.dubbo.PayServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
// Dispatch by method name and parameter count: "pay" with exactly one parameter.
if ("pay".equals($2) && $3.length == 1) {
return ($w) w.pay((java.lang.String) $4[0]);
}
} catch (Throwable e) {
// Anything thrown by the target method is wrapped in InvocationTargetException.
throw new java.lang.reflect.InvocationTargetException(e);
}
// No branch matched the requested method.
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.my.dubbo.PayServiceImpl.");
}
网友评论