美文网首页
Java实现生产者/消费者模型实战应用

Java实现生产者/消费者模型实战应用

作者: 大杯冰摩卡 | 来源:发表于2020-08-31 10:06 被阅读0次

    场景: 我们需要创建一个job,这个job是异步执行的,且任务有多个状态,每个状态需要不同的处理。

    实现: 在服务里创建一个生产消费模型,job在创建后,设置初始状态,并放在队列里由消费者消费,处理业务逻辑。消费成功后,更改状态再次放入队列中,等待下一次消费。

    实现一: wait && notify

    最朴素也是最简单的方案:wait && notify机制 。
    队列中有数据就阻塞生产者线程,消费者消费后就唤醒生产者。反之,队列中没有数据就阻塞消费者线程,生产者添加数据后唤醒消费者线程。wait && notify机制虽然足够简单,但是不够灵活,并发效率也不佳,不能满足实际场景需求。

     // 存储生产者产生的数据
        static List<String> list = new ArrayList<>();
    
        public static void main(String[] args) {
    
            new Thread(() -> {
                while (true) {
                    synchronized (list) {
                        // 判断 list 中是否有数据,如果有数据的话,就进入等待状态,等数据消费完
                        if (list.size() != 0) {
                            try {
                                list.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        // list 中没有数据时,产生数据添加到 list 中
                        try {
                            Thread.sleep(5000);
                            list.add(UUID.randomUUID().toString());
                            list.notify();
                            System.out.println(Thread.currentThread().getName() + list);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }, "生产者线程 A ").start();
    
    
            new Thread(() -> {
                while (true) {
                    synchronized (list) {
                        // 如果 list 中没有数据,则进入等待状态,等收到有数据通知后再继续运行
                        if (list.size() == 0) {
                            try {
                                list.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        // 有数据时,读取数据
                        System.out.println(Thread.currentThread().getName() + list);
                        list.notify();
                        // 读取完毕,将当前这条 UUID 数据进行清除
                        list.clear();
                    }
                }
            }, "消费者线程 B ").start();
    
        }
    

    实现二: BlockingQueue

    BlockingQueue的写法最简单。核心思想是,把并发和容量控制封装在缓冲区中

     public static void main(String[] args) throws InterruptedException {
            LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
            for(int i =1;i<=10;i++){
                queue.put("第"+i+"条消息");
            }
            System.out.println("当前队列还有"+queue.size()+"消息");
    
    
            new Thread(()->{
                try {
                    System.out.println("睡眠中");
                    Thread.sleep(10000);
                    for(int i =1;i<=10;i++){
                        queue.put("新的消息:第"+i+"条消息");
                    }
    
                } catch (InterruptedException e) {
    
                }
            }).start();
    
            int nThreads = 1 ;
            ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
            for(int i = 0 ;i<nThreads;i++){
                executorService.submit(()->{
                    while (true){
                        System.out.println("消费者");
                        String poll = null;
                        try {
                            poll = queue.take();
    
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("消费者:"+poll==null?"":poll);
                        System.out.println(Thread.currentThread().getName());
                    }
                });
            }
    
    
        }
    

    demo中通过put方法生产数据,take方法消费数据。这个两个方法都有阻塞线程的效果,我们来看下:

    2.1 put()

        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            
            // 以可中断的形式获取put锁
            putLock.lockInterruptibly();
            try {
                // 与offer(e, timeout, unit)相比,采用了无限等待的方式
                while (count.get() == capacity) {
                    // 当执行了移除元素操作后,会通过signal操作来唤醒notFull队列中的一个线程
                    notFull.await();
                }
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
        }
    
    

    2.2 take()

    public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                // 出队,并自减
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                // 只要队列还有元素,就唤醒一个take操作
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
            // 如果在队列满的情况下移除一个元素,会唤醒一个put操作
                signalNotFull();
            return x;
        }
    

    这里能看到take其实就是put的一个翻版。这里也不难发现wait && notify机制实际上也是在模拟实现一个BlockingQueue。使用BlockingQueue不枉为最佳选择。

    实战:

    利用@PostConstruct在服务启动时加载生产者/消费者线程。exportRunnerListener循环执行asyncJobRunner()消费队列,使用take()方法阻塞线程,避免资源浪费。执行消费者之前,我们需要在db里面捞一下未完成的任务,避免因服务重启造成的任务丢失。

    private final ExecutorService executorService;
    private static final LinkedBlockingQueue<String> jobQueue = new LinkedBlockingQueue<>();
    ...
    @PostConstruct
        public void NeoExportRunner() {
            int nThreads = 3;
            //每次重启找出未完成的job
            exportJobRepository.findByStatusIn(Arrays.asList(new String[]{PENDING, LOADED, UPLOADED, NOTIFIED}))
                    .orElse(new ArrayList<>()).forEach(
                    //加入队列
                    o -> jobQueue.add(o.getId())
            );
            this.executorService = Executors.newFixedThreadPool(nThreads);
    
            for (int i = 0; i < nThreads; i++) {
                executorService.execute(() -> {
                    exportRunnerListener();
                });
            }
    
        }
        
        public void exportRunnerListener() {
            while (true) {
                log.info("asyncJobRunner is working... {}", Thread.currentThread().getName());
                try {
                    //消费者
                    asyncJobRunner();
                } catch (Exception e) {
                    log.error("asyncJobRunner is error...{}",e);
                }
            }
        }
    

    asyncJobRunner()是一个任务调度器。拿到队列里的消息,根据状态来处理不同的业务逻辑。每个job执行完后,变更任务状态,重新写回队列,下次消费时进行下一个状态的处理,从而实现状态扭转。

        public static void asyncJobRunner() throws InterruptedException {
            
            //消费jobQueue中的数据
            Optional.ofNullable(jobQueue.take()).flatMap(id -> exportJobRepository.findById(id)).ifPresent(job -> {
                switch (job.getStatus()) {
                    case PENDING:
                        loadData(job);
                        log.info("Job had been loaded.. job -> " + job.toString());
                        break;
    
                    case LOADED:
                        upload(job);
                        log.info("Job had been uploaded.. job -> " + job.toString());
                        break;
    
                    case UPLOADED:
                        notify(job);
                        log.info("Job had been notify.. job -> " + job.toString());
                        break;
    
                    case FAILED:
                        retry(job);
                        log.info("Job had been failed.. will be retry. .job -> " + job.toString());
                        break;
    
                    case NOTIFIED:
                        finish(job);
                        log.info("Job had been finished. . job -> " + job.toString());
                        break;
    
                    case FINISHED:
                        log.warn("Finished job should not appear in job queue, check for logical error. job -> " + job.toString());
                        break;
    
                    case CANCELED:
                        log.info("Job had been canceled. Nothing to do here. job -> " + job.toString());
                        break;
    
                    default:
                        log.error("Unrecognized job status. job -> " + job.toString());
    
                }
            });
        }
      
    

    这里说一下retry机制,当任务在某个状态发生异常,并未执行成功,我们来设置一个retry机制在任务FAILED的时候进行补偿。某个状态异常时,将当前状态保存在LastStatus中并设置当前状态为FAILED,同时记录retry的次数。这样以来下次我们拿到这个job的状态是FAILED,在调用retry方法时把失败时的状态在写回去丢到队列里,下一次就可以继续执行了。

        public static void retry(ExportJob job) {
        //判断重试次数是否<最大重试次数
            if (job.getRetry() < maximumRetry) {
                job.setStatus(job.getLastStatus());
                job.setRetry(job.getRetry() + 1);
                jobQueue.add(job.getId());
            } else {
                //save db -> error status
                log.error("Max retry exceeds. job -> " + job.toString());
            }
        }
    

    addJob()cancelJob() 提供给我们的业务代码调用,用来创建任务和取消任务,这里的取消任务做不到实时性,具体代码需要根据实际业务场景进行调整。

    public static String addJob(String id, String type, String channel) {
            ExportJob exportJob = ExportJob.builder()
                    .channel(channel)
                    .jobId(id)
                    //初始化任务
                    .status(PENDING)
                    .createAt(sdf.format(System.currentTimeMillis()))
                    .type(type)
                    .id(UUID.randomUUID().toString().replaceAll("-", ""))
                    .retry(0)
                    .build();
            ExportJob saved = exportJobRepository.saveAndFlush(exportJob);
            jobQueue.add(saved.getId());
            return saved.getId();
        }
    
    public static String cancelJob(String id) {
            canceled.add(id);
            exportJobRepository.findById(id).ifPresent(job -> {
                //取消任务
                job.setStatus(CANCELED);
                exportJobRepository.saveAndFlush(job);
            });
            return id;
        }
    

    相关文章

      网友评论

          本文标题:Java实现生产者/消费者模型实战应用

          本文链接:https://www.haomeiwen.com/subject/gfwlsktx.html