美文网首页
Java互联网架构-高性能异步框架

Java互联网架构-高性能异步框架

作者: 农哥小鹏 | 来源:发表于2020-09-18 21:22 被阅读0次

序言

com.lietou.common.asyn4j 封装了一个异步方法调用框架,包括了异步方法执行、异步回调执行、异步方法持久化并且支持Spring.

功能:

异步工作可以设置执行权重

支持异步回调

任务持久化及恢复功能

Spring 支持

定期打印任务运行状态信息

一丶普通调用方法

例子:

主方法

public static void main(String[] args) {

// 初始化异步工作服务

// 默认参数,队列长度:10000,添加时间:10毫秒,工作线程数:cpu个数,回调线程数:cpu个数,停止服务时间:5秒

AsynService asynService = AsynServiceImpl.newInstance();

// 自定义参数

AsynService anycService = AsynServiceImpl.newInstance(10000, 10L, 100, 50, 5000);

// 异步工作持久化工作器

// 参数,保存目录;监测时间,单位秒

RejectedHandler rejectedHandler = new ScheduledRejectedHandler("/Users/yuanxl/empolder/tmp", 600);

anycService.setRejectedHandler(rejectedHandler);

// 异步工作异常处理器

anycService.setErrorHandler(new DefaultErrorHandler());

// 异步工作合并处理器,自有批量执行需要合并结果的时候需要

// ResultMergeHandler merge = new TestMerge()

// asynService.setResultMergeHandler(resultMergeHandler);

// 启动服务

asynService.init();

// 添加任务,example

TestService testService = new TestService();

Class<?>[] type = { int.class };

for (int i = 0; i < 1000; i++) {

// 添加异步工作- TestService 的 incr 方法 ,方法参数 i

asynService.addJob(testService, "incr", type, new Object[] { i });

// 添加异步且回调工作

asynService.addJob(testService, "incr", type, new Object[] { j }, new TestCallback());

}

// 停止服务

asynService.close();

}

异步调用对象

public class TestService {

public String incr(int num) {

//System.out.println("incr: " + num);

return num + 1 + "";

}

}

回调方法

//回调需继承AsynCallback抽象类

public class TestCallback extends AsynCallback {

private static final long serialVersionUID = -9078828850204151543L;

@Override

public void doNotify() {

System.out.println("before callback value=" + methodResult);

System.out.println("after callback value=" + methodResult + " done");

}

}

结果合并器

public class TestMerge implements ResultMergeHandler {

private static final long serialVersionUID = -3514693047424233341L;

@Override

public Object merge(Object a, Object b) {

if (a == null) {

return b;

}

if (b == null) {

return a;

}

Integer num = (Integer)a;

Integer num1 = (Integer)b;

return num + num1;

}

}

二丶调用Spring Bean的异步方法.(推荐使用spring)

例子:

调用 Spring TestService 的 incr 方法

applicationContext.xml 加入

<bean id="springBeanUtil" class="com.lietou.common.asyn4j.spring.SpringContextUtils" />

<bean id="asynService" class="com.lietou.common.asyn4j.spring.AsynServiceFactoryBean" destroy-method="destroy">

<!--设置自定义相关参数 -->

<property name="queueCapacity" value="10000" />

<property name="addJobWaitTime" value="10" />

<property name="jobThreadNum" value="100" />

<property name="callbackThreadNum" value="50" />

<property name="closeServiceWaitTime" value="3000" />

<!--添加相关处理器 -->

<property name="errorHandler">

<bean class="com.lietou.common.asyn4j.core.handler.DefaultErrorHandler" />

</property>

<property name="rejectedHandler">

<bean

class="com.lietou.common.asyn4j.core.handler.ScheduledRejectedHandler">

<constructor-arg index="0" value="${file.path}" />

<constructor-arg index="1" value="600" />

</bean>

</property>

</bean>

<bean id="testService" class="com.lietou.common.asyn4j.service.TestService" />

public class TestMain {

public AsynService asynService;

public void setAsynService(AsynService asynService) {

this.asynService = asynService;

}

public void test(){

Class<?>[] type = { int.class };

for(int i=0; i<10000; i++){

asynService.addJobBySpring("testService", "incr", type, new Object[] { j });

}

}

}

三丶相关处理器

异步持久化工作器--(当工作队列工作数超过queueCapacity时由处里器处理,包括服务启动和关闭调用)

AsynService anycService = AsynServiceImpl.newInstance(10000, 10L, 100, 50, 5000);

RejectedHandler rejectedHandler = new ScheduledRejectedHandler("/Users/yuanxl/empolder/tmp", 600);

anycService.setRejectedHandler(rejectedHandler);

当工作队列中的工作超过10000个时,异步工作将由ScheduledRejectedHandler处理;

将任务持久化到文件,当系统启动时加载文件内容并执行,关闭时将未执行的任务持久化到文件。

可以自定义处理器,继承 RejectedHandler 抽象类即可。

异步工作异常处理器--(当工作执行出现异常时处理器)

anycService.setErrorHandler(new DefaultErrorHandler());

自定义处理器,继承 ErrorHandler 抽象类即可。

异步工作合并处理器--(只有批量执行需要合并结果的时候才需要)

asynService.setResultMergeHandler(new TestMerge());

自定义处理器,继承 ResultMergeHandler 接口即可。

四丶异步工作优先级

分成三个等级JobPriority.MIN, JobPriority.NORM, JobPriority.MAX 默认优先级为JobPriority.NORM。

