美文网首页
Servlet3之NIO线程池隔离

Servlet3之NIO线程池隔离

作者: 离别刀 | 来源:发表于2018-05-26 19:35 被阅读0次

    线程隔离主要是针对不同的业务场景,按照权重区分使用不同的线程池,使某一个业务出现问题时,故障不会扩散到其他业务的线程池,从而保证主要业务的高可用。
    本案例主要讲解基于servlet3的线程隔离术。
    首先我们回忆一下tomcat6:tomcat6只支持BIO,它的处理流程如下:
    1).tomcat负责接收servletRequest请求
    2).将接收的请求分配给servlet处理业务
    3).处理完请求之后,通过servletResponse写回数据
    上面这三步都是在一个线程里面完成的,也就是同步进行。
    如下图:


    t6.png

    tomcat7之后版本引入了servlet3,它基于NIO能处理更大的并发数。
    我们可以将整个请求改造成如下步骤:
    1).tomcat单线程进行请求解析
    2).解析完之后将task放入队列(可以根据不同业务类型放入不同的队列)
    3).每个队列指定相应业务线程池对task进行处理
    这样改造以后就可以把业务按照重要性发送到不同线程池,两个线程池分开独立配置,互不干扰。当非核心的业务出现问题之后,不会影响核心的业务。另外由于此线程池是由我们创建的,我们可以对该线程池进行监控和处理,灵活了很多。
    如下图:


    t7.png

    下面是实现代码:

    接口层调用

    /**
     * REST entry point under {@code /app} that never runs business logic on the
     * calling thread: every request is handed to {@link LocalNewsAsyncContext},
     * which executes the supplied task on its own thread pool.
     */
    @RestController
    @RequestMapping("/app")
    public class NIOCtrl {
        @Autowired
        private LocalNewsAsyncContext localNewsAsyncContext;
        @Autowired
        private NewsService newsService;

        /**
         * Serves {@code /app/news}.
         *
         * @param request the current servlet request, passed on for asynchronous processing
         * @param type    optional selector; {@code "1"} submits {@code NewsService.getNews()},
         *                any other value (including absence) submits {@code NewsService.getNewsMap()}
         */
        @RequestMapping("/news")
        public void getNews(HttpServletRequest request,
                            @RequestParam(value = "type", required = false) String type) {
            final boolean plainNewsRequested = "1".equals(type);
            if (plainNewsRequested) {
                localNewsAsyncContext.submit(request, () -> newsService.getNews());
            } else {
                localNewsAsyncContext.submit(request, () -> newsService.getNewsMap());
            }
        }
    }
    

    将请求丢进指定线程池

    /**
     * Moves a servlet request into asynchronous mode and queues its business
     * task on the injected {@link ThreadPoolExecutor}.
     */
    @Service
    public class LocalNewsAsyncContext {
        /** Timeout applied to every async context, in milliseconds (5 seconds). */
        private static final long TIMEOUT_MILLIS = 5L * 1000;
        @Autowired
        private CustomAsyncListener asyncListener;
        @Autowired
        private ThreadPoolExecutor executor;

        /**
         * Starts asynchronous processing for {@code request} and submits {@code task}
         * to the executor wrapped in a {@link CustomCallable}.
         *
         * <p>The request URI and parameter map are stored as request attributes under
         * {@link Constant#URI} and {@link Constant#PARAMS}; {@code CustomCallable} and
         * the listener read them back for logging.
         *
         * @param request the request to process asynchronously
         * @param task    the business task whose result becomes the response body
         */
        public void submit(final HttpServletRequest request, final Callable<Object> task) {
            final String uri = request.getRequestURI();
            final Map<String, String[]> params = request.getParameterMap();
            // Open the asynchronous context.
            final AsyncContext asyncContext = request.startAsync();
            asyncContext.getRequest().setAttribute(Constant.URI, uri);
            asyncContext.getRequest().setAttribute(Constant.PARAMS, params);
            asyncContext.setTimeout(TIMEOUT_MILLIS);
            // No null check here: the context has already been dereferenced above,
            // so a null value could never reach this line.
            asyncContext.addListener(asyncListener);
            executor.submit(new CustomCallable(asyncContext, task));
        }
    }
    

    自定义线程处理

    /**
     * Runs one business task on a worker thread and writes its result to the
     * response of the associated {@link AsyncContext}, completing the context
     * afterwards.
     *
     * <p>String results are written as-is; every other result (including
     * {@code null}) is serialized with {@code JSON.toJSONString}. A
     * {@link CompletableFuture} result is written when the future completes.
     */
    public class CustomCallable implements Callable<Object> {
        private static final Logger LOG = LoggerFactory.getLogger(CustomCallable.class);

        /** Async context of the request being served; public because code outside this class reads it. */
        public AsyncContext asyncContext;
        private final Callable<Object> task;
        private final String uri;
        private final Map<String, String[]> params;

        /**
         * @param asyncContext context whose request carries the {@link Constant#URI} and
         *                     {@link Constant#PARAMS} attributes used for logging
         * @param task         the business task to execute
         */
        public CustomCallable(AsyncContext asyncContext, Callable<Object> task) {
            this.asyncContext = asyncContext;
            this.task = task;
            this.uri = (String) asyncContext.getRequest().getAttribute(Constant.URI);
            // The attribute is an untyped Object; this class treats it as the request parameter map.
            @SuppressWarnings("unchecked")
            Map<String, String[]> requestParams =
                    (Map<String, String[]>) asyncContext.getRequest().getAttribute(Constant.PARAMS);
            this.params = requestParams;
        }

        /**
         * Executes the task and answers the request.
         *
         * @return always {@code null}; the result is delivered through the response
         * @throws Exception whatever the task throws, after the request has been
         *                   answered with status 500 and completed
         */
        @Override
        public Object call() throws Exception {
            final Object outcome;
            try {
                outcome = task.call();
            } catch (Exception e) {
                // Without this the exception would only live in the executor's Future and
                // the request would stay open until the async timeout fires.
                fail(e);
                throw e;
            }
            if (outcome instanceof CompletableFuture) {
                // whenComplete fires exactly once, so the context cannot be completed twice.
                ((CompletableFuture<?>) outcome).whenComplete((value, error) -> {
                    if (error == null) {
                        callback(asyncContext, value);
                    } else {
                        LOG.error("async task failed for uri:{}, params:{}", uri, describeParams(), error);
                        // A failed future is answered with an empty body.
                        callback(asyncContext, "");
                    }
                });
            } else {
                callback(asyncContext, outcome);
            }
            return null;
        }

        /** Writes {@code result} to the response and completes the async context. */
        private void callback(AsyncContext asyncContext, Object result) {
            final HttpServletResponse response;
            try {
                response = (HttpServletResponse) asyncContext.getResponse();
            } catch (IllegalStateException alreadyFinished) {
                // The container may reject access once the context was completed elsewhere
                // (for example by an AsyncListener on timeout); nothing is left to write.
                LOG.warn("async context already completed, dropping result for uri:{}", uri);
                return;
            }
            try {
                write(response, result instanceof String ? (String) result : JSON.toJSONString(result));
            } catch (Exception e) {
                response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                LOG.error("get info error for uri:{}, params:{}", uri, describeParams(), e);
            } finally {
                completeQuietly(asyncContext);
            }
        }

        /** Answers the request with status 500 after the task itself threw. */
        private void fail(Exception cause) {
            LOG.error("get info error for uri:{}, params:{}", uri, describeParams(), cause);
            try {
                HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse();
                response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            } catch (IllegalStateException alreadyFinished) {
                LOG.warn("async context already completed, cannot report failure for uri:{}", uri);
                return;
            }
            completeQuietly(asyncContext);
        }

        /** Completes the context, tolerating a context that has already been completed. */
        private void completeQuietly(AsyncContext asyncContext) {
            try {
                asyncContext.complete();
            } catch (IllegalStateException alreadyFinished) {
                LOG.warn("async context already completed for uri:{}", uri);
            }
        }

        /** Serializes the request parameters for logging without letting a failure escape. */
        private String describeParams() {
            try {
                return JSON.toJSONString(params);
            } catch (Exception e) {
                return "<unserializable>";
            }
        }

        /** Writes the body as UTF-8 so the bytes do not depend on the platform default charset. */
        private void write(HttpServletResponse response, String result) throws IOException {
            response.getOutputStream().write(result.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
    
    

    定义业务线程池

    @Configuration
    public class LocalNewsPoolConfig {
        private final static Logger LOG= LoggerFactory.getLogger(LocalNewsPoolConfig.class);
    
        @Bean
        public ThreadPoolExecutor init(){
            int corePoolSize= 10;
            int maximumPoolSize= 100;
            int queueCapacity= 200;
            LinkedBlockingDeque<Runnable> queue= new LinkedBlockingDeque<>(queueCapacity);
            ThreadPoolExecutor executor= new ThreadPoolExecutor(corePoolSize,maximumPoolSize,60L, TimeUnit.SECONDS,queue);
            executor.allowCoreThreadTimeOut(true);
            executor.setRejectedExecutionHandler((r, executor1) -> {
                if(r instanceof CustomCallable){
                    CustomCallable call= (CustomCallable) r;
                    AsyncContext asyncContext= call.asyncContext;
                    if(asyncContext!=null){
                        handler(asyncContext);
                    }
                }
            });
            return executor;
        }
    
        private static void handler(AsyncContext asyncContext){
            try{
                ServletRequest req= asyncContext.getRequest();
                String uri= (String) req.getAttribute(Constant.URI);
                Map params= (Map) req.getAttribute(Constant.PARAMS);
                LOG.error("async req rejected. uri :{},params:{}",uri, JSON.toJSONString(params));
            }catch (Exception e){
                e.printStackTrace();
                try{
                    HttpServletResponse response= (HttpServletResponse) asyncContext.getResponse();
                    response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                }catch (Exception e1){}
            }finally {
                asyncContext.complete();
            }
        }
        
    }
    

    定义异步请求监听

    @Component
    public class CustomAsyncListener implements AsyncListener {
        private Logger LOG= LoggerFactory.getLogger(CustomAsyncListener.class);
        @Override public void onComplete(AsyncEvent asyncEvent) throws IOException {
    
        }
    
        @Override public void onTimeout(AsyncEvent asyncEvent) throws IOException {
            AsyncContext asyncContext= asyncEvent.getAsyncContext();
            try{
                ServletRequest req= asyncContext.getRequest();
                String uri= (String) req.getAttribute(Constant.URI);
                Map params= (Map) req.getAttribute(Constant.PARAMS);
                LOG.error("async req timeOut. uri :{},params:{}",uri, JSON.toJSONString(params));
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                try{
                    HttpServletResponse response= (HttpServletResponse) asyncContext.getResponse();
                    response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
                }catch (Exception e1){}
                asyncContext.complete();
            }
        }
    
        @Override public void onError(AsyncEvent asyncEvent) throws IOException {
            AsyncContext asyncContext= asyncEvent.getAsyncContext();
            try{
                ServletRequest req= asyncContext.getRequest();
                String uri= (String) req.getAttribute(Constant.URI);
                Map params= (Map) req.getAttribute(Constant.PARAMS);
                LOG.error("async req error. uri :{},params:{}",uri, JSON.toJSONString(params));
            }catch (Exception e){
                e.printStackTrace();
                try{
                    HttpServletResponse response= (HttpServletResponse) asyncContext.getResponse();
                    response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
                }catch (Exception e1){}
            }finally {
                asyncContext.complete();
            }
        }
    
        @Override public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
    
        }
    }
    

    业务处理和常量类

    /**
     * Demo business service with two fixed payloads: one {@link String} and one
     * non-String object.
     */
    @Service
    public class NewsService {
        /** @return a constant plain-text payload */
        public String getNews() {
            final String news = "servlet3 nio test.";
            return news;
        }

        /** @return a new {@link StringBuilder} holding a constant sentence */
        public StringBuilder getNewsMap() {
            final StringBuilder news = new StringBuilder("I do and i understand.");
            return news;
        }
    }
    /**
     * Names of the request attributes used to carry request details through
     * asynchronous processing. Constants holder only; not instantiable.
     */
    public final class Constant {
        /** Attribute name under which the request URI is stored. */
        public static final String URI = "uri";
        /** Attribute name under which the request parameter map is stored. */
        public static final String PARAMS = "params";

        private Constant() {
            // Constants holder; instances carry no state.
        }
    }
    

    相关文章

      网友评论

          本文标题:Servlet3之NIO线程池隔离

          本文链接:https://www.haomeiwen.com/subject/dmrmjftx.html