美文网首页
并行处理框架parseq调研

并行处理框架parseq调研

作者: chenmingang | 来源:发表于2018-03-01 20:25 被阅读197次

    引入目的:为解决项目中存在的并且不好优化的n+1查询慢问题

    示例效果:

    查询一组20个数据,每个子查询需要耗时1秒,返回结果并行组件需要1061ms,串行需要20057ms。并行处理的速度取决于任务数、线程数和子查询中最慢的那个。

    package com.github.chenmingang.parseq;
    
    import com.linkedin.parseq.ParTask;
    import com.linkedin.parseq.Task;
    import com.linkedin.parseq.Tasks;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class OnlineSimulation {
        //    static EngineAgent engine = EngineFactory.defaultEngine();
        static EngineAgent engine = EngineFactory.getEngine(20, 1, 20);
    
        public static void main(String[] args) {
            long l1 = System.currentTimeMillis();
            List<Model> result1 = new OnlineSimulation().queryAsync();
            long l2 = System.currentTimeMillis();
            System.out.println("time1:" + (l2 - l1));
            //time1:1063
    
            long l3 = System.currentTimeMillis();
            List<Model> result2 = new OnlineSimulation().query();
            long l4 = System.currentTimeMillis();
            System.out.println("time2:" + (l4 - l3));
            //time2:20060
    
            engine.shutdown();
        }
    
        //n+1 查询
        public List<Model> query() {
            // 第一步查询
            List<Model> modelList = new ArrayList<>();
            for (int i = 1; i <= 20; i++) {
                modelList.add(new Model(i));
            }
            // 第二步查询
            for (Model model : modelList) {
                setName(model);
            }
            return modelList;
        }
    
        //n+1 查询
        public List<Model> queryAsync() {
    
            // 第一步查询
            List<Model> modelList = new ArrayList<>();
            for (int i = 1; i <= 20; i++) {
                modelList.add(new Model(i));
            }
            // 第二步查询
            List<Task<Model>> tasks = new ArrayList<>();
            for (Model model : modelList) {
                Task<Model> task = engine.task(() -> setName(model));
                tasks.add(task);
            }
            ParTask<Model> parTask = Tasks.par(tasks);
            engine.run(parTask);
            try {
                parTask.await();
                List<Model> successful = parTask.getSuccessful();
                Throwable error = parTask.getError();
                if (error != null) {
                    error.printStackTrace();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return modelList;
        }
    
        /**
         * 执行子查询并合并数据的模拟
         *
         * @param model
         * @return
         */
        private Model setName(Model model) {
            model.setName("name-" + model.getId());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
    //        throw new RuntimeException("");
            return model;
        }
    
        class Model {
            private Integer id;
            private String name;
    
            public Model(Integer id) {
                this.id = id;
            }
    
            public Integer getId() {
                return id;
            }
    
            public void setId(Integer id) {
                this.id = id;
            }
    
            public String getName() {
                return name;
            }
    
            public void setName(String name) {
                this.name = name;
            }
        }
    }
    
    
    

    根据这种列表式的查询场景对parseq做的封装

    package com.github.chenmingang.parseq;
    
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    import com.linkedin.parseq.EngineBuilder;
    import com.linkedin.parseq.batching.BatchingSupport;
    
    import java.util.concurrent.*;
    
    public class EngineFactory {
    
        private static EngineFactory INSTANCE = new EngineFactory();
        private final EngineAgent defaultEngine;
    
        private EngineFactory() {
            int numCores = Runtime.getRuntime().availableProcessors();
            defaultEngine = getEngine(numCores + 1, 1, numCores + 1);
        }
    
        public static EngineAgent defaultEngine() {
            return INSTANCE.defaultEngine;
        }
    
        public static EngineAgent getEngine(int poolSize, int scheduleSize, int queueNum) {
            ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(scheduleSize);
            ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
            ThreadPoolExecutor executors = new ThreadPoolExecutor(poolSize, poolSize,
                    0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(queueNum), threadFactory,
                    (r, executor) -> {
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
    
            final EngineBuilder builder = new EngineBuilder().setTaskExecutor(executors).setTimerScheduler(scheduler);
            final BatchingSupport batchingSupport = new BatchingSupport();
    
            builder.setPlanDeactivationListener(batchingSupport);
            return new EngineAgent(builder.build(), executors, scheduler);
        }
    
    }
    
    package com.github.chenmingang.parseq;
    
    import com.linkedin.parseq.Engine;
    import com.linkedin.parseq.Task;
    import com.linkedin.parseq.promise.Promises;
    import com.linkedin.parseq.promise.SettablePromise;
    
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.function.Supplier;
    
    public class EngineAgent {
    
        private Engine engine;
        private ThreadPoolExecutor executors;
        private ScheduledExecutorService scheduler;
    
        public EngineAgent(Engine engine, ThreadPoolExecutor executors,ScheduledExecutorService scheduler) {
            this.engine = engine;
            this.executors = executors;
            this.scheduler = scheduler;
        }
    
    
        public <T> SettablePromise<T> async(Supplier<T> supplier) {
            final SettablePromise<T> promise = Promises.settable();
            getExecutors().execute(() -> {
                try {
                    promise.done(supplier.get());
                } catch (Exception e) {
                    promise.fail(e);
                }
            });
            return promise;
        }
    
        public <T> Task<T> task(Supplier<T> supplier) {
            return Task.async(() -> async(supplier));
        }
    
        public void run(final Task<?> task) {
            engine.run(task);
        }
    
        public void shutdown() {
            engine.shutdown();
            executors.shutdown();
        }
    
        public ThreadPoolExecutor getExecutors() {
            return executors;
        }
    
        public ScheduledExecutorService getScheduler() {
            return scheduler;
        }
    }
    
    

    依赖版本

            <dependency>
                <groupId>com.linkedin.parseq</groupId>
                <artifactId>parseq</artifactId>
                <version>2.6.5</version>
            </dependency>
            <dependency>
                <groupId>com.linkedin.parseq</groupId>
                <artifactId>parseq-batching</artifactId>
                <version>2.6.5</version>
            </dependency>
    

    相关文章

      网友评论

          本文标题:并行处理框架parseq调研

          本文链接:https://www.haomeiwen.com/subject/xlpnxftx.html