序言
com.lietou.common.asyn4j 封装了一个异步方法调用框架,包括了异步方法执行、异步回调执行、异步方法持久化并且支持Spring.
功能:
异步工作可以设置执行权重
支持异步回调
任务持久化及恢复功能
Spring 支持
定期打印任务运行状态信息
一丶普通调用方法
例子:
主方法
public static void main(String[] args) {
// 初始化异步工作服务
// 默认参数,队列长度:10000,添加时间:10毫秒,工作线程数:cpu个数,回调线程数:cpu个数,停止服务时间:5秒
AsynService asynService = AsynServiceImpl.newInstance();
// 自定义参数
AsynService anycService = AsynServiceImpl.newInstance(10000, 10L, 100, 50, 5000);
// 异步工作持久化工作器
// 参数,保存目录;监测时间,单位秒
RejectedHandler rejectedHandler = new ScheduledRejectedHandler("/Users/yuanxl/empolder/tmp", 600);
anycService.setRejectedHandler(rejectedHandler);
// 异步工作异常处理器
anycService.setErrorHandler(new DefaultErrorHandler());
// 异步工作合并处理器,自有批量执行需要合并结果的时候需要
// ResultMergeHandler merge = new TestMerge()
// asynService.setResultMergeHandler(resultMergeHandler);
// 启动服务
asynService.init();
// 添加任务,example
TestService testService = new TestService();
Class<?>[] type = { int.class };
for (int i = 0; i < 1000; i++) {
// 添加异步工作- TestService 的 incr 方法 ,方法参数 i
asynService.addJob(testService, "incr", type, new Object[] { i });
// 添加异步且回调工作
asynService.addJob(testService, "incr", type, new Object[] { j }, new TestCallback());
}
// 停止服务
asynService.close();
}
异步调用对象
public class TestService {
public String incr(int num) {
//System.out.println("incr: " + num);
return num + 1 + "";
}
}
回调方法
//回调需继承AsynCallback抽象类
public class TestCallback extends AsynCallback {
private static final long serialVersionUID = -9078828850204151543L;
@Override
public void doNotify() {
System.out.println("before callback value=" + methodResult);
System.out.println("after callback value=" + methodResult + " done");
}
}
结果合并器
public class TestMerge implements ResultMergeHandler {
private static final long serialVersionUID = -3514693047424233341L;
@Override
public Object merge(Object a, Object b) {
if (a == null) {
return b;
}
if (b == null) {
return a;
}
Integer num = (Integer)a;
Integer num1 = (Integer)b;
return num + num1;
}
}
二丶调用Spring Bean的异步方法.(推荐使用spring)
例子:
调用 Spring TestService 的 incr 方法
applicationContext.xml 加入
<bean id="springBeanUtil" class="com.lietou.common.asyn4j.spring.SpringContextUtils" />
<bean id="asynService" class="com.lietou.common.asyn4j.spring.AsynServiceFactoryBean" destroy-method="destroy">
<!--设置自定义相关参数 -->
<property name="queueCapacity" value="10000" />
<property name="addJobWaitTime" value="10" />
<property name="jobThreadNum" value="100" />
<property name="callbackThreadNum" value="50" />
<property name="closeServiceWaitTime" value="3000" />
<!--添加相关处理器 -->
<property name="errorHandler">
<bean class="com.lietou.common.asyn4j.core.handler.DefaultErrorHandler" />
</property>
<property name="rejectedHandler">
<bean
class="com.lietou.common.asyn4j.core.handler.ScheduledRejectedHandler">
<constructor-arg index="0" value="${file.path}" />
<constructor-arg index="1" value="600" />
</bean>
</property>
</bean>
<bean id="testService" class="com.lietou.common.asyn4j.service.TestService" />
public class TestMain {
public AsynService asynService;
public void setAsynService(AsynService asynService) {
this.asynService = asynService;
}
public void test(){
Class<?>[] type = { int.class };
for(int i=0; i<10000; i++){
asynService.addJobBySpring("testService", "incr", type, new Object[] { j });
}
}
}
三丶相关处理器
异步持久化工作器--(当工作队列工作数超过queueCapacity时由处里器处理,包括服务启动和关闭调用)
AsynService anycService = AsynServiceImpl.newInstance(10000, 10L, 100, 50, 5000);
RejectedHandler rejectedHandler = new ScheduledRejectedHandler("/Users/yuanxl/empolder/tmp", 600);
anycService.setRejectedHandler(rejectedHandler);
当工作队列中的工作超过10000个时,异步工作将由ScheduledRejectedHandler处理;
将任务持久化到文件,当系统启动时加载文件内容并执行,关闭时将未执行的任务持久化到文件。
可以自定义处理器,继承 RejectedHandler 抽象类即可。
异步工作异常处理器--(当工作执行出现异常时处理器)
anycService.setErrorHandler(new DefaultErrorHandler());
自定义处理器,继承 ErrorHandler 抽象类即可。
异步工作合并处理器--(只有批量执行需要合并结果的时候才需要)
asynService.setResultMergeHandler(new TestMerge());
自定义处理器,继承 ResultMergeHandler 接口即可。
四丶异步工作优先级
分成三个等级JobPriority.MIN, JobPriority.NORM, JobPriority.MAX 默认优先级为JobPriority.NORM。
五丶API使用说明
默认构造函数,创建一个异步服务
AsynServiceImpl.newInstance();
采用的默认参数为:
(queueCapacity)最大工作队列缓存工作数 – 10000(默认值)
(addJobWaitTime)当工作队列满时添加工作等待时间-- 10ms(默认值)
(jobThreadNum)异步工作执行线程池大小 ---- CPU核数/2 +1(默认值)
(callbackThreadNum)回调执行线程池大小 — CPU核数/2(默认值)
(closeServiceWaitTime) 服务关闭等待时间 ---- 5000s(默认值)
自定义参数构造函数,参数顺序对应前面的说明
AsynServiceImpl.newInstance (100000, 0L, 100, 50, 3000);
AsynServiceImpl 是线程安全的,可以初始化一个实例,所有程序再引用.
启动关闭服务
/**
* 启动服务
*/
public void init();
/**
* 关闭服务
*/
public void close();
添加任务接口API
/**
* 添加异步工作
*
* @param targetObject -- 目标对象(可以是 Class或Object)
* @param method -- 目标方法
*/
public void addJob(Object targetObject, String method);
/**
* 添加异步工作
*
* @param targetObject -- 目标对象(可以是 Class,Object)
* @param method -- 目标方法
* @param priority -- 优先级
*/
public void addJob(Object targetObject, String method, JobPriority priority);
/**
* 添加异步工作
*
* @param targetObject -- 目标对象(可以是 Class,Object)
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
*/
public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues);
/**
* 添加异步工作
*
* @param targetObject -- 目标对象(可以是 Class,Object)
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
* @param priority -- 优先级
*/
public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues,
JobPriority priority);
/**
* 添加异步工作
*
* @param targetObject -- 目标对象(可以是 Class,Object)
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
* @param asynCallback -- 回调对象
*/
public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues,
AsynCallback asynCallback);
/**
* 添加异步工作
*
* @param targetObject -- 目标对象(可以是 Class,Object)
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
* @param asynCallback -- 回调对象
* @param priority -- 优先级
*/
public void addJob(Object targetObject, String method, Class<?>[] paramTypes, Object[] paramsValues,
AsynCallback asynCallback, JobPriority priority);
/**
* 批量添加异步工作
*
* @param targetObject -- 目标对象(可以是 Class,Object)
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值,list
* @param asynCallback -- 回调对象
*/
public void addJobs(Object targetObject, String method, Class<?>[] paramTypes, List<Object[]> paramsValues,
AsynCallback asynCallback);
/**
* 添加异步工作
*
* @param beanId -- springId
* @param method -- 目标方法
*/
public void addJobBySpring(String beanId, String method);
/**
* 添加异步工作
*
* @param beanId -- springId
* @param method -- 目标方法
* @param priority -- 优先级
*/
public void addJobBySpring(String beanId, String method, JobPriority priority);
/**
* 添加异步工作
*
* @param beanId -- springId
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
*/
public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues);
/**
* 添加异步工作
*
* @param beanId -- springId
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
* @param priority -- 优先级
*/
public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues,
JobPriority priority);
/**
* 添加异步工作
*
* @param beanId -- springId
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
* @param asynCallback -- 回调方法
*/
public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues,
AsynCallback asynCallback);
/**
* 添加异步工作
*
* @param beanId -- springId
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值
* @param asynCallback -- 回调方法
* @param priority -- 优先级
*/
public void addJobBySpring(String beanId, String method, Class<?>[] paramTypes, Object[] paramsValues,
AsynCallback asynCallback, JobPriority priority);
/**
* 批量添加异步工作
*
* @param beanId -- springId
* @param method -- 目标方法
* @param paramTypes -- 目标方法参数类型
* @param paramsValues -- 目标方法参数值,list
* @param asynCallback -- 回调方法
*/
public void addJobsBySpring(String beanId, String method, Class<?>[] paramTypes, List<Object[]> paramsValues,
AsynCallback asynCallback);
六丶注意
由于需要持久化任务,回调类及批量结果合并类需要实现序列化。
当目标对象是class是,类必须有无参构造函数。
网友评论