本文来自网易云社区
作者:张伟
背景
限时购是网易考拉目前比较常用的促销形式,但是前期创建一个限时购活动时需要各个BU按照指定的Excel格式进行选品提报,为了保证提报数据准确,运营需要人肉校验很多信息:
-
是否已经参加了限时购
-
在线价与活动价的对比校验
-
大促价格校验
-
是否有互斥活动
-
库存检查
-
SKU 完整性
-
价格预警检查
-
商品可用性校验
这么多人肉的校验数据来自不同的系统,获取数据,检查数据;这是一件很繁琐且工作量巨大的事情。在这样的背景下,促销服务提供了限时购促销校验小工具,极大减少了运营人员配置限时购的工作量。
实现
上图是操作界面和校验结果格式示例,用户只需要按照模版将数据导入,在导入的时候完成数据基本格式校验!通过格式校验后,点击【校验】即可完成对数据的各种数据校验并将校验数据结果写到导出文件中,只需要根据到处文件提示进行修改即可。
优化
上文只是对该功能的一个简单介绍,在具体实现过程中需要调用各种服务,需要从商品基础服务获取商品信息,类目数据,供应商信息,SKU规格信息,限购信息包含,仓库信息等! 该工具上线初期,虽然功能得到了需求方的认可,但是性能问题比较突出;卡顿比较严重!为了给运营提供一个更好用的工具,通过优化代码结构,将代码中涉及到循环处理的地方都改为批量调用!此时性能有了一个明显的改善,但是校验数据量比较大的情况下还是会存在一定的性能问题,通过分析发现是因为调用外部系统耗时较大,此时通过dubbo异步并发调用的优化方式进行优化,使得该工具性能得到了极大提升。
dubbo 异步工具
在工作中总会有一些服务响应时间会和请求参数中的数量正相关,或者服务提供者明确限制了请求参数的大小,此时该如何进行优化呢? 在很多代码中都可以发现这样都写法
for(x:y){ xxxService.xxx(p); }
使用循环同步的调用方式进行处理,这样该方法的响应时间就会随着请求参数的个数增长而进行增长。有没有更好的方式呢?下面就介绍一下使用dubbo异步的方式完成对该方法的优化
使用xml配置完成异步调用
基于 NIO 的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较小。
在 consumer.xml 中配置:
<dubbo:reference id="fooService" interface="com.alibaba.foo.FooService"> <dubbo:method name="findFoo" async="true" /></dubbo:reference><dubbo:reference id="barService" interface="com.alibaba.bar.BarService"> <dubbo:method name="findBar" async="true" /></dubbo:reference>
调用代码:
// 此调用会立即返回nullfooService.findFoo(fooId);// 拿到调用的Future引用,当结果返回后,会被通知和设置到此FutureFuture<Foo> fooFuture = RpcContext.getContext().getFuture(); // 此调用会立即返回nullbarService.findBar(barId);// 拿到调用的Future引用,当结果返回后,会被通知和设置到此FutureFuture<Bar> barFuture = RpcContext.getContext().getFuture(); // 此时findFoo和findBar的请求同时在执行,客户端不需要启动多线程来支持并行,而是借助NIO的非阻塞完成// 如果foo已返回,直接拿到返回值,否则线程wait住,等待foo返回后,线程会被notify唤醒Foo foo = fooFuture.get(); // 同理等待bar返回Bar bar = barFuture.get(); // 如果foo需要5秒返回,bar需要6秒返回,实际只需等6秒,即可获取到foo和bar,进行接下来的处理。
你也可以设置是否等待消息发出:
-
sent="true" 等待消息发出,消息发送失败将抛出异常。
-
sent="false" 不等待消息发出,将消息放入 IO 队列,即刻返回。
<dubbo:method name="findFoo" async="true" sent="true" />
如果你只是想异步,完全忽略返回值,可以配置 return="false",以减少 Future 对象的创建和管理成本:
<dubbo:method name="findFoo" async="true" return="false" />
关于使用xml配置完成异步的缺点
如果在xml中配置会导致工程中所有引用者都使用了异步方法,修改成本较高
dubbo异步调用工具
通过编码方式实现dubbo的异步调用,通过使用RpcContext.getContext().asyncCall()完成异步调用,考虑到促销这边需要将分页大小到最小值限制为200,在促销优化中使用该方式进行优化,效果明显
/** * Desc:Dubbo异步调用辅助工具类 * * @author wei.zw * @since 2017年7月14日 下午4:36:08 * @version v 0.1 */public class RpcAsyncUtil { private static final Logger logger = LoggerFactory.getLogger(RpcAsyncUtil.class); /** * dubbo异步调用,将远程调用结果组合后返回;本次耗时取决于最大的耗时 * * @param paramList * 需要进行远程调用所有参数列表长度 * @param pageSize * 一次远程调用使用的列表最大长度 * @param syncCallable * 具体dubbo调用 * @return * @author wei.zw */ public static <T, R> List<R> async(List<T> paramList, int pageSize, final SyncCallable<T, R> syncCallable) { if (pageSize < 200) { pageSize = 200; } List<List<T>> subList = ListUtils.subList(paramList, pageSize); List<Future<List<R>>> futures = new ArrayList<>(); for (final List<T> sub : subList) { futures.add(RpcContext.getContext().asyncCall(new Callable<List<R>>() { @Override public List<R> call() throws Exception { return syncCallable.call(sub); } })); } List<R> result = new ArrayList<>(); try { for (Future<List<R>> future : futures) { List<R> list = future.get(); if (CollectionUtils.isNotEmpty(list)) { result.addAll(list); } } } catch (Exception e) { logger.warn(ToStringBuilder.reflectionToString(syncCallable) + ",调用异常", e); throw new RpcException(e); } return result; } public static interface SyncCallable<T, R> { /** * 该方法中只能是一个远程调用,不能使用其他方法 * * @param params * @return * @author wei.zw */ public List<R> call(List<T> params); } public static interface SyncMapCallable<T,K, R> { /** * 该方法中只能是一个远程调用,不能使用其他方法 * * @param params * @return * @author wei.zw */ public Map<K, R> call(List<T> params); } }
-
使用示例:
Mybatis批处理
final SqlSession session= getGenericSqlSessionFactory().openSession(ExecutorType.BATCH);for( ){ session.update(); } session.commit();
PS:关于事务问题 Mybatis与Spring集成时,如果外面存在事务,则获取到的connection是同一个
public SqlSession openSession(ExecutorType execType) { return openSessionFromDataSource(execType, null, false); }private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) { Transaction tx = null; try { final Environment environment = configuration.getEnvironment(); final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment); //从environment中获取dataSource,并根据dataSource创建事务 tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit); final Executor executor = configuration.newExecutor(tx, execType); return new DefaultSqlSession(configuration, executor, autoCommit); } catch (Exception e) { closeTransaction(tx); // may have fetched a connection so lets call close() throw ExceptionFactory.wrapException("Error opening session. Cause: " + e, e); } finally { ErrorContext.instance().reset(); } }public SpringManagedTransaction(DataSource dataSource) { notNull(dataSource, "No DataSource specified"); this.dataSource = dataSource; } private void openConnection() throws SQLException { //从dataSouce中获取connection this.connection = DataSourceUtils.getConnection(this.dataSource); this.autoCommit = this.connection.getAutoCommit(); this.isConnectionTransactional = isConnectionTransactional(this.connection, this.dataSource); if (this.logger.isDebugEnabled()) { this.logger.debug( "JDBC Connection [" + this.connection + "] will" + (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring"); } }
最佳实践
这样在需要进行批处理的使用只需要调用基类中的方法,而且不需要关注事务问题,即使在方法内部调用也存在事务问题。
网易云大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者张伟授权发布。
相关文章:
【推荐】 如何有效的杜绝“羊毛党“的薅羊毛行为?
【推荐】 Kudu:支持快速分析的新型Hadoop存储系统
网友评论