美文网首页java工具类
基于rxjava的集合并发

基于rxjava的集合并发

作者: 嗷大喵儿 | 来源:发表于2017-06-14 16:48 被阅读0次

    简介

    • 本文主要是讲基于rxjava包装的一个针对集合做并发操作的工具类
    • rxjava文档地址

    依赖

    <dependency>
        <groupId>io.reactivex</groupId>
        <artifactId>rxjava</artifactId>
        <version></version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version></version>
    </dependency>
    

    支持四类function

    • 传入数据List,返回处理之后的List(不保证顺序)
    • 传入数据List,返回处理之后的数据Map
    • 传入数据List,分组大小,返回处理之后的List(不保证顺序)
    • 传入数据List,分组大小,返回处理之后的数据Map

    function接口

    public interface List2List<I,R> {
        List<R> call(List<I> list);
    }
    
    public interface List2Map<I, K, V>{
        Map<K, V> call(List<I> list);
    }
    
    public interface Object2List<I, R> {
        List<R> call(I i);
    }
    
    public interface Object2Map<I, K, V> {
        Map<K, V> call(I i);
    }
    

    线程池及工厂方法

    import rx.Observable;
    import rx.Subscriber;
    import rx.functions.Func0;
    import rx.schedulers.Schedulers;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import static java.util.concurrent.TimeUnit.SECONDS;
    /**
     * 功能描述:
     * <p>
     * </p>
     *
     * @author : yuanchao.he
     * @version 1.0 2016-03-22
     * @since mobile-oppkit-server 1.0
     */
    public class ObservableHelper {
    
        private static ThreadPoolExecutor executor = new ThreadPoolExecutor(coreSize, maxSize, 5, SECONDS,
                new LinkedBlockingQueue<Runnable>());
    
        private ObservableHelper(){}
    
        private static class ClassHolder{
            private static ObservableHelper observableHelper = new ObservableHelper();
        }
    
        public static ObservableHelper INSTANCE(){
            return ClassHolder.observableHelper;
        }
    
        public <T> Observable<T> createObservable(final Func0<T> func) {
            return Observable.create(new Observable.OnSubscribe<T>() {
                @Override
                public void call(Subscriber<? super T> subscriber) {
                    try {
                        subscriber.onNext(func.call());
                        subscriber.onCompleted();
                    } catch (Exception e) {
                        subscriber.onError(e);
                    }
                }
            }).subscribeOn(Schedulers.from(executor)).cache();
        }
    }
    

    ObservableExecutor 并行处理器

    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import rx.Observable;
    import rx.functions.Func0;
    import rx.functions.Func1;
    import rx.functions.Func2;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 功能描述:
     * <p>
     * </p>
     *
     * @author : yuanchao.he
     * @version 1.0 2016-03-22
     * @since mobile-oppkit-server 1.0
     */
    public class ObservableExecutor {
    
        private ObservableHelper helper = ObservableHelper.INSTANCE();
    
        private static class ExecutorHolder {
            private static ObservableExecutor executor = new ObservableExecutor();
        }
    
        private ObservableExecutor(){}
    
        public static ObservableExecutor INSTANCE() {
            return ExecutorHolder.executor;
        }
    
        /**
         * 传入List, 对List中的每一项分配一条线程做处理,返回Map结构,自己需要实现Object2Map接口
         * @param input
         * @param func
         * @param <I>
         * @param <K>
         * @param <V>
         * @return
         */
        public <I, K, V> Map<K, V> executeObservable(List<I> input, final Object2Map<I, K, V> func) {
            if(input==null || input.size()==0){
                return Collections.emptyMap();
            }
            Map<K, V> resultMap = Maps.newHashMap();
            Observable.from(input).flatMap(new Func1<I, Observable<Map<K, V>>>() {
                @Override
                public Observable<Map<K, V>> call(final I i) {
                    return helper.createObservable(new Func0<Map<K, V>>() {
                        @Override
                        public Map<K, V> call() {
                            return func.call(i);
                        }
                    }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                        @Override
                        public Map<K, V> call(Throwable throwable) {
                            return Collections.emptyMap();
                        }
                    });
                }
            }).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
                @Override
                public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
                    acc.putAll(map);
                    return acc;
                }
            }).toBlocking().first();
            return resultMap;
        }
    
        /**
         * 传入List,针对List中每一项分配一条线程做处理,返回List结构,自己实现Object2List接口
         * @param input
         * @param func
         * @param <I>
         * @param <R>
         * @return
         */
        public <I, R> List<R> executeObservable(List<I> input, final Object2List<I, R> func) {
            if(input==null || input.size()==0){
                return Collections.emptyList();
            }
            List<R> result = Lists.newLinkedList();
            Observable.from(input).flatMap(new Func1<I, Observable<List<R>>>() {
                @Override
                public Observable<List<R>> call(final I i) {
                    return helper.createObservable(new Func0<List<R>>() {
                        @Override
                        public List<R> call() {
                            return func.call(i);
                        }
                    }).onErrorReturn(new Func1<Throwable, List<R>>() {
                        @Override
                        public List<R> call(Throwable throwable) {
                            return Collections.emptyList();
                        }
                    });
                }
            }).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
                @Override
                public List<R> call(List<R> acc, List<R> list) {
                    acc.addAll(list);
                    return acc;
                }
            }).toBlocking().first();
            return result;
        }
    
        /**
         * 传入List数据,分组大小,对List数据做分组以后,为每一组分配一个线程做处理,返回Map结构,自己实现List2Map接口
         * @param input
         * @param partitionSize
         * @param functionRMap
         * @param <I>
         * @param <K>
         * @param <V>
         * @return
         */
        public <I, K, V> Map<K, V> executeObservable(List<I> input, int partitionSize,
                final List2Map<I, K, V> functionRMap) {
            if(input==null || input.size()==0){
                return Collections.emptyMap();
            }
            if (partitionSize <= 0)
                partitionSize = 10;
            List<List<I>> lists = Lists.partition(input, partitionSize);
            Map<K, V> resultMap = Maps.newHashMap();
            Observable.from(lists).flatMap(new Func1<List<I>, Observable<Map<K, V>>>() {
                @Override
                public Observable<Map<K, V>> call(final List<I> list) {
                    return helper.createObservable(new Func0<Map<K, V>>() {
                        @Override
                        public Map<K, V> call() {
                            return functionRMap.call(list);
                        }
                    }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                        @Override
                        public Map<K, V> call(Throwable throwable) {
                            return Collections.emptyMap();
                        }
                    });
                }
            }).reduce(resultMap, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
                @Override
                public Map<K, V> call(Map<K, V> acc, Map<K, V> map) {
                    acc.putAll(map);
                    return acc;
                }
            }).toBlocking().first();
            return resultMap;
        }
    
        /**
         * 传入List数据,分组大小,为每一组数据分配一条线程处理,返回List结构数据,自己实现List2List接口
         * @param input
         * @param partitionSize
         * @param func
         * @param <I>
         * @param <R>
         * @return
         */
        public <I, R> List<R> executeObservable(List<I> input, int partitionSize,
                final List2List<I, R> func) {
            if(input==null || input.size()==0){
                return Collections.emptyList();
            }
            List<R> result = Lists.newLinkedList();
            if (partitionSize <= 0)
                partitionSize = 10;
            List<List<I>> partitions = Lists.partition(input, partitionSize);
            Observable.from(partitions).flatMap(new Func1<List<I>, Observable<List<R>>>() {
                @Override public Observable<List<R>> call(final List<I> is) {
                    return helper.createObservable(new Func0<List<R>>() {
                        @Override public List<R> call() {
                            return func.call(is);
                        }
                    }).onErrorReturn(new Func1<Throwable, List<R>>() {
                        @Override public List<R> call(Throwable throwable) {
                            return Collections.emptyList();
                        }
                    });
                }
            }).reduce(result, new Func2<List<R>, List<R>, List<R>>() {
                @Override public List<R> call(List<R> acc, List<R> list) {
                    acc.addAll(list);
                    return acc;
                }
            }).toBlocking().first();
            return result;
        }
    }
    

    demos

    import com.google.common.base.Function;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * 功能描述:
     * <p>
     * </p>
     *
     * @author : yuanchao.he
     * @version 1.0 2016-03-23
     * @since mobile-oppkit-server 1.0
     */
    public class Demo {
        private static ObservableExecutor observableExecutor = ObservableExecutor.INSTANCE();
    
        public static void main(String[] args) {
    
            List<Integer> numbers = Lists.newLinkedList();
            for(int i=500;i<=1000;i++){
                numbers.add(i);
            }
    
            Map<Integer, String> result1 = observableExecutor.executeObservable(numbers,
                new Object2Map<Integer, Integer, String>() {
                    @Override
                    public Map<Integer, String> call(Integer integer) {
                        Map<Integer, String> map = Maps.newHashMap();
                        map.put(integer, String.valueOf(integer*integer%1000));
                        return map;
                    }
                });
    
            System.out.println(result1);
            /**
             * 将数字转换为字符串,每个线程处理一个数字转换,并以list的结构返回
             */
            List<String> result2 = observableExecutor.executeObservable(numbers,
                new Object2List<Integer, String>() {
                    @Override
                    public List<String> call(Integer integer) {
                        List<String> result = Lists.newLinkedList();
                        result.add(String.valueOf(integer*integer%1000));
                        return result;
                    }
                });
    
            System.out.println(result2);
    
            /**
             * 将数字转换为字符串,对 numbers 分组,每组2个元素,每个线程处理一组,并以map结构返回
             */
            Map<Integer, String> result3 = observableExecutor.executeObservable(numbers, 2,
                new List2Map<Integer, Integer, String>() {
                    @Override
                    public Map<Integer, String> call(List<Integer> list) {
                        Map<Integer, String> map = Maps.newHashMap();
                        for (Integer integer : list) {
                            map.put(integer, String.valueOf(integer*integer%1000));
                        }
                        return map;
                    }
                });
            System.out.println(result3);
    
            /**
             * 将数字转换为字符串 对 numbers 分组,每组2个元素,每个线程处理一组,并以list结构返回
             */
            List<String> result4 = observableExecutor.executeObservable(numbers, 2,
                new List2List<Integer, String>() {
                    @Override
                    public List<String> call(List<Integer> list) {
                        List<String> result = Lists.newLinkedList(Lists.transform(list,
                            new Function<Integer, String>() {
                                @Override
                                public String apply(Integer input) {
                                    return String.valueOf(input*input%1000);
                                }
                            }));
                        return result;
                    }
                });
            System.out.println(result4);
        }
    }
    

    相关文章

      网友评论

        本文标题:基于rxjava的集合并发

        本文链接:https://www.haomeiwen.com/subject/jbxeqxtx.html