美文网首页微服务架构和实践
本地文件队列-异步隔离设计

本地文件队列-异步隔离设计

作者: 铁汤 | 来源:发表于2017-05-11 11:24 被阅读411次

常见的异步方式:

创建异步线程

每个新创建一个线程来执行异步任务,任务结束线程也终止。
线程的创建成本比较大,不建议使用。

使用Queue,producer/consumer方式

在内部创建一个Queue,worker线程直接将异步处理的任务放入queue,一个或多个异步线程从queue中消费并执行任务。

线程池

用线程池来替换每次创建线程,减少线程创建的成本,线程被复用,一次创建多处使用。

和使用Queue类似,也是通过BlockingQueue实现,但策略上更复杂,向线程池提交Callable&Runnable任务,由线程池调度执行。

参考:java.util.concurrent.ThreadPoolExecutor#execute

spring @Async注解

通过注解来来简化了异步编程,只需要在需要异步的方法上使用@Async注解即可。
其本质也是在线程池功能上扩展的,将异步执行方法封装为一个Callable,然后提交给线程池。

org.springframework.aop.interceptor.AsyncExecutionInterceptor:

public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            }
        };

        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

详细参考:http://docs.spring.io/spring/docs/current/spring-framework-reference/htmlsingle/#scheduling-annotation-support-async

背景和场景

产生的背景

在项目中使用了@Async来执行异步任务,但在线上运行时出现了一次OOM的故障。通过分析发现是,线程池队列设置的比较大,当时的JVM内存给的也比较少(2048M),异步任务方法参数中传了大量的数据,任务执行被后端数据库阻塞(后端数据库变慢),最后导致缓存了大量的数据被放到线程池队列。其实JVM内存配置合适,线程池队列数合适,并配置合适的RejectedExecutionHandler策略。

产生这个组件,1) 旨在替换内存队列的异步方式 2) 用来方便扩展集成分布式MQ

异步隔离

除了上面背景和场景,开发这个组件的另一个初衷就是有效异步隔离和作为一个降级备份方案。
也是主要实现了文件队列方式的一个原因。

当我们使用分布式MQ时,难免分布式MQ宕机或者其他网络等原因导致不能生产消息,或者阻塞影响到本身的业务,出现这种情况时可以降级到本地文件队列。

本地文件队列的优点是速度快,只要文件系统不出问题可以认为不会被阻塞。缺点是本地文件队列生产的消息必须自己来消费,出现故障时消息消费会延迟,文件系统的损坏也会导致消息丢失。主要看使用的姿势,更看重哪一方面了。

基本架构设计思路

采用producer/consumer生产消费设计模式。

参考了@Async思路,定义一个注解@AsyncExecutable, 使用Spring拦截器拦截注解了@AsyncExecutable的方法,可以使用AOP或者BeanPostProcessor来应用拦截器。

producer

拦截器拦截到@AsyncExecutable方法后,将该方法所有的参数和方法信息作为Message,并序列化Message,序列化采用Kryo或者Json,将序列化后的信息放入队列。

class Message {

     String beanName;
     String klassName;
     String methodName;
     Class<?>[] argTypes;
     Object[] args;
     boolean hasTransactional = true;
     
}

consumer

有1个调度主线程和worker线程组成,主线程负责从队列中拉取消息,并分发到worker线程,worker线程采用线程池,使用了spring提供的TaskExecutor。

worker线程反序列化消息为Message对象,并根据Message中的方法信息在spring ApplicationContext中查找到spring 管理的bean,并通过反射来调用。

队列

队列抽象了一个BlockableQueue, 通过BlockableQueue具体实现来扩展,可以是内存,文件,或分布式MQ。

 public interface BlockableQueue<T> {

    String DefaultQueueName = "fileQueue";

    /**
     * push一个消息到队列
     *
     * @param t
     * @return
     */
    boolean offer(T t);

    /**
     * 从队列pop一个消息,如果队列中无可用消息,则阻塞
     *
     * @return
     * @throws InterruptedException
     */
    T take() throws InterruptedException;

    /**
     * 从队列pop一个消息,如果队列中无可用消息,则返回null
     *
     * @return
     */
    T poll();

    /**
     * 队列中消息数量
     *
     * @return
     */
    int size();
}

通用的默认实现:

 
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DefaultBlockableQueue implements BlockableQueue<byte[]> {
    final Lock lock = new ReentrantLock();
    final Condition notEmpty = lock.newCondition();
    private Queue<byte[]> queue = null;


    public FileBlockableQueue(Queue<byte[]> queue) {
        this.queue = queue;
    }

    @Override
    public boolean offer(byte[] bytes) {
        lock.lock();
        try {
            boolean v = queue.offer(bytes);
            notEmpty.signal();
            return v;
        } finally {
            lock.unlock();
        }
    }


    @Override
    public byte[] take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                notEmpty.await();
            }
            byte[] bytes = queue.poll();
            return bytes;
        } finally {
            lock.unlock();
        }

    }

    @Override
    public byte[] poll() {
        return queue.poll();
    }

    @Override
    public int size() {
        return queue.size();
    }
}

本文实现了一个文件队列,采用去哪儿文件队列实现,这是一个fork:https://github.com/tietang/fqueue

对编程模型来说不用关心异步细节,只需要在需要异步的方法上注解@AsyncExecutable即可。

相关文章

  • 本地文件队列-异步隔离设计

    常见的异步方式: 创建异步线程 每个新创建一个线程来执行异步任务,任务结束线程也终止。线程的创建成本比较大,不建议...

  • GCD基础总结一

    上代码~ 同步串行队列 同步并行队列 异步串行队列 异步并行队列 主队列同步 会卡住 主队列异步

  • GCD的几种创建方式及基本使用

    同步函数 同步函数+主队列 同步函数+串行队列 同步函数+并发队列 异步函数 异步函数+主队列 异步函数+串行队列...

  • GCD队列、同步异步

    GCD队列、同步异步 GCD队列、同步异步

  • 多线程GCD笔记

    同步函数 + 主队列 异步函数 + 主队列 同步函数 + 串行队列 异步函数 + 串行队列 同步函数 + 并发队列...

  • Tape源码解析

    Tape是一个队列集合类库,主要包含内存对象队列,文件对象队列和任务队列,特别是文件对象队列的设计,使用了Rand...

  • IOS多线程总结

    目录 简述 NSThread GCD操作与队列异步操作并行队列同步操作并行队列同步操作串行队列异步操作串行队列队列...

  • Ajaxcross origin request are onl

    chrome异步加载本地json文件报错:cross origin request are only suppor...

  • 多线程的运用

    同步串行队列 同步并发队列 异步串行队列 异步并发队列 队列组 栅栏 队列组和栅栏的组合 信号量 死锁主线程 分析...

  • GCD详解

    一 使用步骤 创建队列(串行队列或并发队列) 调用函数(同步或异步) 二 重要概念 1,同步和异步 同步或异步最大...

网友评论

    本文标题:本地文件队列-异步隔离设计

    本文链接:https://www.haomeiwen.com/subject/gzhotxtx.html