五丶API使用说明

默认构造函数,创建一个异步服务

AsynServiceImpl.newInstance();

采用的默认参数为:

(queueCapacity)最大工作队列缓存工作数 – 10000(默认值)

(addJobWaitTime)当工作队列满时添加工作等待时间-- 10ms(默认值)

(jobThreadNum)异步工作执行线程池大小 ---- CPU核数/2 +1(默认值)

(callbackThreadNum)回调执行线程池大小 — CPU核数/2(默认值)

(closeServiceWaitTime) 服务关闭等待时间 ---- 5000s(默认值)

自定义参数构造函数,参数顺序对应前面的说明

AsynServiceImpl.newInstance (100000, 0L, 100, 50, 3000);

AsynServiceImpl 是线程安全的,可以初始化一个实例,所有程序再引用.

启动关闭服务

/**

* 启动服务

*/

public void init();

/**

* 关闭服务

*/

public void close();

添加任务接口API

/**

* 添加异步工作

*

* @param targetObject -- 目标对象(可以是 Class或Object)

* @param method -- 目标方法

*/

public void addJob(Object targetObject, String method);

/**

* 添加异步工作

*

* @param targetObject -- 目标对象(可以是 Class,Object)

* @param method -- 目标方法

* @param priority -- 优先级

*/

public void addJob(Object targetObject, String method, JobPriority priority);

/**

* 添加异步工作

*

* @param targetObject -- 目标对象(可以是 Class,Object)

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

*/

public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues);

/**

* 添加异步工作

*

* @param targetObject -- 目标对象(可以是 Class,Object)

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

* @param priority -- 优先级

*/

public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues,

JobPriority priority);

/**

* 添加异步工作

*

* @param targetObject -- 目标对象(可以是 Class,Object)

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

* @param asynCallback -- 回调对象

*/

public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues,

AsynCallback asynCallback);

/**

* 添加异步工作

*

* @param targetObject -- 目标对象(可以是 Class,Object)

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

* @param asynCallback -- 回调对象

* @param priority -- 优先级

*/

public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues,

AsynCallback asynCallback, JobPriority priority);

/**

* 批量添加异步工作

*

* @param targetObject -- 目标对象(可以是 Class,Object)

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值,list

* @param asynCallback -- 回调对象

*/

public void addJobs(Object targetObject, String method, Class<?>[] paramTypes, List<Object[]> paramsValues,

AsynCallback asynCallback);

/**

* 添加异步工作

*

* @param beanId -- springId

* @param method -- 目标方法

*/

public void addJobBySpring(String beanId, String method);

/**

* 添加异步工作

*

* @param beanId -- springId

* @param method -- 目标方法

* @param priority -- 优先级

*/

public void addJobBySpring(String beanId, String method, JobPriority priority);

/**

* 添加异步工作

*

* @param beanId -- springId

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

*/

public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues);

/**

* 添加异步工作

*

* @param beanId -- springId

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

* @param priority -- 优先级

*/

public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues,

JobPriority priority);

/**

* 添加异步工作

*

* @param beanId -- springId

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

* @param asynCallback -- 回调方法

*/

public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues,

AsynCallback asynCallback);

/**

* 添加异步工作

*

* @param beanId -- springId

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值

* @param asynCallback -- 回调方法

* @param priority -- 优先级

*/

public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues,

AsynCallback asynCallback, JobPriority priority);

/**

* 批量添加异步工作

*

* @param beanId -- springId

* @param method -- 目标方法

* @param paramTypes -- 目标方法参数类型

* @param paramsValues -- 目标方法参数值,list

* @param asynCallback -- 回调方法

*/

public void addJobsBySpring(String beanId, String method, Class<?>[] paramTypes, List<Object[]> paramsValues,

AsynCallback asynCallback);

六丶注意

由于需要持久化任务,回调类及批量结果合并类需要实现序列化。

当目标对象是class是,类必须有无参构造函数。

相关文章

  • Java互联网架构-高性能异步框架

    序言 com.lietou.common.asyn4j 封装了一个异步方法调用框架,包括了异步方法执行、异步回调执...

  • 搭建web服务

    python架构: 1.sanic异步框架之中文文档 java架构:

  • Netty 总结

    简述: Netty 是一个基于JAVA NIO 类库的异步通信框架,用于创建异步非阻塞、基于事件驱动、高性能、高可...

  • 那些与Netty有关的知识点,你知道多少

    Netty 原理 Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 JAVA NIO 提供的 API ...

  • 如何弄懂Netty高性能原理 与 RMI 实现方式

    Netty 原理 Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 JAVA NIO 提供的 API...

  • Netty 原理+高性能

    Netty 原理 Netty 是一个高性能、异步事件驱动的NIO 框架,基于JAVA NIO 提供的API 实现。...

  • Dubbo 架构与实践

    1、Dubbo架构概述 1.1、什么是Dubbo? Apache dubbo是一款高性能的Java RPC框架,其...

  • Netty学习

    Netty是一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可...

  • Netty源代码解析一:EventLoop

    Netty是一个java的高性能同步/异步通讯框架,基于SEDA模型。最近因为要逐渐接触java项目,就看了下它的...

  • 6

    Netty: 1、总体描述, 应用场景Netty是一个高性能、异步事件驱动的NIO框架。封装了Java NIO那些...

网友评论

      本文标题:Java互联网架构-高性能异步框架

      本文链接:https://www.haomeiwen.com/subject/ieksyktx.html