美文网首页java工具类
基于rxjava的集合并发

基于rxjava的集合并发

作者: 嗷大喵儿 | 来源:发表于2017-06-14 16:48 被阅读0次

简介

  • 本文主要是讲基于rxjava包装的一个针对集合做并发操作的工具类
  • rxjava文档地址

依赖

<!-- RxJava 1.x: the code imports the rx.* packages, which ship in io.reactivex:rxjava.
     The version below is an example 1.x release; pin whichever 1.x release your project uses. -->
<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.8</version>
</dependency>
<!-- Guava supplies Lists / Maps / Function. The version below is an example;
     any release that provides Lists.partition and Lists.transform works. -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>20.0</version>
</dependency>

支持四类function

  • 传入数据List,返回处理之后的List(不保证顺序)
  • 传入数据List,返回处理之后的数据Map
  • 传入数据List,分组大小,返回处理之后的List(不保证顺序)
  • 传入数据List,分组大小,返回处理之后的数据Map

function接口

/**
 * Callback that turns one batch of inputs into a list of results.
 *
 * @param <I> element type of the input batch
 * @param <R> element type of the produced results
 */
public interface List2List<I, R> {

    /**
     * Processes one batch.
     *
     * @param list the batch of input elements
     * @return the results produced for this batch
     */
    List<R> call(List<I> list);
}

/**
 * Callback that turns one batch of inputs into a map of results.
 *
 * @param <I> element type of the input batch
 * @param <K> key type of the produced map
 * @param <V> value type of the produced map
 */
public interface List2Map<I, K, V> {

    /**
     * Processes one batch.
     *
     * @param list the batch of input elements
     * @return the key/value results produced for this batch
     */
    Map<K, V> call(List<I> list);
}

/**
 * Callback that turns a single input element into a list of results.
 *
 * @param <I> type of the input element
 * @param <R> element type of the produced results
 */
public interface Object2List<I, R> {

    /**
     * Processes one element.
     *
     * @param i the input element
     * @return the results produced for this element
     */
    List<R> call(I i);
}

/**
 * Callback that turns a single input element into a map of results.
 *
 * @param <I> type of the input element
 * @param <K> key type of the produced map
 * @param <V> value type of the produced map
 */
public interface Object2Map<I, K, V> {

    /**
     * Processes one element.
     *
     * @param i the input element
     * @return the key/value results produced for this element
     */
    Map<K, V> call(I i);
}

线程池及工厂方法

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.schedulers.Schedulers;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
/**
 * Singleton factory that wraps a {@link Func0} in an {@link Observable} which is
 * evaluated on a shared, fixed-size thread pool.
 */
public class ObservableHelper {

    /**
     * Number of worker threads in the shared pool.
     * <p>
     * The pool uses an unbounded {@link LinkedBlockingQueue}; with such a queue a
     * {@link ThreadPoolExecutor} never creates more than its core number of threads,
     * so a single size is used for both the core and the maximum pool size.
     */
    private static final int POOL_SIZE = Math.max(2, Runtime.getRuntime().availableProcessors() * 2);

    /** Idle time, in seconds, after which a worker thread is released. */
    private static final long KEEP_ALIVE_SECONDS = 5;

    private static final ThreadPoolExecutor executor = newExecutor();

    private ObservableHelper() {
    }

    /** Holder class: the singleton is created on first access of {@link #INSTANCE()}. */
    private static class ClassHolder {
        private static final ObservableHelper observableHelper = new ObservableHelper();
    }

    /**
     * Returns the shared helper instance.
     *
     * @return the singleton {@code ObservableHelper}
     */
    public static ObservableHelper INSTANCE() {
        return ClassHolder.observableHelper;
    }

    /**
     * Builds the shared worker pool.
     *
     * @return a pool of {@link #POOL_SIZE} threads whose idle threads are released
     */
    private static ThreadPoolExecutor newExecutor() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, KEEP_ALIVE_SECONDS, SECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Worker threads are non-daemon; letting idle core threads time out allows the
        // JVM to exit once all submitted work has finished.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /**
     * Wraps {@code func} in an {@link Observable} that runs on the shared pool.
     * <p>
     * The returned Observable emits the single value returned by {@code func} and then
     * completes, or signals the exception thrown by {@code func}. The result is cached,
     * so {@code func} is evaluated at most once however many subscribers attach.
     *
     * @param func the computation to run asynchronously
     * @param <T>  type of the value produced by {@code func}
     * @return an Observable emitting the result of {@code func}
     */
    public <T> Observable<T> createObservable(final Func0<T> func) {
        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(Subscriber<? super T> subscriber) {
                try {
                    T value = func.call();
                    // Do not deliver events to a subscriber that has already gone away.
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext(value);
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onError(e);
                    }
                }
            }
        }).subscribeOn(Schedulers.from(executor)).cache();
    }
}

ObservableExecutor 并行处理器

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import rx.Observable;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-22
 * @since mobile-oppkit-server 1.0
 */
