/**
 * Protocol extension point: turns invokers into exported services and
 * service interfaces + URLs into invokers.
 *
 * NOTE(review): the {@code @SPI("dubbo")} value is presumably the default
 * extension name — confirm against the SPI annotation's documentation.
 */
@SPI("dubbo")
public interface Protocol {
/**
 * @return the default port of this protocol
 */
int getDefaultPort();
/**
 * Exports the service backed by the given invoker.
 *
 * @param invoker invoker of the service to expose
 * @param <T>     service type
 * @return an exporter wrapping the invoker
 * @throws RpcException if the export fails
 */
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
 * Creates an invoker for a service from its interface class and URL.
 *
 * @param type service interface class
 * @param url  URL describing the service
 * @param <T>  service type
 * @return an invoker for the service
 * @throws RpcException if the reference cannot be created
 */
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
 * Destroys this protocol.
 * NOTE(review): the exact clean-up contract is not visible in this excerpt.
 */
void destroy();
}
/**
 * Base class for protocols: holds the exporter registry and the set of
 * created invokers, and provides the service-key helpers.
 */
public abstract class AbstractProtocol implements Protocol {
protected final Logger logger = LoggerFactory.getLogger(getClass());
// Exported services, keyed by the string produced by serviceKey(...).
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
// TODO SOFEREFENCE (sic — presumably "SoftReference"; confirm the intent)
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
/**
 * Builds the service key for a URL from its bind port (falling back to the
 * URL port), path, version parameter and group parameter.
 */
protected static String serviceKey(URL url) {
int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY),
url.getParameter(Constants.GROUP_KEY));
}
/**
 * Builds the service key from explicit parts by delegating to
 * {@code ProtocolUtils.serviceKey}.
 */
protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
// NOTE(review): the body of destroy() is missing in this excerpt (the
// declaration is followed directly by the class's closing brace), so this
// class does not compile as shown — restore the body from the full source.
@Override
public void destroy()
}
/**
 * Abstract proxy protocol. It calls the subclass's doExport() method;
 * subclasses include http, rest, webservice, RMI and hessian.
 *
 * NOTE(review): the class body is empty in this excerpt.
 */
public abstract class AbstractProxyProtocol extends AbstractProtocol {
}
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
// Extension name of this protocol; also used to look the extension up in getDubboProtocol().
public static final String NAME = "dubbo";
public static final int DEFAULT_PORT = 20880;
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
// Most recently constructed instance; assigned in the constructor.
private static DubboProtocol INSTANCE;
// Bound servers, keyed by url.getAddress() (see openServer).
private final Map<String, ExchangeServer> serverMap = new ConcurrentHashMap<String, ExchangeServer>(); // <host:port,Exchanger>
// Shared reference-counted clients, keyed by url.getAddress() (see getSharedClient).
private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
// Handed to every ReferenceCountExchangeClient; the entry for an address is removed when a new shared client is created.
private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap = new ConcurrentHashMap<String, LazyConnectExchangeClient>();
// Per-address lock objects used by getSharedClient while creating a shared client.
private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
// NOTE(review): not referenced in the visible code; presumably used by optimizeSerialization — confirm.
private final Set<String> optimizers = new ConcurrentHashSet<String>();
//consumer side export a stub service for dispatching event
//servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
// Handler passed to Exchangers.bind/connect and to LazyConnectExchangeClient.
// NOTE(review): this excerpt is abridged — reply() references an undeclared
// `inv`, returns nothing, and the field initializer lacks its terminating
// semicolon. Restore the full body from the original source before compiling.
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
// Resolves the invoker for the incoming message via getInvoker(...).
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {Invoker<?> invoker = getInvoker(channel, inv);}
// Invocation messages are routed to reply(); everything else goes to the adapter's default handling.
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
}
/**
 * Creates the protocol and records it in the static {@code INSTANCE} field,
 * so the most recently constructed instance is the one returned by
 * {@link #getDubboProtocol()}.
 */
