美文网首页
RxJava小试牛刀

RxJava小试牛刀

作者: 丑人林宗己 | 来源:发表于2019-03-23 11:44 被阅读0次

    好久没有写技术文章了,工作上了解到RxJava框架适合我们的某些业务场景,所以尝试着RxJava整合到我们的业务代码,以解决部分问题。

    很粗糙很狂野的整合进来,没有细致的琢磨框架,以观察者模式实现的框架配合以链式调用的代码风格,看起来确实稍微‘高大上’一些。

    业务场景

    当下主流的微服务架构体系中,各个微服务的业务之间相互独立,比如商品、订单、客户等等,它们即可独立提供服务,也可与其他服务协同,于是便涉及到业务聚合,比如前端展示一个商品详情页会涉及到商品信息,价格,评价等等的数据,如下。

    image.png

    比如用户进入商品详情页,可能会发生什么?用户需要看到商品的详细信息,包括价格,属性等等,同时还需要看到优惠券信息、以及相关的评价。(这里是假设一个请求需要处理这么多事)

    image.png
    // 伪代码如下
    GoodsBean goodsBean = goodsFeignService.queryGoods();
    List<CouponBean> couponBeans = couponFeignService.queryCoupon();
    List<EvaluationBean> evaluations = evaluationFeignService.queryEvaluation();
     // 真实的业务场景往往更加复杂
    return join.map(goodsBean, couponBeans, evaluations);
    

    这么写毫无疑问也是可以把业务跑起来,但是作为一名优秀的互联网底层搬砖工作者,除了完成必要的工作,尽可能希望把代码写的优雅一些,就像砌墙的时候砌地更光滑,同时还能稍微提高一些性能?

    解决方案

    RxJava可以很好地解决该业务诉求,利用异步并发提升一点性能,将应用中的无依赖操作转为异步并发处理。无依赖(或已完成依赖)操作比如:

    • 数据库读取
    • feign远程RPC调用
    • 第三方开放平台接口调用
    • ……
    
    public abstract class AbstractObserver implements Observer<JoinQueryParameter> {
    
        /**
         * 日志
         */
        public final static Logger logger = LoggerFactory.getLogger(AbstractObserver.class);
    
        private ResultMap resultMap;
        private JoinQueryParameter joinQueryParameter;
    
        @Override
        public void onSubscribe(Disposable disposable) {}
    
        @Override
        public void onNext(JoinQueryParameter joinQueryParameter) {
            resultMap = this.queryOriginalData(joinQueryParameter);
            update(joinQueryParameter);
        }
    
        @Override
        public void onError(Throwable throwable) {
            logger.error("查询信息发生异常", throwable);
        }
    
        @Override
        public void onComplete() {
            this.onCompleteEvent(joinQueryParameter, resultMap);
        }
    
        /**
         * 查询初始化信息
         * @param JoinQueryParameter 
         * @return
         */
        protected abstract ResultMap queryOriginalData(JoinQueryParameter joinQueryParameter);
    
        /**
         * 加载数据完成后的事件
         *
         * @param JoinQueryParameter 
         * @param resultMap
         */
        protected abstract void onCompleteEvent(JoinQueryParameter joinQueryParameter, ResultMap  resultMap);
    }
    
    
    public abstract class AbstractObservableStrategy extends AbstractObserver {
    
        /**
         *
         * @param joinQueryParameter
         * @return
         */
        public AbstractObservableStrategy create(final JoinQueryParameter joinQueryParameter) {
            Observable.create(new ObservableOnSubscribe<JoinQueryParameter>(){
    
                @Override
                public void subscribe(ObservableEmitter<JoinQueryParameter> e) throws Exception {
                    e.onNext(joinQueryParameter);
                    e.onComplete();
                }
    
            }).subscribe(this);
            return this;
        }
    
        /**
         * 查询初始化依赖数据
         * @param joinQueryParameter
         * @return
         */
        protected abstract ResultMap queryOriginalData(JoinQueryParameter joinQueryParameter);
    
        /**
         * 加载数据完成后的事件
         *
         * @param joinQueryParameter
         * @param resultMap
         */
        protected abstract void onCompleteEvent(JoinQueryParameter joinQueryParameter, ResultMap resultMap);
    
        /**
         * 增加聚合查询方法
         *
         * @param joinQueryParameter
         * @param iQueryData
         * @return
         */
        protected Observable addObservable(JoinQueryParameter joinQueryParameter, final IQueryData iQueryData) {
            return Observable.just(joinQueryParameter)
                    .flatMap(new Function<JoinQueryParameter, ObservableSource<ResultMap>>() {
    
                        @Override
                        public ObservableSource apply(JoinQueryParameter joinQueryParameter) throws Exception {
                            return Observable.just(iQueryData.queryData(joinQueryParameter));
                        }
                    }).subscribeOn(Schedulers.io())
                  ;
        }
    }
    
    public interface IQueryData {
    
    
        /**
         * 查询列表
         * @param joinQueryParameter
         * @return
         */
        ResultMap queryData(JoinQueryParameter joinQueryParameter);
    }
    
    
    public class DefaultObservbleStrategy extends AbstractObservableStrategy {
    
        /**
         * 日志
         */
        public final static Logger logger = LoggerFactory.getLogger(DefaultObservbleStrategy.class);
    
        /**
         * 查询初始化信息
         * @param joinQueryParameter
         * @return
         */
        @Override
        protected ResultMap queryOriginalData(JoinQueryParameter joinQueryParameter) {
            return new GoodsQueryDataImpl().queryData(joinQueryParameter);
        }
    
        /**
         * 加载数据完成后的事件
         *
         * @param joinQueryParameter
         * @param originalData
         */
        @Override
        protected void onCompleteEvent(JoinQueryParameter joinQueryParameter, ResultMap originalData) {
            ResultMap resultMap = (ResultMap)Observable.zip(
                    Observable.just(originalData),
                    addObservable(joinQueryParameter, new QueryCouponImpl()),
                    addObservable(joinQueryParameter, new QueryEvaluationImpl()),
                    new Function5<ResultMap, ResultMap, ResultMap, ResultMap, ResultMap, ResultMap>() {
    
                        @Override
                        public ResultMap apply(ResultMap goodsMap, ResultMap couponMap, ResultMap couponEvaluation) throws Exception {
                            return join.map(goodsMap, couponMap, couponEvaluation);
                        }
    
                    }
            ).blockingLast();
            DefaultObservbleStrategy.this.setResultMap(resultMap);
        }
    }
    
    

    以上是实现时写的大致骨架,由于具体业务远远比此文列举的需求要复杂,故而整的略微复杂了,但是找到关键的节点代码即可。

    Observable.zip(
                    Observable.just(joinQueryParameter)
                    .flatMap(new Function<JoinQueryParameter, ObservableSource<ResultMap>>() {
    
                        @Override
                        public ObservableSource apply(JoinQueryParameter joinQueryParameter) throws Exception {
                            return Observable.just(iQueryData.queryData(joinQueryParameter));
                        }
                    }).subscribeOn(Schedulers.io()),
                     Observable.just(joinQueryParameter)
                    .flatMap(new Function<JoinQueryParameter, ObservableSource<ResultMap>>() {
    
                        @Override
                        public ObservableSource apply(JoinQueryParameter joinQueryParameter) throws Exception {
                            return Observable.just(iQueryData.queryData(joinQueryParameter));
                        }
                    }).subscribeOn(Schedulers.io()),
                    new Function<ResultMap, ResultMap, ResultMap>() {
    
                        @Override
                        public ResultMap apply(ResultMap goodsMap, ResultMap couponMap, ResultMap couponEvaluation) throws Exception {
                            return join.map(goodsMap, couponMap, couponEvaluation);
                        }
    
                    }
            ).blockingLast();
    

    总而言之,微服务盛行后,服务与服务之间的数据聚合愈发复杂,让人有点头疼。

    相关文章

      网友评论

          本文标题:RxJava小试牛刀

          本文链接:https://www.haomeiwen.com/subject/xmoxvqtx.html