/**
 * Singleton that applies a callback to the elements (or fixed-size partitions) of a
 * list concurrently and merges the partial results into one {@link List} or {@link Map}.
 * <p>
 * Work items are submitted through {@link ObservableHelper}, and every call blocks the
 * calling thread until all work items have finished. A work item whose callback throws
 * an exception or returns {@code null} contributes nothing to the result; the failure is
 * not reported to the caller. The order of elements in a returned list is not guaranteed.
 */
public class ObservableExecutor {

    /** Partition size used when the caller passes a non-positive size. */
    private static final int DEFAULT_PARTITION_SIZE = 10;

    private final ObservableHelper helper = ObservableHelper.INSTANCE();

    /** Holder class: the singleton is created on first access of {@link #INSTANCE()}. */
    private static class ExecutorHolder {
        private static final ObservableExecutor executor = new ObservableExecutor();
    }

    private ObservableExecutor() {
    }

    /**
     * Returns the shared executor instance.
     *
     * @return the singleton {@code ObservableExecutor}
     */
    public static ObservableExecutor INSTANCE() {
        return ExecutorHolder.executor;
    }

    /**
     * Runs {@code func} once per element of {@code input} and merges the returned maps.
     * If several work items produce the same key, which value is kept is unspecified.
     *
     * @param input elements to process; {@code null} or empty yields an empty map
     * @param func  callback invoked once for every element
     * @param <I>   input element type
     * @param <K>   result key type
     * @param <V>   result value type
     * @return the merged results of all successful invocations
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, final Object2Map<I, K, V> func) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyMap();
        }
        return collectToMap(input, new Func1<I, Map<K, V>>() {
            @Override
            public Map<K, V> call(I item) {
                return func.call(item);
            }
        });
    }

    /**
     * Runs {@code func} once per element of {@code input} and concatenates the returned lists.
     *
     * @param input elements to process; {@code null} or empty yields an empty list
     * @param func  callback invoked once for every element
     * @param <I>   input element type
     * @param <R>   result element type
     * @return the concatenated results of all successful invocations, in no guaranteed order
     */
    public <I, R> List<R> executeObservable(List<I> input, final Object2List<I, R> func) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyList();
        }
        return collectToList(input, new Func1<I, List<R>>() {
            @Override
            public List<R> call(I item) {
                return func.call(item);
            }
        });
    }

    /**
     * Splits {@code input} into consecutive partitions, runs {@code functionRMap} once per
     * partition and merges the returned maps.
     *
     * @param input         elements to process; {@code null} or empty yields an empty map
     * @param partitionSize maximum number of elements per partition; values {@code <= 0}
     *                      fall back to {@value #DEFAULT_PARTITION_SIZE}
     * @param functionRMap  callback invoked once for every partition
     * @param <I>           input element type
     * @param <K>           result key type
     * @param <V>           result value type
     * @return the merged results of all successful invocations
     */
    public <I, K, V> Map<K, V> executeObservable(List<I> input, int partitionSize,
            final List2Map<I, K, V> functionRMap) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyMap();
        }
        List<List<I>> partitions = Lists.partition(input, effectivePartitionSize(partitionSize));
        return collectToMap(partitions, new Func1<List<I>, Map<K, V>>() {
            @Override
            public Map<K, V> call(List<I> partition) {
                return functionRMap.call(partition);
            }
        });
    }

    /**
     * Splits {@code input} into consecutive partitions, runs {@code func} once per
     * partition and concatenates the returned lists.
     *
     * @param input         elements to process; {@code null} or empty yields an empty list
     * @param partitionSize maximum number of elements per partition; values {@code <= 0}
     *                      fall back to {@value #DEFAULT_PARTITION_SIZE}
     * @param func          callback invoked once for every partition
     * @param <I>           input element type
     * @param <R>           result element type
     * @return the concatenated results of all successful invocations, in no guaranteed order
     */
    public <I, R> List<R> executeObservable(List<I> input, int partitionSize,
            final List2List<I, R> func) {
        if (input == null || input.isEmpty()) {
            return Collections.emptyList();
        }
        List<List<I>> partitions = Lists.partition(input, effectivePartitionSize(partitionSize));
        return collectToList(partitions, new Func1<List<I>, List<R>>() {
            @Override
            public List<R> call(List<I> partition) {
                return func.call(partition);
            }
        });
    }

    /** Returns {@code requested} when positive, otherwise the default partition size. */
    private static int effectivePartitionSize(int requested) {
        return requested > 0 ? requested : DEFAULT_PARTITION_SIZE;
    }

    /** Runs {@code worker} for every work item via the helper and merges the returned maps. */
    private <T, K, V> Map<K, V> collectToMap(List<T> workItems, final Func1<T, Map<K, V>> worker) {
        Map<K, V> merged = Maps.newHashMap();
        Observable.from(workItems).flatMap(new Func1<T, Observable<Map<K, V>>>() {
            @Override
            public Observable<Map<K, V>> call(final T workItem) {
                return helper.createObservable(new Func0<Map<K, V>>() {
                    @Override
                    public Map<K, V> call() {
                        return worker.call(workItem);
                    }
                }).onErrorReturn(new Func1<Throwable, Map<K, V>>() {
                    @Override
                    public Map<K, V> call(Throwable throwable) {
                        // Best effort: a failed work item contributes nothing.
                        return Collections.emptyMap();
                    }
                });
            }
        }).reduce(merged, new Func2<Map<K, V>, Map<K, V>, Map<K, V>>() {
            @Override
            public Map<K, V> call(Map<K, V> acc, Map<K, V> part) {
                // A callback may return null; treat it like an empty result instead of
                // letting putAll throw and abort the whole batch.
                if (part != null) {
                    acc.putAll(part);
                }
                return acc;
            }
        }).toBlocking().first();
        return merged;
    }

    /** Runs {@code worker} for every work item via the helper and concatenates the returned lists. */
    private <T, R> List<R> collectToList(List<T> workItems, final Func1<T, List<R>> worker) {
        List<R> merged = Lists.newArrayList();
        Observable.from(workItems).flatMap(new Func1<T, Observable<List<R>>>() {
            @Override
            public Observable<List<R>> call(final T workItem) {
                return helper.createObservable(new Func0<List<R>>() {
                    @Override
                    public List<R> call() {
                        return worker.call(workItem);
                    }
                }).onErrorReturn(new Func1<Throwable, List<R>>() {
                    @Override
                    public List<R> call(Throwable throwable) {
                        // Best effort: a failed work item contributes nothing.
                        return Collections.emptyList();
                    }
                });
            }
        }).reduce(merged, new Func2<List<R>, List<R>, List<R>>() {
            @Override
            public List<R> call(List<R> acc, List<R> part) {
                // A callback may return null; treat it like an empty result instead of
                // letting addAll throw and abort the whole batch.
                if (part != null) {
                    acc.addAll(part);
                }
                return acc;
            }
        }).toBlocking().first();
        return merged;
    }
}