public DubboProtocol() {
INSTANCE = this;
}
/**
 * Returns the instance recorded in {@code INSTANCE}. When none has been
 * recorded yet, the extension named {@link #NAME} is requested from the
 * extension loader first.
 * NOTE(review): this presumably instantiates the protocol, whose
 * constructor sets {@code INSTANCE} — confirm; otherwise this returns null.
 */
public static DubboProtocol getDubboProtocol() {
if (INSTANCE == null) {
ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(DubboProtocol.NAME); // load
}
return INSTANCE;
}
/**
 * @return a read-only view of the servers currently held in {@code serverMap}
 */
public Collection<ExchangeServer> getServers() {
    Collection<ExchangeServer> boundServers = serverMap.values();
    return Collections.unmodifiableCollection(boundServers);
}
/**
 * @return a read-only view of the exporters currently held in {@code exporterMap}
 */
public Collection<Exporter<?>> getExporters() {
    Collection<Exporter<?>> registered = exporterMap.values();
    return Collections.unmodifiableCollection(registered);
}
/**
 * Package-private accessor that returns the live, mutable exporter registry
 * itself (not a copy or read-only view).
 */
Map<String, Exporter<?>> getExporterMap() {
return exporterMap;
}
/**
 * Checks whether the channel's URL points at the channel's remote peer:
 * the URL port must equal the remote port, and the URL IP must equal the
 * remote host address after both pass through
 * {@code NetUtils.filterLocalHost}.
 *
 * @param channel channel to inspect
 * @return {@code true} when port and filtered host both match
 */
private boolean isClientSide(Channel channel) {
    InetSocketAddress remote = channel.getRemoteAddress();
    URL channelUrl = channel.getUrl();
    // Cheap port comparison first; hosts are only resolved when ports agree.
    if (channelUrl.getPort() != remote.getPort()) {
        return false;
    }
    String urlHost = NetUtils.filterLocalHost(channel.getUrl().getIp());
    String remoteHost = NetUtils.filterLocalHost(remote.getAddress().getHostAddress());
    return urlHost.equals(remoteHost);
}
/**
 * Looks up the exporter stored in {@code exporterMap} under
 * {@code serviceKey} and returns its invoker.
 *
 * NOTE(review): {@code serviceKey} is not declared anywhere in this excerpt —
 * its derivation (presumably from {@code channel} and {@code inv}) appears to
 * have been cut. Also, a missing map entry leaves {@code exporter} null and
 * the dereference below would throw NullPointerException; confirm against
 * the full source whether a not-found check was elided too.
 */
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
return exporter.getInvoker();
}
/**
 * Exports a service: registers a {@link DubboExporter} for the invoker under
 * its service key, makes sure a server is open for the invoker's URL, then
 * runs {@code optimizeSerialization} for that URL.
 *
 * @param invoker invoker of the service to expose
 * @return the exporter registered in {@code exporterMap}
 * @throws RpcException if opening the server or the optimisation step fails
 */
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    final URL serviceUrl = invoker.getUrl();
    final String exportKey = serviceKey(serviceUrl);

    // Register the exporter before the server is opened.
    final DubboExporter<T> dubboExporter = new DubboExporter<T>(invoker, exportKey, exporterMap);
    exporterMap.put(exportKey, dubboExporter);

    openServer(serviceUrl);
    optimizeSerialization(serviceUrl);
    return dubboExporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
~~~~~~~~~~~~~~~~~~~~~
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
/**
 * Binds a new exchange server for the given URL.
 *
 * The URL is completed with defaults (read-only event on close, heartbeat)
 * and the Dubbo codec. Both the configured server transporter and the
 * configured client transporter are validated BEFORE binding, so an
 * unsupported type can no longer leave a bound server behind that is neither
 * returned nor closed.
 *
 * @param url URL of the service being exported
 * @return the bound server
 * @throws RpcException if a transporter type is unsupported or binding fails
 */
