服务发布流程
dubbo rest 服务发布流程-
1、Dubbo Rest协议的服务发布流程如上图所示,包括创建Server、启动Server、部署服务的3个阶段。
-
2、创建Server阶段:RestProtocol通过createServer()创建Server对象,Server对象目前分为DubboHttpServer和NettyServer两类。
-
3、启动Server阶段:RestProtocol通过调用DubboHttpServer的start()方法进入启动过程,启动过程中通过httpBinder.bind()方法创建真正的监听请求的server,包括JettyHttpServer、ServletHttpServer、TomcatHttpServer。
-
4、部署服务阶段:部署服务阶段主要是把接口的定义部署到容器中构建请求mapping的过程。
RestProtocol类图
RestProtocol类图Rest协议Server类图
Server类图HttpBinder类图
HttpBinder类图HttpServer类图
HttpServer类图服务发布流程
服务主流程
public abstract class AbstractProxyProtocol extends AbstractProtocol {
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
final String uri = serviceKey(invoker.getUrl());
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
return exporter;
}
// 不确定为啥需要执行proxyFactory.getProxy(invoker, true)????
// invoker对象已经是由impl转为invoker对象
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
super.unexport();
exporterMap.remove(uri);
if (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
};
exporterMap.put(uri, exporter);
return exporter;
}
}
public class RestProtocol extends AbstractProxyProtocol {
private static final int DEFAULT_PORT = 80;
private final Map<String, RestServer> servers = new ConcurrentHashMap<String, RestServer>();
private final RestServerFactory serverFactory = new RestServerFactory();
public void setHttpBinder(HttpBinder httpBinder) {
// HttpBinder$Adaptive
serverFactory.setHttpBinder(httpBinder);
}
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
Class implClass = (Class) StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).get(url.getServiceKey());
RestServer server = servers.get(addr);
if (server == null) {
// 通过serverFactory创建Server对象
server = serverFactory.createServer(url.getParameter(Constants.SERVER_KEY, "jetty"));
// 启动Server的服务
server.start(url);
servers.put(addr, server);
}
String contextPath = getContextPath(url);
if ("servlet".equalsIgnoreCase(url.getParameter(Constants.SERVER_KEY, "jetty"))) {
ServletContext servletContext = ServletManager.getInstance().getServletContext(ServletManager.EXTERNAL_SERVER_PORT);
if (servletContext == null) {
throw new RpcException("No servlet context found. Since you are using server='servlet', " +
"make sure that you've configured " + BootstrapListener.class.getName() + " in web.xml");
}
String webappPath = servletContext.getContextPath();
if (StringUtils.isNotEmpty(webappPath)) {
webappPath = webappPath.substring(1);
if (!contextPath.startsWith(webappPath)) {
throw new RpcException("Since you are using server='servlet', " +
"make sure that the 'contextpath' property starts with the path of external webapp");
}
contextPath = contextPath.substring(webappPath.length());
if (contextPath.startsWith("/")) {
contextPath = contextPath.substring(1);
}
}
}
final Class resourceDef = GetRestful.getRootResourceClass(implClass) != null ? implClass : type;
// 部署服务到server端
server.deploy(resourceDef, impl, contextPath);
final RestServer s = server;
return new Runnable() {
@Override
public void run() {
// TODO due to dubbo's current architecture,
// it will be called from registry protocol in the shutdown process and won't appear in logs
s.undeploy(resourceDef);
}
};
}
}
- RestProtocol按照export() => doExport()进行服务导出操作。
- doExport()过程通过serverFactory.createServer()创建Server对象。
- doExport()过程通过server.start()启动Server对象。
- doExport()过程通过server.deploy()进行部署服务。
创建Server对象
public class RestServerFactory {
private HttpBinder httpBinder;
public void setHttpBinder(HttpBinder httpBinder) {
this.httpBinder = httpBinder;
}
public RestServer createServer(String name) {
// TODO move names to Constants
if ("servlet".equalsIgnoreCase(name) || "jetty".equalsIgnoreCase(name) || "tomcat".equalsIgnoreCase(name)) {
return new DubboHttpServer(httpBinder);
} else if ("netty".equalsIgnoreCase(name)) {
return new NettyServer();
} else {
throw new IllegalArgumentException("Unrecognized server name: " + name);
}
}
}
- 根据类型选择创建DubboHttpServer或NettyServer对象。
Server启动过程
public abstract class BaseRestServer implements RestServer {
@Override
public void start(URL url) {
// 设置Deployment的基本上属性
getDeployment().getMediaTypeMappings().put("json", "application/json");
getDeployment().getMediaTypeMappings().put("xml", "text/xml");
getDeployment().getProviderClasses().add(RpcContextFilter.class.getName());
getDeployment().getProviderClasses().add(RpcExceptionMapper.class.getName());
loadProviders(url.getParameter(Constants.EXTENSION_KEY, ""));
doStart(url);
}
@Override
public void deploy(Class resourceDef, Object resourceInstance, String contextPath) {
if (StringUtils.isEmpty(contextPath)) {
getDeployment().getRegistry().addResourceFactory(new DubboResourceFactory(resourceInstance, resourceDef));
} else {
getDeployment().getRegistry().addResourceFactory(new DubboResourceFactory(resourceInstance, resourceDef), contextPath);
}
}
protected void loadProviders(String value) {
for (String clazz : Constants.COMMA_SPLIT_PATTERN.split(value)) {
if (!StringUtils.isEmpty(clazz)) {
getDeployment().getProviderClasses().add(clazz.trim());
}
}
}
protected abstract ResteasyDeployment getDeployment();
protected abstract void doStart(URL url);
}
public class DubboHttpServer extends BaseRestServer {
private final HttpServletDispatcher dispatcher = new HttpServletDispatcher();
private final ResteasyDeployment deployment = new ResteasyDeployment();
private HttpBinder httpBinder; // httpBinderApative
private HttpServer httpServer;
public DubboHttpServer(HttpBinder httpBinder) {
this.httpBinder = httpBinder;
}
@Override
protected void doStart(URL url) {
// 创建httpServer对象,在创建过程中启动httpServer对象
httpServer = httpBinder.bind(url, new RestHandler());
ServletContext servletContext = ServletManager.getInstance().getServletContext(url.getPort());
if (servletContext == null) {
servletContext = ServletManager.getInstance().getServletContext(ServletManager.EXTERNAL_SERVER_PORT);
}
if (servletContext == null) {
throw new RpcException("No servlet context found. If you are using server='servlet', " +
"make sure that you've configured " + BootstrapListener.class.getName() + " in web.xml");
}
// 设置deployment属性
servletContext.setAttribute(ResteasyDeployment.class.getName(), deployment);
try {
// 初始化dispatcher对象的属性
dispatcher.init(new SimpleServletConfig(servletContext));
} catch (ServletException e) {
throw new RpcException(e);
}
}
@Override
protected ResteasyDeployment getDeployment() {
// 返回server的deployment的相关属性
return deployment;
}
private class RestHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
dispatcher.service(request, response);
}
}
}
- 启动过程按照start() => doStart()流程进行启动。
- doStart()通过httpBinder.bind()会调用HttpBinder$Adaptive来选择HttpBinder,不同的HttpBinder会创建不同的Server对象,返回的对象JettyHttpServer/TomcatHttpServer
/ServletHttpServer。 - httpBinder.bind()方法的参数之一RestHandler,用于处理请求的入口函数,内部handler方法调用dispatcher.service()来处理请求。
- BaseRestServer的deploy()方法执行部署动作。
HttpBinder$Adaptive
public class HttpBinder$Adaptive implements HttpBinder {
public HttpServer bind(URL uRL, HttpHandler httpHandler) {
if (uRL == null) {
throw new IllegalArgumentException("url == null");
}
URL uRL2 = uRL;
String string = uRL2.getParameter("server", "jetty");
if (string == null) {
throw new IllegalStateException(new StringBuffer().append("Fail to get extension(com.alibaba.dubbo.remoting.http.HttpBinder) name from url(").append(uRL2.toString()).append(") use keys([server])").toString());
}
HttpBinder httpBinder = (HttpBinder)ExtensionLoader.getExtensionLoader(HttpBinder.class).getExtension(string);
return httpBinder.bind(uRL, httpHandler);
}
}
- httpBinder.bind()通过扩展方式会返回对应的httpBinder对象。
- httpBinder.bind()会返回对应的httpServer对象。
HttpBinder
public class JettyHttpBinder implements HttpBinder {
@Override
public HttpServer bind(URL url, HttpHandler handler) {
return new JettyHttpServer(url, handler);
}
}
public class TomcatHttpBinder implements HttpBinder {
@Override
public HttpServer bind(URL url, HttpHandler handler) {
return new TomcatHttpServer(url, handler);
}
}
public class ServletHttpBinder implements HttpBinder {
@Override
@Adaptive()
public HttpServer bind(URL url, HttpHandler handler) {
return new ServletHttpServer(url, handler);
}
}
- httpBinder对象包括ServletHttpBinder、TomcatHttpBinder、JettyHttpBinder三类。
- ServletHttpBinder.bind()返回ServletHttpServer对象。
- TomcatHttpBinder.bind()返回TomcatHttpServer对象。
- JettyHttpBinder.bind()返回JettyHttpServer对象。
HttpServer
public class TomcatHttpServer extends AbstractHttpServer {
private final Tomcat tomcat;
private final URL url;
public TomcatHttpServer(URL url, final HttpHandler handler) {
super(url, handler);
this.url = url;
DispatcherServlet.addHttpHandler(url.getPort(), handler);
String baseDir = new File(System.getProperty("java.io.tmpdir")).getAbsolutePath();
tomcat = new Tomcat();
tomcat.setBaseDir(baseDir);
tomcat.setPort(url.getPort());
tomcat.getConnector().setProperty(
"maxThreads", String.valueOf(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS)));
String.valueOf(url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS)));
tomcat.getConnector().setProperty(
"maxConnections", String.valueOf(url.getParameter(Constants.ACCEPTS_KEY, -1)));
tomcat.getConnector().setProperty("URIEncoding", "UTF-8");
tomcat.getConnector().setProperty("connectionTimeout", "60000");
tomcat.getConnector().setProperty("maxKeepAliveRequests", "-1");
tomcat.getConnector().setProtocol("org.apache.coyote.http11.Http11NioProtocol");
Context context = tomcat.addContext("/", baseDir);
Tomcat.addServlet(context, "dispatcher", new DispatcherServlet());
context.addServletMapping("/*", "dispatcher");
ServletManager.getInstance().addServletContext(url.getPort(), context.getServletContext());
try {
tomcat.start();
} catch (LifecycleException e) {
throw new IllegalStateException("Failed to start tomcat server at " + url.getAddress(), e);
}
}
@Override
public void close() {
super.close();
ServletManager.getInstance().removeServletContext(url.getPort());
try {
tomcat.stop();
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
- TomcatHttpServer的创建过程,整个过程包含了start()操作。
public class JettyHttpServer extends AbstractHttpServer {
private static final Logger logger = LoggerFactory.getLogger(JettyHttpServer.class);
private Server server;
private URL url;
public JettyHttpServer(URL url, final HttpHandler handler) {
super(url, handler);
this.url = url;
Log.setLog(new StdErrLog());
Log.getLog().setDebugEnabled(false);
// DispatcherServlet.addHttpHandler()绑定处理的handler
DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), handler);
int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setDaemon(true);
threadPool.setMaxThreads(threads);
threadPool.setMinThreads(threads);
SelectChannelConnector connector = new SelectChannelConnector();
String bindIp = url.getParameter(Constants.BIND_IP_KEY, url.getHost());
if (!url.isAnyHost() && NetUtils.isValidLocalHost(bindIp)) {
connector.setHost(bindIp);
}
connector.setPort(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()));
server = new Server();
server.setThreadPool(threadPool);
server.addConnector(connector);
ServletHandler servletHandler = new ServletHandler();
// 绑定DispatcherServlet到ServletHolder
ServletHolder servletHolder = servletHandler.addServletWithMapping(DispatcherServlet.class, "/*");
servletHolder.setInitOrder(2);
// 绑定ServletHolder到contex对象当中
Context context = new Context(server, "/", Context.SESSIONS);
context.setServletHandler(servletHandler);
ServletManager.getInstance().addServletContext(url.getParameter(Constants.BIND_PORT_KEY, url.getPort()), context.getServletContext());
try {
server.start();
} catch (Exception e) {
throw new IllegalStateException("Failed to start jetty server on " + url.getParameter(Constants.BIND_IP_KEY) + ":" + url.getParameter(Constants.BIND_PORT_KEY) + ", cause: "
+ e.getMessage(), e);
}
}
}
- JettyHttpServer的创建过程,整个过程包含了start()操作。
public class ServletHttpServer extends AbstractHttpServer {
public ServletHttpServer(URL url, HttpHandler handler) {
super(url, handler);
DispatcherServlet.addHttpHandler(url.getParameter(Constants.BIND_PORT_KEY, 8080), handler);
}
}
- ServletHttpServer的创建过程。
请求处理入门
- HttpServer内部创建server并执行start操作。
- HttpServer内部调用handler即RestHandler进行处理。
- RestHandler内部调用dispatcher.service()进行处理流程。
网友评论