demos

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

/**
 * 功能描述:
 * <p>
 * </p>
 *
 * @author : yuanchao.he
 * @version 1.0 2016-03-23
 * @since mobile-oppkit-server 1.0
 */
/**
 * Runnable walkthrough of the four {@code executeObservable} overloads, each applied to
 * the integers 500..1000 and printing its result to standard output.
 */
public class Demo {
    private static ObservableExecutor observableExecutor = ObservableExecutor.INSTANCE();

    /** Returns {@code value * value % 1000} rendered as a decimal string. */
    private static String squareMod1000(int value) {
        return String.valueOf(value * value % 1000);
    }

    public static void main(String[] args) {

        List<Integer> numbers = Lists.newLinkedList();
        for (int value = 500; value <= 1000; value++) {
            numbers.add(value);
        }

        // One work item per number; results are returned as a map.
        Object2Map<Integer, Integer, String> perNumberToMap = new Object2Map<Integer, Integer, String>() {
            @Override
            public Map<Integer, String> call(Integer integer) {
                Map<Integer, String> single = Maps.newHashMap();
                single.put(integer, squareMod1000(integer));
                return single;
            }
        };
        Map<Integer, String> result1 = observableExecutor.executeObservable(numbers, perNumberToMap);
        System.out.println(result1);

        // One work item per number; results are returned as a list.
        Object2List<Integer, String> perNumberToList = new Object2List<Integer, String>() {
            @Override
            public List<String> call(Integer integer) {
                List<String> single = Lists.newLinkedList();
                single.add(squareMod1000(integer));
                return single;
            }
        };
        List<String> result2 = observableExecutor.executeObservable(numbers, perNumberToList);
        System.out.println(result2);

        // Numbers are split into groups of 2, one work item per group; results are returned as a map.
        List2Map<Integer, Integer, String> perGroupToMap = new List2Map<Integer, Integer, String>() {
            @Override
            public Map<Integer, String> call(List<Integer> list) {
                Map<Integer, String> group = Maps.newHashMap();
                for (Integer integer : list) {
                    group.put(integer, squareMod1000(integer));
                }
                return group;
            }
        };
        Map<Integer, String> result3 = observableExecutor.executeObservable(numbers, 2, perGroupToMap);
        System.out.println(result3);

        // Numbers are split into groups of 2, one work item per group; results are returned as a list.
        List2List<Integer, String> perGroupToList = new List2List<Integer, String>() {
            @Override
            public List<String> call(List<Integer> list) {
                Function<Integer, String> toText = new Function<Integer, String>() {
                    @Override
                    public String apply(Integer input) {
                        return squareMod1000(input);
                    }
                };
                return Lists.newLinkedList(Lists.transform(list, toText));
            }
        };
        List<String> result4 = observableExecutor.executeObservable(numbers, 2, perGroupToList);
        System.out.println(result4);
    }
}

相关文章

网友评论

    本文标题:基于rxjava的集合并发

    本文链接:https://www.haomeiwen.com/subject/jbxeqxtx.html