private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    String serverType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    if (serverType != null && serverType.length() > 0
            && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(serverType)) {
        throw new RpcException("Unsupported server type: " + serverType + ", url: " + url);
    }

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);

    // Validate the client type up front: failing after bind() would leak the server.
    String clientType = url.getParameter(Constants.CLIENT_KEY);
    if (clientType != null && clientType.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(clientType)) {
            throw new RpcException("Unsupported client type: " + clientType);
        }
    }

    try {
        return Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
}
/**
 * Creates an invoker for a remote service: runs
 * {@code optimizeSerialization} for the URL, obtains the exchange clients for
 * it, wraps them in a {@link DubboInvoker} and records that invoker in
 * {@code invokers}.
 *
 * @param serviceType service interface class
 * @param url         URL of the remote service
 * @return the newly created invoker
 * @throws RpcException if the clients cannot be created
 */
@Override
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    ExchangeClient[] exchangeClients = getClients(url);
    DubboInvoker<T> dubboInvoker = new DubboInvoker<T>(serviceType, url, exchangeClients, invokers);
    invokers.add(dubboInvoker);
    return dubboInvoker;
}
/**
 * Returns the exchange clients to use for a URL.
 *
 * When the {@code CONNECTIONS_KEY} parameter is 0 (the default), a single
 * shared client is used; otherwise that many dedicated clients are created.
 *
 * @param url URL of the remote service
 * @return array of clients, one per configured connection
 */
private ExchangeClient[] getClients(URL url) {
    int configured = url.getParameter(Constants.CONNECTIONS_KEY, 0);
    // Not configured means the connection is shared; otherwise one connection per service.
    boolean shared = configured == 0;
    int size = shared ? 1 : configured;

    ExchangeClient[] result = new ExchangeClient[size];
    for (int idx = 0; idx < result.length; idx++) {
        result[idx] = shared ? getSharedClient(url) : initClient(url);
    }
    return result;
}
/**
 * Gets the shared connection for the URL's address, creating it on first use.
 *
 * An open cached client has its reference count incremented and is returned.
 * Otherwise a new client is created under a per-address lock. The re-check
 * inside the lock also increments the count and skips closed clients, so a
 * caller that lost the creation race is counted like any other holder.
 *
 * @param url URL of the remote service
 * @return the shared, reference-counted client for {@code url.getAddress()}
 */
private ExchangeClient getSharedClient(URL url) {
    String key = url.getAddress();

    // Fast path: reuse an open client without taking the lock.
    ReferenceCountExchangeClient client = referenceClientMap.get(key);
    if (client != null && !client.isClosed()) {
        client.incrementAndGetCount();
        return client;
    }

    // Use the object returned by computeIfAbsent directly: a separate
    // locks.get(key) could observe null. The lock entry is intentionally kept
    // (one small object per address) so all threads for an address always
    // synchronize on the same monitor.
    Object lock = locks.computeIfAbsent(key, k -> new Object());
    synchronized (lock) {
        client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
                client.incrementAndGetCount();
                return client;
            }
            // Drop the stale entry, but only if it is still the one we saw.
            referenceClientMap.remove(key, client);
        }

        ExchangeClient exchangeClient = initClient(url);
        client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
        referenceClientMap.put(key, client);
        ghostClientMap.remove(key);
        return client;
    }
}
/**
 * Creates a new connection for the URL.
 *
 * The client transporter type is taken from {@code CLIENT_KEY}, falling back
 * to {@code SERVER_KEY} and then the default remoting client; an unknown type
 * is rejected. The URL is completed with the Dubbo codec and a default
 * heartbeat. When {@code LAZY_CONNECT_KEY} is true a
 * {@link LazyConnectExchangeClient} is returned instead of connecting
 * immediately.
 *
 * @param url URL of the remote service
 * @return the new client
 * @throws RpcException if the client type is unsupported or connecting fails
 */
private ExchangeClient initClient(URL url) {
    // client type setting.
    String fallbackType = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT);
    String clientType = url.getParameter(Constants.CLIENT_KEY, fallbackType);

    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    boolean typeGiven = clientType != null && clientType.length() > 0;
    if (typeGiven && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(clientType)) {
        throw new RpcException("Unsupported client type: " + clientType + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    try {
        // connection should be lazy
        if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
            return new LazyConnectExchangeClient(url, requestHandler);
        }
        return Exchangers.connect(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }
}
}
--------------------------------------------------------------------------------------------------------------------------------------------
/**
 * Exporter: a simple holder that wraps an invoker. Its main methods are
 * {@link #getInvoker()} and {@link #unexport()}.
 */
public interface Exporter<T> {
/**
 * @return the invoker wrapped by this exporter
 */
Invoker<T> getInvoker();
/**
 * Withdraws the exported service.
 */
void unexport();
}
/**
 * Base {@link Exporter} implementation; subclasses call back into it via
 * {@code super}.
 *
 * Holds the exported invoker and makes {@link #unexport()} a no-op after the
 * first call.
 */
public abstract class AbstractExporter<T> implements Exporter<T> {
    private final Invoker<T> invoker;
    private volatile boolean unexported = false;

    /**
     * @param invoker invoker to export; it, its interface and its URL must be non-null
     * @throws IllegalStateException if the invoker, its interface or its URL is null
     */
    public AbstractExporter(Invoker<T> invoker) {
        if (invoker == null) {
            throw new IllegalStateException("service invoker == null");
        }
        if (invoker.getInterface() == null) {
            throw new IllegalStateException("service type == null");
        }
        if (invoker.getUrl() == null) {
            throw new IllegalStateException("service url == null");
        }
        this.invoker = invoker;
    }

    /**
     * Returns the invoker given at construction. Without this accessor the
     * stored field is never read and concrete subclasses such as
     * DubboExporter would not satisfy the {@link Exporter} interface.
     */
    @Override
    public Invoker<T> getInvoker() {
        return invoker;
    }

    /**
     * Marks this exporter as unexported and destroys its invoker; later
     * calls return immediately.
     */
    @Override
    public void unexport() {
        if (unexported) {
            return;
        }
        unexported = true;
        getInvoker().destroy();
    }
}
/**
 * Exporter used by the Dubbo protocol. Besides the base-class behaviour it
 * remembers the key it was registered under and the registry map, so that
 * unexporting also removes its own entry from that map.
 */
public class DubboExporter<T> extends AbstractExporter<T> {

    /** Key under which this exporter is stored in the registry. */
    private final String registryKey;

    /** Registry map this exporter removes itself from on unexport. */
    private final Map<String, Exporter<?>> registry;

    /**
     * @param invoker     invoker being exported
     * @param key         key under which the exporter is registered
     * @param exporterMap registry map holding the exporter
     */
    public DubboExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.registryKey = key;
        this.registry = exporterMap;
    }

    /**
     * Runs the base-class unexport first, then drops this exporter's entry
     * from the registry map.
     */
    @Override
    public void unexport() {
        super.unexport();
        registry.remove(registryKey);
    }
}
Protocol(协议)下面有很多实现类,总体来说有 2 个功能(refer 和 export)。refer 通过接口 class 和 url 创建(new)出一个 Invoker。
rest 协议和 dubbo 协议的实现方式不一样。
refer 返回一个 Invoker。上篇写到,Invoker 是对 target 类的包装,可以通过它的 invoke 方法调用真正的实现类。
export 需要一个 Invoker 作为参数;通过 Invoker 可以获取 url,再根据 url 把具体的服务暴露出来,不管是 rest 服务还是 dubbo 服务。
Exporter 是一个简单封装 Invoker 的实体类,主要的方法是 getInvoker 和 unexport。
网友评论